/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;

@Tags(value={"record", "stats", "metrics"})
@CapabilityDescription(value="A processor that can count the number of items in a record set, as well as provide counts based on user-defined criteria on subsets of the record set.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name="Record Path property", value="The Record Path value", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="A Record Path value, pointing to a field to be counted")
@WritesAttributes(value={@WritesAttribute(attribute="record.count", description="A count of the records in the record set in the FlowFile."), @WritesAttribute(attribute="recordStats.<User Defined Property Name>.count", description="A count of the records that contain a value for the user defined property."), @WritesAttribute(attribute="recordStats.<User Defined Property Name>.<value>.count", description="Each value discovered for the user defined property will have its own count attribute. Total number of top N value counts to be added is defined by the limit configuration.")})
public class CalculateRecordStats
extends AbstractProcessor {
    static final String RECORD_COUNT_ATTR = "record.count";
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-stats-reader").displayName("Record Reader").description("A record reader to use for reading the records.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder().name("record-stats-limit").description("Limit the number of individual stats that are returned for each record path to the top N results.").required(true).defaultValue("10").addValidator(StandardValidators.INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("If a flowfile is successfully processed, it goes here.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a flowfile fails to be processed, it goes here.").build();
    private RecordPathCache cache;
    static final Set RELATIONSHIPS;
    static final List<PropertyDescriptor> PROPERTIES;

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().name(propertyDescriptorName).displayName(propertyDescriptorName).dynamic(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    @OnScheduled
    public void onEnabled(ProcessContext context) {
        this.cache = new RecordPathCache(25);
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile input = session.get();
        if (input == null) {
            return;
        }
        try {
            Map<String, RecordPath> paths = this.getRecordPaths(context, input);
            Map<String, String> stats = this.getStats(input, paths, context, session);
            input = session.putAllAttributes(input, stats);
            session.transfer(input, REL_SUCCESS);
        }
        catch (Exception ex) {
            this.getLogger().error("Error processing stats.", (Throwable)ex);
            session.transfer(input, REL_FAILURE);
        }
    }

    protected Map<String, RecordPath> getRecordPaths(ProcessContext context, FlowFile flowFile) {
        return context.getProperties().keySet().stream().filter(p -> p.isDynamic()).collect(Collectors.toMap(e -> e.getName(), e -> {
            String val = context.getProperty(e).evaluateAttributeExpressions(flowFile).getValue();
            return this.cache.getCompiled(val);
        }));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
        try (InputStream is = session.read(flowFile);){
            RecordReaderFactory factory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
            Integer limit = context.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger();
            RecordReader reader = factory.createRecordReader(flowFile, is, this.getLogger());
            Map<String, Integer> retVal = new HashMap<String, Integer>();
            int recordCount = 0;
            ArrayList<String> baseKeys = new ArrayList<String>();
            while (true) {
                Object map;
                Record record;
                if ((record = reader.nextRecord()) != null) {
                    map = paths.entrySet().iterator();
                } else {
                    retVal = this.filterBySize(retVal, limit, baseKeys);
                    retVal.put(RECORD_COUNT_ATTR, recordCount);
                    map = retVal.entrySet().stream().collect(Collectors.toMap(e -> (String)e.getKey(), e -> ((Integer)e.getValue()).toString()));
                    return map;
                }
                while (map.hasNext()) {
                    Map.Entry<String, RecordPath> entry = map.next();
                    RecordPathResult result = entry.getValue().evaluate(record);
                    Optional value = result.getSelectedFields().findFirst();
                    if (!value.isPresent() || ((FieldValue)value.get()).getValue() == null) continue;
                    String approxValue = ((FieldValue)value.get()).getValue().toString();
                    String baseKey = String.format("recordStats.%s", entry.getKey());
                    String key = String.format("%s.%s", baseKey, approxValue);
                    Integer stat = retVal.containsKey(key) ? retVal.get(key) : Integer.valueOf(0);
                    Integer baseStat = retVal.getOrDefault(baseKey, 0);
                    Integer n = stat;
                    Integer n2 = stat = Integer.valueOf(stat + 1);
                    n = baseStat;
                    n2 = baseStat = Integer.valueOf(baseStat + 1);
                    retVal.put(key, stat);
                    retVal.put(baseKey, baseStat);
                    if (baseKeys.contains(baseKey)) continue;
                    baseKeys.add(baseKey);
                }
                ++recordCount;
            }
        }
        catch (Exception e2) {
            this.getLogger().error("Could not read flowfile", (Throwable)e2);
            throw new ProcessException((Throwable)e2);
        }
    }

    protected Map filterBySize(Map<String, Integer> values, Integer limit, List<String> baseKeys) {
        Map<String, Integer> toFilter = values.entrySet().stream().filter(e -> !baseKeys.contains(e.getKey())).collect(Collectors.toMap(e -> (String)e.getKey(), e -> (Integer)e.getValue()));
        Map<String, Integer> retVal = values.entrySet().stream().filter(e -> baseKeys.contains(e.getKey())).collect(Collectors.toMap(e -> (String)e.getKey(), e -> (Integer)e.getValue()));
        ArrayList<Map.Entry<String, Integer>> _flat = new ArrayList<Map.Entry<String, Integer>>(toFilter.entrySet());
        _flat.sort(Map.Entry.comparingByValue());
        Collections.reverse(_flat);
        for (int index = 0; index < _flat.size() && index < limit; ++index) {
            retVal.put((String)((Map.Entry)_flat.get(index)).getKey(), (Integer)((Map.Entry)_flat.get(index)).getValue());
        }
        return retVal;
    }

    static {
        HashSet<Relationship> _rels = new HashSet<Relationship>();
        _rels.add(REL_SUCCESS);
        _rels.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(_rels);
        ArrayList<PropertyDescriptor> _temp = new ArrayList<PropertyDescriptor>();
        _temp.add(RECORD_READER);
        _temp.add(LIMIT);
        PROPERTIES = Collections.unmodifiableList(_temp);
    }
}

