/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.ConvertRecord;
import org.apache.nifi.processors.standard.QueryRecord;
import org.apache.nifi.processors.standard.SplitRecord;
import org.apache.nifi.processors.standard.UpdateRecord;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

@EventDriven
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Splits, or partitions, record-oriented data based on the configured fields in the data. One or more properties must be added. The name of the property is the name of an attribute to add. The value of the property is a RecordPath to evaluate against each Record. Two records will go to the same outbound FlowFile only if they have the same value for each of the given RecordPaths. Because we know that all records in a given output FlowFile have the same value for the fields that are specified by the RecordPath, an attribute is added for each field. See Additional Details on the Usage page for more information and examples.")
@DynamicProperty(name="The name given to the dynamic property is the name of the attribute that will be used to denote the value of the associated RecordPath.", value="A RecordPath that points to a field in the Record.", description="Each dynamic property represents a RecordPath that will be evaluated against each record in an incoming FlowFile. When the value of the RecordPath is determined for a Record, an attribute is added to the outgoing FlowFile. The name of the attribute is the same as the name of this property. The value of the attribute is the same as the value of the field in the Record that the RecordPath points to. Note that no attribute will be added if the value returned for the RecordPath is null or is not a scalar value (i.e., the value is an Array, Map, or Record).", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttributes(value={@WritesAttribute(attribute="record.count", description="The number of records in an outgoing FlowFile"), @WritesAttribute(attribute="mime.type", description="The MIME Type that the configured Record Writer indicates is appropriate"), @WritesAttribute(attribute="fragment.identifier", description="All partitioned FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), @WritesAttribute(attribute="fragment.index", description="A one-up number that indicates the ordering of the partitioned FlowFiles that were created from a single parent FlowFile"), @WritesAttribute(attribute="fragment.count", description="The number of partitioned FlowFiles generated from the parent FlowFile"), @WritesAttribute(attribute="segment.original.filename ", description="The filename of the parent FlowFile"), @WritesAttribute(attribute="<dynamic property name>", description="For each dynamic property that is added, an attribute may be added to the FlowFile. See the description for Dynamic Properties for more information.")})
@Tags(value={"record", "partition", "recordpath", "rpath", "segment", "split", "group", "bin", "organize"})
@SeeAlso(value={ConvertRecord.class, SplitRecord.class, UpdateRecord.class, QueryRecord.class})
public class PartitionRecord
extends AbstractProcessor {
    private final RecordPathCache recordPathCache = new RecordPathCache(25);
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully partitioned will be routed to this relationship").build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("Once all records in an incoming FlowFile have been partitioned, the original FlowFile is routed to this relationship.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile cannot be partitioned from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship").build();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        return properties;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_ORIGINAL);
        return relationships;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        boolean hasDynamic = validationContext.getProperties().keySet().stream().anyMatch(prop -> prop.isDynamic());
        if (hasDynamic) {
            return Collections.emptyList();
        }
        return Collections.singleton(new ValidationResult.Builder().subject("User-defined Properties").valid(false).explanation("At least one RecordPath must be added to this processor by adding a user-defined property").build());
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().name(propertyDescriptorName).dynamic(true).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator((Validator)new RecordPathValidator()).build();
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        Map<String, RecordPath> recordPaths;
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        try {
            recordPaths = context.getProperties().keySet().stream().filter(prop -> prop.isDynamic()).collect(Collectors.toMap(prop -> prop.getName(), prop -> this.getRecordPath(context, (PropertyDescriptor)prop, flowFile)));
        }
        catch (Exception e) {
            this.getLogger().error("Failed to compile RecordPath for {}; routing to failure", new Object[]{flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        HashMap<RecordValueMap, RecordSetWriter> writerMap = new HashMap<RecordValueMap, RecordSetWriter>();
        try (InputStream in = session.read(flowFile);){
            Record record;
            Map originalAttributes = flowFile.getAttributes();
            RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, flowFile.getSize(), this.getLogger());
            RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
            while ((record = reader.nextRecord()) != null) {
                HashMap<String, List<ValueWrapper>> recordMap = new HashMap<String, List<ValueWrapper>>();
                for (Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
                    String propName = entry.getKey();
                    RecordPath recordPath = entry.getValue();
                    Stream fieldValueStream = recordPath.evaluate(record).getSelectedFields();
                    List fieldValues = fieldValueStream.map(fieldVal -> new ValueWrapper(fieldVal.getValue())).collect(Collectors.toList());
                    recordMap.put(propName, fieldValues);
                }
                RecordValueMap recordValueMap = new RecordValueMap(recordMap);
                RecordSetWriter writer = (RecordSetWriter)writerMap.get(recordValueMap);
                if (writer == null) {
                    FlowFile childFlowFile = session.create(flowFile);
                    recordValueMap.setFlowFile(childFlowFile);
                    OutputStream out = session.write(childFlowFile);
                    writer = writerFactory.createWriter(this.getLogger(), writeSchema, out, childFlowFile);
                    writer.beginRecordSet();
                    writerMap.put(recordValueMap, writer);
                }
                writer.write(record);
            }
            int fragmentIndex = 0;
            String fragmentId = UUID.randomUUID().toString();
            for (Map.Entry entry : writerMap.entrySet()) {
                RecordValueMap valueMap = (RecordValueMap)entry.getKey();
                RecordSetWriter writer = (RecordSetWriter)entry.getValue();
                WriteResult writeResult = writer.finishRecordSet();
                writer.close();
                HashMap<String, String> attributes = new HashMap<String, String>();
                attributes.putAll(valueMap.getAttributes());
                attributes.putAll(writeResult.getAttributes());
                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                attributes.put(FragmentAttributes.FRAGMENT_INDEX.key(), String.valueOf(fragmentIndex));
                attributes.put(FragmentAttributes.FRAGMENT_ID.key(), fragmentId);
                attributes.put(FragmentAttributes.FRAGMENT_COUNT.key(), String.valueOf(writerMap.size()));
                attributes.put(FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
                FlowFile childFlowFile = valueMap.getFlowFile();
                childFlowFile = session.putAllAttributes(childFlowFile, attributes);
                session.adjustCounter("Record Processed", (long)writeResult.getRecordCount(), false);
                ++fragmentIndex;
            }
        }
        catch (Exception e) {
            for (Map.Entry entry : writerMap.entrySet()) {
                RecordValueMap valueMap = (RecordValueMap)entry.getKey();
                RecordSetWriter writer = (RecordSetWriter)entry.getValue();
                try {
                    writer.close();
                }
                catch (IOException e1) {
                    this.getLogger().warn("Failed to close Record Writer for {}; some resources may not be cleaned up appropriately", new Object[]{flowFile, e1});
                }
                session.remove(valueMap.getFlowFile());
            }
            this.getLogger().error("Failed to partition {}", new Object[]{flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        for (RecordValueMap valueMap : writerMap.keySet()) {
            session.transfer(valueMap.getFlowFile(), REL_SUCCESS);
        }
        session.transfer(flowFile, REL_ORIGINAL);
    }

    private RecordPath getRecordPath(ProcessContext context, PropertyDescriptor prop, FlowFile flowFile) {
        String pathText = context.getProperty(prop).evaluateAttributeExpressions(flowFile).getValue();
        RecordPath recordPath = this.recordPathCache.getCompiled(pathText);
        return recordPath;
    }

    private static class RecordValueMap {
        private final Map<String, List<ValueWrapper>> values;
        private FlowFile flowFile;

        public RecordValueMap(Map<String, List<ValueWrapper>> values) {
            this.values = values;
        }

        public Map<String, String> getAttributes() {
            HashMap<String, String> attributes = new HashMap<String, String>();
            for (Map.Entry<String, List<ValueWrapper>> entry : this.values.entrySet()) {
                Object value;
                List<ValueWrapper> values = entry.getValue();
                if (values.size() != 1 || (value = values.get(0).get()) == null || value instanceof Object[] || value instanceof Map || value instanceof Record) continue;
                String attributeValue = DataTypeUtils.toString((Object)value, (String)null);
                attributes.put(entry.getKey(), attributeValue);
            }
            return attributes;
        }

        public FlowFile getFlowFile() {
            return this.flowFile;
        }

        public void setFlowFile(FlowFile flowFile) {
            this.flowFile = flowFile;
        }

        public int hashCode() {
            return 41 + 37 * this.values.hashCode();
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (!(obj instanceof RecordValueMap)) {
                return false;
            }
            RecordValueMap other = (RecordValueMap)obj;
            return this.values.equals(other.values);
        }

        public String toString() {
            return "RecordMapValue[" + this.values + "]";
        }
    }

    static class ValueWrapper {
        private final Object value;

        public ValueWrapper(Object value) {
            this.value = value;
        }

        public Object get() {
            return this.value;
        }

        public int hashCode() {
            if (this.value == null) {
                return 31;
            }
            if (this.value instanceof Object[]) {
                return 31 + Arrays.deepHashCode((Object[])this.value);
            }
            return 31 + this.value.hashCode();
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (!(obj instanceof ValueWrapper)) {
                return false;
            }
            ValueWrapper other = (ValueWrapper)obj;
            if (this.value == null && other.value == null) {
                return true;
            }
            if (this.value == null || other.value == null) {
                return false;
            }
            if (this.value instanceof Object[] && other.value instanceof Object[]) {
                return Arrays.equals((Object[])this.value, (Object[])other.value);
            }
            return this.value.equals(other.value);
        }
    }
}

