/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;

/**
 * Base class for processors that read every Record of an incoming FlowFile, ask the subclass which
 * Relationships each Record belongs to (see {@link #route}), and write the Records routed to each
 * Relationship into one child FlowFile per Relationship.
 *
 * @param <T> type of the per-FlowFile context object created by {@link #getFlowFileContext} and
 *            passed back to {@link #route} for every Record of that FlowFile
 */
public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .description("Specifies the Controller Service to use for reading incoming data")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("record-writer")
            .displayName("Record Writer")
            .description("Specifies the Controller Service to use for writing out the records")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship")
            .build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("Once a FlowFile has been processed and any derivative FlowFiles have been transferred, the original FlowFile will be transferred to this relationship.")
            .build();

    /**
     * Returns the properties common to all record-routing processors.
     *
     * @return a new mutable list holding the Record Reader and Record Writer descriptors
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        return properties;
    }

    /**
     * Returns the relationships handled by this base class.
     *
     * @return a new mutable set containing {@code failure}, plus {@code original} when
     *         {@link #isRouteOriginal()} is {@code true}
     */
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        if (isRouteOriginal()) {
            relationships.add(REL_ORIGINAL);
        }
        relationships.add(REL_FAILURE);
        return relationships;
    }

    /**
     * Pulls one FlowFile from the session, routes each of its Records via {@link #route} and writes
     * the Records into one child FlowFile per Relationship. On any {@link Exception} the child
     * FlowFiles are removed and the incoming FlowFile is transferred to {@code failure}. On success
     * the incoming FlowFile is transferred to {@code original} or removed, depending on
     * {@link #isRouteOriginal()}.
     *
     * @param context the process context used to look up the reader and writer services
     * @param session the session the FlowFile is taken from and child FlowFiles are created in
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final T flowFileContext;
        try {
            flowFileContext = getFlowFileContext(flowFile, context);
        } catch (final Exception e) {
            getLogger().error("Failed to process {}; routing to failure", new Object[]{flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final AtomicInteger numRecords = new AtomicInteger(0);
        final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
        final FlowFile original = flowFile;
        final Map<String, String> originalAttributes = original.getAttributes();

        try {
            session.read(flowFile, new InputStreamCallback() {

                public void process(final InputStream in) throws IOException {
                    try (RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), AbstractRouteRecord.this.getLogger())) {
                        Record record = reader.nextRecord();
                        if (record == null) {
                            AbstractRouteRecord.this.getLogger().info("{} has no Records, so routing just the original FlowFile to 'original'", new Object[]{original});
                            return;
                        }

                        // The schema handed to route() is derived once, from the first Record.
                        final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, record.getSchema());
                        do {
                            final Set<Relationship> relationships = AbstractRouteRecord.this.route(record, writeSchema, original, context, flowFileContext);
                            // Count every Record, including the first one.
                            numRecords.incrementAndGet();
                            for (final Relationship relationship : relationships) {
                                AbstractRouteRecord.this.writeRecord(record, relationship, writers, session, original, originalAttributes, writerFactory);
                            }
                        } while ((record = reader.nextRecord()) != null);
                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
                        throw new ProcessException("Could not parse incoming data", e);
                    }
                }
            });

            for (final Map.Entry<Relationship, Tuple<FlowFile, RecordSetWriter>> entry : writers.entrySet()) {
                final Relationship relationship = entry.getKey();
                final RecordSetWriter writer = entry.getValue().getValue();
                FlowFile childFlowFile = entry.getValue().getKey();

                final WriteResult writeResult = writer.finishRecordSet();
                try {
                    writer.close();
                } catch (final IOException ioe) {
                    getLogger().warn("Failed to close Writer for {}", new Object[]{childFlowFile, ioe});
                }

                final Map<String, String> attributes = new HashMap<>();
                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                attributes.putAll(writeResult.getAttributes());

                childFlowFile = session.putAllAttributes(childFlowFile, attributes);
                session.transfer(childFlowFile, relationship);
                session.adjustCounter("Records Processed", (long) writeResult.getRecordCount(), false);
                session.adjustCounter("Records Routed to " + relationship.getName(), (long) writeResult.getRecordCount(), false);
                session.getProvenanceReporter().route(childFlowFile, relationship);
            }
        } catch (final Exception e) {
            getLogger().error("Failed to process {}", new Object[]{flowFile, e});

            // Each writer is closed before its child FlowFile is removed from the session.
            for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
                try {
                    tuple.getValue().close();
                } catch (final Exception e1) {
                    getLogger().warn("Failed to close Writer for {}; some resources may not be cleaned up appropriately", new Object[]{tuple.getKey(), e1});
                }
                session.remove(tuple.getKey());
            }

            session.transfer(flowFile, REL_FAILURE);
            return;
        } finally {
            // Safety net that also runs when something other than an Exception escapes the try
            // body. Writers may already have been closed above, so close() can be called twice;
            // NOTE(review): this assumes RecordSetWriter.close() tolerates repeated calls.
            closeWriters(writers);
        }

        if (isRouteOriginal()) {
            flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(numRecords.get()));
            session.transfer(flowFile, REL_ORIGINAL);
        } else {
            session.remove(flowFile);
        }
        getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records", new Object[]{flowFile, writers.size(), numRecords.get()});
    }

    /** Closes every registered writer, logging (not propagating) any failure. */
    private void closeWriters(final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers) {
        for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
            try {
                tuple.getValue().close();
            } catch (final Exception e) {
                getLogger().warn("Failed to close Record Writer for {}; some resources may not be properly cleaned up", new Object[]{tuple.getKey(), e});
            }
        }
    }

    /**
     * Writes {@code record} to the writer registered for {@code relationship}, first creating the
     * child FlowFile and writer if this is the first Record routed to that Relationship.
     *
     * @throws IOException if the writer cannot be created or the Record cannot be written
     * @throws SchemaNotFoundException if the write schema cannot be determined
     */
    private void writeRecord(Record record, Relationship relationship, Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers, ProcessSession session, FlowFile original, Map<String, String> originalAttributes, RecordSetWriterFactory writerFactory) throws IOException, SchemaNotFoundException {
        Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
        if (tuple == null) {
            tuple = openWriter(record, session, original, originalAttributes, writerFactory);
            writers.put(relationship, tuple);
        }
        tuple.getValue().write(record);
    }

    /**
     * Creates a child of {@code original}, opens it for writing and starts a record set on a new
     * writer whose schema is derived from {@code record}. If any step fails, the partially set-up
     * child is closed and removed before the exception is rethrown; it is not yet registered in the
     * writers map, so no other code path would clean it up.
     */
    private Tuple<FlowFile, RecordSetWriter> openWriter(final Record record, final ProcessSession session, final FlowFile original, final Map<String, String> originalAttributes, final RecordSetWriterFactory writerFactory) throws IOException, SchemaNotFoundException {
        final FlowFile outFlowFile = session.create(original);
        OutputStream out = null;
        RecordSetWriter recordSetWriter = null;
        try {
            out = session.write(outFlowFile);
            final RecordSchema recordWriteSchema = writerFactory.getSchema(originalAttributes, record.getSchema());
            recordSetWriter = writerFactory.createWriter(getLogger(), recordWriteSchema, out, outFlowFile);
            recordSetWriter.beginRecordSet();
            return new Tuple<>(outFlowFile, recordSetWriter);
        } catch (final Exception e) {
            discardChild(outFlowFile, recordSetWriter, out, session, e);
            throw e;
        }
    }

    /**
     * Best-effort cleanup of a child FlowFile whose writer could not be set up. Any cleanup failure
     * is attached to {@code cause} as a suppressed exception rather than replacing it.
     */
    private void discardChild(final FlowFile child, final RecordSetWriter writer, final OutputStream out, final ProcessSession session, final Exception cause) {
        try {
            try {
                if (writer != null) {
                    writer.close();
                }
            } finally {
                // Close the raw stream as well, in case the writer was never created or does not
                // close the stream it wraps.
                if (out != null) {
                    out.close();
                }
            }
            session.remove(child);
        } catch (final Exception cleanupFailure) {
            cause.addSuppressed(cleanupFailure);
        }
    }

    /**
     * Decides which Relationships a Record is written to.
     *
     * @param record the Record to route
     * @param writeSchema the schema obtained from the writer factory for the first Record of the FlowFile
     * @param flowFile the incoming FlowFile the Record was read from
     * @param context the process context
     * @param flowFileContext the value returned by {@link #getFlowFileContext} for this FlowFile
     * @return the Relationships to write the Record to; the Record is written once per element
     */
    protected abstract Set<Relationship> route(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context, T flowFileContext);

    /**
     * @return {@code true} if the {@code original} relationship is exposed and the incoming FlowFile
     *         is transferred to it after successful processing; {@code false} if the incoming
     *         FlowFile is removed instead
     */
    protected abstract boolean isRouteOriginal();

    /**
     * Builds the per-FlowFile context passed to every {@link #route} call for that FlowFile. Called
     * once per FlowFile before any Record is read; if it throws, the FlowFile is routed to
     * {@code failure}.
     *
     * @param flowFile the incoming FlowFile
     * @param context the process context
     * @return the context object for this FlowFile
     */
    protected abstract T getFlowFileContext(FlowFile flowFile, ProcessContext context);
}

