/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kudu;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.security.auth.login.LoginException;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.kudu.OperationType;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

@EventDriven
@SupportsBatching
@RequiresInstanceClassLoading
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"put", "database", "NoSQL", "kudu", "HDFS", "record"})
@CapabilityDescription(value="Reads records from an incoming FlowFile using the provided Record Reader, and writes those records to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source. If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute="record.count", description="Number of records written to Kudu")
public class PutKudu
extends AbstractProcessor {
    /** Required: addresses of the Kudu masters; evaluated against the variable registry. */
    protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
            .name("Kudu Masters")
            .description("List all kudu masters's ip with port (e.g. 7051), comma separated")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Required: name of the Kudu table that records are written to. */
    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
            .name("Table Name")
            .description("The name of the Kudu Table to put data into")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Optional: controller service supplying the Kerberos principal and keytab. */
    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
            .name("kerberos-credentials-service")
            .displayName("Kerberos Credentials Service")
            .description("Specifies the Kerberos Credentials to use for authentication")
            .required(false)
            .identifiesControllerService(KerberosCredentialsService.class)
            .build();

    /** Required: controller service used to parse incoming FlowFiles into records. */
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .description("The service for reading records from incoming flow files.")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    /** Deprecated flag kept as a supported property; this class does not read its value. */
    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
            .name("Skip head line")
            .description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader (e.g. \"Treat First Line as Header\" property of CSVReader)")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Which {@link OperationType} to apply for each record; defaults to INSERT. */
    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
            .name("Insert Operation")
            .description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows")
            .allowableValues(OperationType.values())
            .defaultValue(OperationType.INSERT.toString())
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Required: flush mode applied to every Kudu session; defaults to AUTO_FLUSH_BACKGROUND. */
    protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
            .name("Flush Mode")
            .description("Set the new flush mode for a kudu session.\nAUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\nAUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory operations but it may have to wait when the buffer is full and there's another buffer being flushed.\nMANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
            .allowableValues(SessionConfiguration.FlushMode.values())
            .defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
            .required(true)
            .build();

    /** Required: maximum number of FlowFiles pulled per trigger (1 to 100000, default 1). */
    protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("FlowFiles per Batch")
            .description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. Depending on your memory size, and data size per row set an appropriate batch size for the number of FlowFiles to process per client connection setup.Gradually increase this number, only if your FlowFiles typically contain a few records.")
            .defaultValue("1")
            .required(true)
            .addValidator(StandardValidators.createLongValidator(1L, 100000L, true))
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Required: maximum number of records per Kudu-client batch (1 to 100000, default 100). */
    protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("Batch Size")
            .displayName("Max Records per Batch")
            .description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. Depending on your memory size, and data size per row set an appropriate batch size. Gradually increase this number to find out the best one for best performances.")
            .defaultValue("100")
            .required(true)
            .addValidator(StandardValidators.createLongValidator(1L, 100000L, true))
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Destination for FlowFiles whose records were all written. */
    protected static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
            .build();

    /** Destination for FlowFiles that could not be read or written. */
    protected static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("A FlowFile is routed to this relationship if it cannot be sent to Kudu")
            .build();

    /** Name of the FlowFile attribute that receives the record count. */
    public static final String RECORD_COUNT_ATTR = "record.count";

    // Configuration snapshot; assigned in onScheduled from the processor properties.
    protected OperationType operationType;
    protected SessionConfiguration.FlushMode flushMode;
    protected int batchSize = 100;
    protected int ffbatch = 1;

    // Kudu handles opened in onScheduled; the client is closed again in closeClient.
    protected KuduClient kuduClient;
    protected KuduTable kuduTable;

    // Set only when a Kerberos credentials service is configured; null otherwise.
    private volatile KerberosUser kerberosUser;

    /**
     * Lists the properties this processor supports.
     *
     * @return a freshly allocated, mutable list of the supported descriptors
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return new ArrayList<>(Arrays.asList(
                KUDU_MASTERS,
                TABLE_NAME,
                KERBEROS_CREDENTIALS_SERVICE,
                SKIP_HEAD_LINE,
                RECORD_READER,
                INSERT_OPERATION,
                FLUSH_MODE,
                FLOWFILE_BATCH_SIZE,
                BATCH_SIZE));
    }

    /**
     * Lists the relationships FlowFiles can be routed to.
     *
     * @return a freshly allocated set containing the success and failure relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
    }

    /**
     * Captures the processor configuration into fields and opens the Kudu client and table.
     *
     * @param context the process context the property values are read from
     * @throws IOException declared for failures while opening the Kudu table
     * @throws LoginException if the Kerberos login performed by {@code createClient} fails
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException, LoginException {
        final String table = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
        final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();

        final String configuredOperation = context.getProperty(INSERT_OPERATION).getValue();
        operationType = OperationType.valueOf(configuredOperation);
        batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
        ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
        final String configuredFlushMode = context.getProperty(FLUSH_MODE).getValue();
        flushMode = SessionConfiguration.FlushMode.valueOf(configuredFlushMode);

        getLogger().debug("Setting up Kudu connection...");
        // The credentials service is optional; createClient handles the null case.
        final KerberosCredentialsService kerberosService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE)
                .asControllerService(KerberosCredentialsService.class);
        kuduClient = createClient(masters, kerberosService);
        kuduTable = kuduClient.openTable(table);
        getLogger().debug("Kudu connection successfully initialized");
    }

    /**
     * Creates the Kudu client, optionally inside a Kerberos-authenticated action.
     *
     * @param masters            the Kudu master addresses handed to {@code buildClient}
     * @param credentialsService the Kerberos credentials service, or {@code null} for no Kerberos
     * @return the newly built client
     * @throws LoginException if logging in the Kerberos user fails
     */
    protected KuduClient createClient(String masters, KerberosCredentialsService credentialsService) throws LoginException {
        if (credentialsService != null) {
            final String principal = credentialsService.getPrincipal();
            final String keytab = credentialsService.getKeytab();
            // Remember the logged-in user so later triggers and closeClient can use it.
            kerberosUser = loginKerberosUser(principal, keytab);
            final KerberosAction<KuduClient> buildAsUser =
                    new KerberosAction<>(kerberosUser, () -> buildClient(masters), getLogger());
            return buildAsUser.execute();
        }
        return buildClient(masters);
    }

    /**
     * Builds a Kudu client for the given master addresses.
     *
     * @param masters the master addresses passed to the client builder
     * @return the built client
     */
    protected KuduClient buildClient(String masters) {
        final KuduClient.KuduClientBuilder clientBuilder = new KuduClient.KuduClientBuilder(masters);
        return clientBuilder.build();
    }

    /**
     * Creates a keytab-based Kerberos user and logs it in.
     *
     * @param principal the Kerberos principal
     * @param keytab    the keytab value passed to {@link KerberosKeytabUser}
     * @return the logged-in user
     * @throws LoginException if the login fails
     */
    protected KerberosUser loginKerberosUser(String principal, String keytab) throws LoginException {
        final KerberosUser keytabUser = new KerberosKeytabUser(principal, keytab);
        keytabUser.login();
        return keytabUser;
    }

    /**
     * Closes the Kudu client and logs out the Kerberos user, if either is present.
     *
     * <p>The Kerberos logout is attempted even when closing the client throws.
     *
     * @throws KuduException  if closing the client fails
     * @throws LoginException if the Kerberos logout fails
     */
    @OnStopped
    public final void closeClient() throws KuduException, LoginException {
        final KuduClient client = kuduClient;
        try {
            if (client != null) {
                getLogger().debug("Closing KuduClient");
                client.close();
                // Only cleared after a successful close, as before.
                kuduClient = null;
            }
        } finally {
            final KerberosUser user = kerberosUser;
            if (user != null) {
                user.logout();
                kerberosUser = null;
            }
        }
    }

    /**
     * Pulls up to {@code ffbatch} FlowFiles and writes them to Kudu, running inside a
     * {@link KerberosAction} when a Kerberos user has been logged in.
     *
     * @param context the process context
     * @param session the process session the FlowFiles are taken from
     * @throws ProcessException propagated from the processing of the batch
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final List<FlowFile> batch = session.get(ffbatch);
        if (batch.isEmpty()) {
            return;
        }

        // Read the volatile field once so the null check and the use see the same value.
        final KerberosUser user = kerberosUser;
        if (user != null) {
            final PrivilegedExceptionAction<Void> privilegedAction = () -> {
                trigger(context, session, batch);
                return null;
            };
            new KerberosAction<>(user, privilegedAction, getLogger()).execute();
        } else {
            trigger(context, session, batch);
        }
    }

    /**
     * Writes the records of every FlowFile in the batch to Kudu and routes each FlowFile.
     *
     * <p>For each FlowFile the records are read with the configured record reader and applied
     * to a single Kudu session. Processing of a FlowFile stops at the first row error returned
     * by {@code apply} or at the first exception; the FlowFile is then routed to failure.
     * After all FlowFiles have been read the session is flushed and closed, and row errors
     * reported by the flush are mapped back to the FlowFile that produced the operation.
     * Every routed FlowFile receives the {@code record.count} attribute, except when the final
     * flush/close itself fails, in which case the whole batch is routed to failure as-is.
     *
     * @param context   the process context (source of the record reader factory)
     * @param session   the process session the FlowFiles belong to
     * @param flowFiles the FlowFiles to process
     * @throws ProcessException propagated from session operations
     */
    private void trigger(ProcessContext context, ProcessSession session, List<FlowFile> flowFiles) throws ProcessException {
        final KuduSession kuduSession = getKuduSession(kuduClient);
        final RecordReaderFactory recordReaderFactory =
                context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);

        // Number of records applied without an immediate error, per FlowFile.
        final Map<FlowFile, Integer> numRecords = new HashMap<>();
        // First failure (row error or exception) hit while reading/applying, per FlowFile.
        final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
        // Lets row errors reported at flush time be traced back to their FlowFile.
        final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
        final List<RowError> pendingRowErrors = new ArrayList<>();
        int numBuffered = 0;

        for (final FlowFile flowFile : flowFiles) {
            // try-with-resources closes the reader and the stream while letting any failure
            // propagate to the catch below, so the FlowFile is recorded as failed.
            try (InputStream in = session.read(flowFile);
                 RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
                final List<String> fieldNames = recordReader.getSchema().getFieldNames();
                final RecordSet recordSet = recordReader.createRecordSet();

                Record record = recordSet.next();
                while (record != null) {
                    final Operation operation = operationType == OperationType.UPSERT
                            ? upsertRecordToKudu(kuduTable, record, fieldNames)
                            : insertRecordToKudu(kuduTable, record, fieldNames);
                    operationFlowFileMap.put(operation, flowFile);

                    // In MANUAL_FLUSH mode, drain the buffer once batchSize operations are
                    // pending, before applying the next one.
                    if (numBuffered == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
                        numBuffered = 0;
                        flushKuduSession(kuduSession, false, pendingRowErrors);
                    }

                    // apply may return null; a non-null response carries the immediate outcome.
                    final OperationResponse response = kuduSession.apply(operation);
                    if (response != null && response.hasRowError()) {
                        // Stop at the first row error; earlier operations are not undone here.
                        flowFileFailures.put(flowFile, response.getRowError());
                        break;
                    }

                    numBuffered++;
                    numRecords.merge(flowFile, 1, Integer::sum);
                    record = recordSet.next();
                }
            } catch (final Exception ex) {
                flowFileFailures.put(flowFile, ex);
            }
        }

        // Always flush and close, even when nothing is buffered, so the session created above
        // is released on every path.
        try {
            flushKuduSession(kuduSession, true, pendingRowErrors);
        } catch (final Exception e) {
            getLogger().error("Failed to flush/close Kudu Session", e);
            for (final FlowFile flowFile : flowFiles) {
                session.transfer(flowFile, REL_FAILURE);
            }
            return;
        }

        final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream()
                .collect(Collectors.groupingBy(rowError -> operationFlowFileMap.get(rowError.getOperation())));

        long totalCount = 0L;
        for (final FlowFile flowFile : flowFiles) {
            final int count = numRecords.getOrDefault(flowFile, 0);
            totalCount += count;

            final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
            if (rowErrors != null) {
                rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[]{rowError.toString()}));
                // putAttribute returns the updated FlowFile; route that version.
                final FlowFile failed = session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count - rowErrors.size()));
                session.transfer(failed, REL_FAILURE);
                continue;
            }

            final FlowFile updated = session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count));
            if (flowFileFailures.containsKey(flowFile)) {
                getLogger().error("Failed to write due to {}", new Object[]{flowFileFailures.get(flowFile)});
                session.transfer(updated, REL_FAILURE);
                continue;
            }

            session.transfer(updated, REL_SUCCESS);
            session.getProvenanceReporter().send(updated, "Successfully added FlowFile to Kudu");
        }

        session.adjustCounter("Records Inserted", totalCount, false);
    }

    /**
     * Opens a new session on the client and applies the processor's configuration to it.
     *
     * @param client the Kudu client to create the session from
     * @return the configured session
     */
    protected KuduSession getKuduSession(KuduClient client) {
        final KuduSession session = client.newSession();
        session.setMutationBufferSpace(batchSize);
        session.setFlushMode(flushMode);

        // Only INSERT_IGNORE asks the session to ignore duplicate rows.
        final boolean ignoreDuplicates = operationType == OperationType.INSERT_IGNORE;
        if (ignoreDuplicates) {
            session.setIgnoreAllDuplicateRows(true);
        }
        return session;
    }

    /**
     * Flushes (or closes) the session and appends any resulting row errors to {@code rowErrors}.
     *
     * <p>In AUTO_FLUSH_BACKGROUND mode the errors are taken from the session's pending errors;
     * in every other mode they are taken from the responses returned by the flush/close call.
     *
     * @param kuduSession the session to flush or close
     * @param close       {@code true} to close the session, {@code false} to only flush it
     * @param rowErrors   the list that collected row errors are appended to
     * @throws KuduException if the flush or close fails
     */
    private void flushKuduSession(KuduSession kuduSession, boolean close, List<RowError> rowErrors) throws KuduException {
        final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();

        if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
            rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
        } else {
            responses.stream()
                    .filter(OperationResponse::hasRowError)
                    .map(OperationResponse::getRowError)
                    .forEach(rowErrors::add);
        }
    }

    /**
     * Creates an upsert operation for the table and fills its row from the record.
     *
     * @param kuduTable  the target table
     * @param record     the record supplying the values
     * @param fieldNames the record field names to copy
     * @return the populated upsert operation
     */
    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
        final Upsert operation = kuduTable.newUpsert();
        final Schema tableSchema = kuduTable.getSchema();
        final PartialRow targetRow = operation.getRow();
        buildPartialRow(tableSchema, targetRow, record, fieldNames);
        return operation;
    }

    /**
     * Creates an insert operation for the table and fills its row from the record.
     *
     * @param kuduTable  the target table
     * @param record     the record supplying the values
     * @param fieldNames the record field names to copy
     * @return the populated insert operation
     */
    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
        final Insert operation = kuduTable.newInsert();
        final Schema tableSchema = kuduTable.getSchema();
        final PartialRow targetRow = operation.getRow();
        buildPartialRow(tableSchema, targetRow, record, fieldNames);
        return operation;
    }

    /**
     * Copies the named record fields into the Kudu row, converting each value to the type
     * of the matching column.
     *
     * <p>Fields with no matching column in the schema are skipped. A {@code null} record value
     * is written as a Kudu null. String values destined for BINARY columns are encoded as
     * UTF-8 so the stored bytes do not depend on the JVM's default charset.
     *
     * @param schema     the schema of the target table
     * @param row        the row to populate
     * @param record     the record supplying the values
     * @param fieldNames the record field names to copy
     * @throws IllegalStateException if a column has a data type not handled here
     */
    @VisibleForTesting
    void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames) {
        for (final String colName : fieldNames) {
            final int colIdx = getColumnIndex(schema, colName);
            if (colIdx == -1) {
                // The record carries a field the table does not have; ignore it.
                continue;
            }

            final ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
            final Type colType = colSchema.getType();

            if (record.getValue(colName) == null) {
                row.setNull(colName);
                continue;
            }

            switch (colType.getDataType(colSchema.getTypeAttributes())) {
                case BOOL:
                    row.addBoolean(colIdx, record.getAsBoolean(colName));
                    break;
                case FLOAT:
                    row.addFloat(colIdx, record.getAsFloat(colName));
                    break;
                case DOUBLE:
                    row.addDouble(colIdx, record.getAsDouble(colName));
                    break;
                case BINARY:
                    // Explicit charset: the no-arg getBytes() uses the platform default.
                    row.addBinary(colIdx, record.getAsString(colName).getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    break;
                case INT8:
                    row.addByte(colIdx, record.getAsInt(colName).byteValue());
                    break;
                case INT16:
                    row.addShort(colIdx, record.getAsInt(colName).shortValue());
                    break;
                case INT32:
                    row.addInt(colIdx, record.getAsInt(colName));
                    break;
                case INT64:
                case UNIXTIME_MICROS:
                    row.addLong(colIdx, record.getAsLong(colName));
                    break;
                case STRING:
                    row.addString(colIdx, record.getAsString(colName));
                    break;
                case DECIMAL32:
                case DECIMAL64:
                case DECIMAL128:
                    row.addDecimal(colIdx, new BigDecimal(record.getAsString(colName)));
                    break;
                default:
                    throw new IllegalStateException(String.format("unknown column type %s", colType));
            }
        }
    }

    /**
     * Looks up the index of a column by name.
     *
     * @param columns the schema to search
     * @param colName the column name
     * @return the column index, or {@code -1} if the schema lookup throws
     */
    private int getColumnIndex(Schema columns, String colName) {
        int index;
        try {
            index = columns.getColumnIndex(colName);
        } catch (Exception ignored) {
            // Any lookup failure is treated as "column not present"; callers check for -1.
            index = -1;
        }
        return index;
    }
}

