/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.mongodb;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processors.mongodb.AbstractMongoProcessor;
import org.bson.Document;
import org.bson.conversions.Bson;

@Tags(value={"mongo", "aggregation", "aggregate"})
@CapabilityDescription(value="A processor that runs an aggregation query whenever a flowfile is received.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
public class RunMongoAggregation
extends AbstractMongoProcessor {
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;
    static final Relationship REL_ORIGINAL;
    static final Relationship REL_FAILURE;
    static final Relationship REL_RESULTS;
    static final PropertyDescriptor QUERY;

    static final List<Bson> buildAggregationQuery(String query) throws IOException {
        ArrayList<Bson> result = new ArrayList<Bson>();
        ObjectMapper mapper = new ObjectMapper();
        List values = (List)mapper.readValue(query, List.class);
        for (Map val : values) {
            result.add((Bson)new BasicDBObject(val));
        }
        return result;
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    private String buildBatch(List<Document> batch) {
        String retVal;
        try {
            retVal = this.objectMapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
        }
        catch (Exception e) {
            retVal = null;
        }
        return retVal;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = null;
        if (context.hasIncomingConnection() && (flowFile = session.get()) == null && context.hasNonLoopConnection()) {
            return;
        }
        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
        Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
        String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
        String dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions(flowFile).getValue();
        this.configureMapper(jsonTypeSetting, dateFormat);
        HashMap<String, String> attrs = new HashMap<String, String>();
        if (queryAttr != null && queryAttr.trim().length() > 0) {
            attrs.put(queryAttr, query);
        }
        try (MongoCursor iter = null;){
            MongoCollection<Document> collection = this.getCollection(context, flowFile);
            List<Bson> aggQuery = RunMongoAggregation.buildAggregationQuery(query);
            AggregateIterable it = collection.aggregate(aggQuery);
            it.batchSize(batchSize != null ? batchSize : 1);
            iter = it.iterator();
            ArrayList<Object> batch = new ArrayList<Document>();
            while (iter.hasNext()) {
                batch.add((Document)iter.next());
                if (batch.size() != resultsPerFlowfile.intValue()) continue;
                this.writeBatch(this.buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
                batch = new ArrayList();
            }
            if (batch.size() > 0) {
                this.writeBatch(this.buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
            }
            if (flowFile != null) {
                session.transfer(flowFile, REL_ORIGINAL);
            }
        }
    }

    static {
        REL_ORIGINAL = new Relationship.Builder().description("The input flowfile gets sent to this relationship when the query succeeds.").name("original").build();
        REL_FAILURE = new Relationship.Builder().description("The input flowfile gets sent to this relationship when the query fails.").name("failure").build();
        REL_RESULTS = new Relationship.Builder().description("The result set of the aggregation will be sent to this relationship.").name("results").build();
        QUERY = new PropertyDescriptor.Builder().name("mongo-agg-query").displayName("Query").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).description("The aggregation query to be executed.").required(true).addValidator((Validator)JsonValidator.INSTANCE).build();
        ArrayList<PropertyDescriptor> _propertyDescriptors = new ArrayList<PropertyDescriptor>();
        _propertyDescriptors.addAll(descriptors);
        _propertyDescriptors.add(CHARSET);
        _propertyDescriptors.add(QUERY);
        _propertyDescriptors.add(JSON_TYPE);
        _propertyDescriptors.add(QUERY_ATTRIBUTE);
        _propertyDescriptors.add(BATCH_SIZE);
        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
        _propertyDescriptors.add(DATE_FORMAT);
        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
        _propertyDescriptors.add(CLIENT_AUTH);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_RESULTS);
        _relationships.add(REL_ORIGINAL);
        _relationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

