/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.solr;

import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.solr.SolrProcessor;
import org.apache.nifi.processors.solr.SolrUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.MultiMapSolrParams;

@Tags(value={"Apache", "Solr", "Put", "Send", "Record"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Indexes the Records from a FlowFile into Solr")
@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value", description="These parameters will be passed to Solr on the request")
public class PutSolrRecord
extends SolrProcessor {
    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor.Builder().name("Solr Update Path").displayName("Solr Update Path").description("The path in Solr to post the Flowfile Records").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("/update").build();
    public static final PropertyDescriptor FIELDS_TO_INDEX = new PropertyDescriptor.Builder().name("Fields To Index").displayName("Fields To Index").displayName("Fields To Index").description("Comma-separated list of field names to write").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor.Builder().name("Commit Within").displayName("Commit Within").description("The number of milliseconds before the given update is committed").required(false).addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("5000").build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").displayName("Batch Size").description("The number of solr documents to index per batch").required(false).addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("500").build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The original FlowFile").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that failed for any reason other than Solr being unreachable").build();
    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder().name("connection_failure").description("FlowFiles that failed because Solr is unreachable").build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("put-solr-record-record-reader").displayName("put-solr-record-record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    public static final String COLLECTION_PARAM_NAME = "collection";
    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    private Set<Relationship> relationships;
    private List<PropertyDescriptor> descriptors;
    private static final String EMPTY_STRING = "";

    protected void init(ProcessorInitializationContext context) {
        super.init(context);
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(SolrUtils.SOLR_TYPE);
        descriptors.add(SolrUtils.SOLR_LOCATION);
        descriptors.add(SolrUtils.COLLECTION);
        descriptors.add(UPDATE_PATH);
        descriptors.add(RECORD_READER);
        descriptors.add(FIELDS_TO_INDEX);
        descriptors.add(COMMIT_WITHIN);
        descriptors.add(SolrUtils.KERBEROS_CREDENTIALS_SERVICE);
        descriptors.add(SolrUtils.BASIC_USERNAME);
        descriptors.add(SolrUtils.BASIC_PASSWORD);
        descriptors.add(SolrUtils.SSL_CONTEXT_SERVICE);
        descriptors.add(SolrUtils.SOLR_SOCKET_TIMEOUT);
        descriptors.add(SolrUtils.SOLR_CONNECTION_TIMEOUT);
        descriptors.add(SolrUtils.SOLR_MAX_CONNECTIONS);
        descriptors.add(SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST);
        descriptors.add(SolrUtils.ZK_CLIENT_TIMEOUT);
        descriptors.add(SolrUtils.ZK_CONNECTION_TIMEOUT);
        descriptors.add(BATCH_SIZE);
        this.descriptors = Collections.unmodifiableList(descriptors);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_CONNECTION_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter").name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    }

    @Override
    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        AtomicReference<Object> error = new AtomicReference<Object>(null);
        AtomicReference<Object> connectionError = new AtomicReference<Object>(null);
        boolean isSolrCloud = SolrUtils.SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SolrUtils.SOLR_TYPE).getValue());
        String collection = context.getProperty(SolrUtils.COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
        Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
        String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
        MultiMapSolrParams requestParams = new MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile));
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).evaluateAttributeExpressions(flowFile).getValue();
        Long batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asLong();
        ArrayList<String> fieldList = new ArrayList<String>();
        if (!StringUtils.isBlank((String)fieldsToIndex)) {
            Arrays.stream(fieldsToIndex.split(",")).forEach(f -> fieldList.add(f.trim()));
        }
        StopWatch timer = new StopWatch(true);
        try (InputStream in = session.read(flowFile);
             RecordReader reader = readerFactory.createRecordReader(flowFile, in, this.getLogger());){
            AbstractList inputDocumentList = new LinkedList<SolrInputDocument>();
            try {
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    SolrInputDocument inputDoc = new SolrInputDocument(new String[0]);
                    SolrUtils.writeRecord(record, inputDoc, fieldList, EMPTY_STRING);
                    inputDocumentList.add(inputDoc);
                    if ((long)inputDocumentList.size() == batchSize) {
                        this.index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
                        inputDocumentList = new ArrayList();
                    }
                    this.index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
                }
            }
            catch (SolrException e) {
                error.set(e);
            }
            catch (SolrServerException e) {
                if (this.causedByIOException(e)) {
                    connectionError.set(e);
                } else {
                    error.set(e);
                }
            }
            catch (IOException e) {
                connectionError.set(e);
            }
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            this.getLogger().error("Could not parse incoming data", e);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        timer.stop();
        if (error.get() != null) {
            this.getLogger().error("Failed to send all the records of the {} to Solr due to {}; routing to failure", new Object[]{flowFile, error.get()});
            session.transfer(flowFile, REL_FAILURE);
        } else if (connectionError.get() != null) {
            this.getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure", new Object[]{flowFile, connectionError.get()});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_CONNECTION_FAILURE);
        } else {
            StringBuilder transitUri = new StringBuilder("solr://");
            transitUri.append(this.getSolrLocation());
            if (isSolrCloud) {
                transitUri.append(":").append(collection);
            }
            long duration = timer.getDuration(TimeUnit.MILLISECONDS);
            session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true);
            this.getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration});
            session.transfer(flowFile, REL_SUCCESS);
        }
    }

    private void index(boolean isSolrCloud, String collection, Long commitWithin, String contentStreamPath, MultiMapSolrParams requestParams, List<SolrInputDocument> inputDocumentList) throws IOException, SolrServerException, SolrException {
        UpdateRequest request = new UpdateRequest(contentStreamPath);
        request.setParams(new ModifiableSolrParams());
        Iterator paramNames = requestParams.getParameterNamesIterator();
        while (paramNames.hasNext()) {
            String paramName = (String)paramNames.next();
            for (String paramValue : requestParams.getParams(paramName)) {
                request.getParams().add(paramName, new String[]{paramValue});
            }
        }
        if (isSolrCloud) {
            request.setParam(COLLECTION_PARAM_NAME, collection);
        }
        if (commitWithin != null && commitWithin > 0L) {
            request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString());
        }
        if (this.isBasicAuthEnabled()) {
            request.setBasicAuthCredentials(this.getUsername(), this.getPassword());
        }
        request.add(inputDocumentList);
        UpdateResponse response = (UpdateResponse)request.process(this.getSolrClient());
        this.getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()});
        inputDocumentList.clear();
    }

    private boolean causedByIOException(SolrServerException e) {
        boolean foundIOException = false;
        for (Throwable cause = e.getCause(); cause != null; cause = cause.getCause()) {
            if (!(cause instanceof IOException)) continue;
            foundIOException = true;
            break;
        }
        return foundIOException;
    }
}

