/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

@EventDriven
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@SideEffectFree
@Tags(value={"split", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
@WritesAttributes(value={@WritesAttribute(attribute="mime.type", description="Sets the mime.type attribute to the MIME Type specified by the Record Writer for the FlowFiles routed to the 'splits' Relationship."), @WritesAttribute(attribute="record.count", description="The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship."), @WritesAttribute(attribute="fragment.identifier", description="All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), @WritesAttribute(attribute="fragment.index", description="A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), @WritesAttribute(attribute="fragment.count", description="The number of split FlowFiles generated from the parent FlowFile"), @WritesAttribute(attribute="segment.original.filename ", description="The filename of the parent FlowFile")})
@CapabilityDescription(value="Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles")
public class SplitRecord
extends AbstractProcessor {
    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    static final PropertyDescriptor RECORDS_PER_SPLIT = new PropertyDescriptor.Builder().name("Records Per Split").description("Specifies how many records should be written to each 'split' or 'segment' FlowFile").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    static final Relationship REL_SPLITS = new Relationship.Builder().name("splits").description("The individual 'segments' of the original FlowFile will be routed to this relationship.").build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship.").build();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.add(RECORDS_PER_SPLIT);
        return properties;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SPLITS);
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_FAILURE);
        return relationships;
    }

    public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }
        final RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final int maxRecords = context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger();
        final ArrayList splits = new ArrayList();
        final Map originalAttributes = original.getAttributes();
        final String fragmentId = UUID.randomUUID().toString();
        try {
            session.read(original, new InputStreamCallback(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void process(InputStream in) throws IOException {
                    try (RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, SplitRecord.this.getLogger());){
                        RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
                        RecordSet recordSet = reader.createRecordSet();
                        PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
                        int fragmentIndex = 0;
                        while (pushbackSet.isAnotherRecord()) {
                            FlowFile split = session.create(original);
                            try {
                                HashMap<String, String> attributes = new HashMap<String, String>();
                                try (OutputStream out = session.write(split);
                                     RecordSetWriter writer = writerFactory.createWriter(SplitRecord.this.getLogger(), schema, out);){
                                    WriteResult writeResult;
                                    if (maxRecords == 1) {
                                        Record record = pushbackSet.next();
                                        writeResult = writer.write(record);
                                    } else {
                                        RecordSet limitedSet = pushbackSet.limit(maxRecords);
                                        writeResult = writer.write(limitedSet);
                                    }
                                    attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                                    attributes.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                                    attributes.put(FRAGMENT_ID, fragmentId);
                                    attributes.put(SEGMENT_ORIGINAL_FILENAME, original.getAttribute(CoreAttributes.FILENAME.key()));
                                    attributes.putAll(writeResult.getAttributes());
                                    session.adjustCounter("Records Split", (long)writeResult.getRecordCount(), false);
                                }
                                split = session.putAllAttributes(split, attributes);
                            }
                            finally {
                                splits.add(split);
                            }
                            ++fragmentIndex;
                        }
                    }
                    catch (SchemaNotFoundException | MalformedRecordException e) {
                        throw new ProcessException("Failed to parse incoming data", e);
                    }
                }
            });
        }
        catch (ProcessException pe) {
            this.getLogger().error("Failed to split {}", new Object[]{original, pe});
            session.remove(splits);
            session.transfer(original, REL_FAILURE);
            return;
        }
        session.transfer(original, REL_ORIGINAL);
        for (FlowFile split : splits) {
            session.putAttribute(split, FRAGMENT_COUNT, String.valueOf(splits.size()));
        }
        session.transfer(splits, REL_SPLITS);
        this.getLogger().info("Successfully split {} into {} FlowFiles, each containing up to {} records", new Object[]{original, splits.size(), maxRecords});
    }
}

