/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

public abstract class AbstractRecordProcessor
extends AbstractProcessor {
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder().name("include-zero-record-flowfiles").displayName("Include Zero Record FlowFiles").description("When converting an incoming FlowFile, if the conversion results in no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully transformed will be routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship").build();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        return properties;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        return relationships;
    }

    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).isSet() ? context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean() : true;
        final HashMap attributes = new HashMap();
        final AtomicInteger recordCount = new AtomicInteger();
        final FlowFile original = flowFile;
        final Map originalAttributes = flowFile.getAttributes();
        try {
            flowFile = session.write(flowFile, new StreamCallback(){

                public void process(InputStream in, OutputStream out) throws IOException {
                    try (RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), AbstractRecordProcessor.this.getLogger());){
                        Record firstRecord = reader.nextRecord();
                        if (firstRecord == null) {
                            RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
                            try (RecordSetWriter writer = writerFactory.createWriter(AbstractRecordProcessor.this.getLogger(), writeSchema, out, originalAttributes);){
                                writer.beginRecordSet();
                                WriteResult writeResult = writer.finishRecordSet();
                                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                                attributes.putAll(writeResult.getAttributes());
                                recordCount.set(writeResult.getRecordCount());
                            }
                            return;
                        }
                        firstRecord = AbstractRecordProcessor.this.process(firstRecord, original, context, 1L);
                        RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.getSchema());
                        try (RecordSetWriter writer = writerFactory.createWriter(AbstractRecordProcessor.this.getLogger(), writeSchema, out, originalAttributes);){
                            Record record;
                            writer.beginRecordSet();
                            writer.write(firstRecord);
                            long count = 1L;
                            while ((record = reader.nextRecord()) != null) {
                                Record processed = AbstractRecordProcessor.this.process(record, original, context, ++count);
                                writer.write(processed);
                            }
                            WriteResult writeResult = writer.finishRecordSet();
                            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                            attributes.putAll(writeResult.getAttributes());
                            recordCount.set(writeResult.getRecordCount());
                        }
                    }
                    catch (SchemaNotFoundException e) {
                        throw new ProcessException(e.getLocalizedMessage(), (Throwable)e);
                    }
                    catch (MalformedRecordException e) {
                        throw new ProcessException("Could not parse incoming data", (Throwable)e);
                    }
                }
            });
        }
        catch (Exception e) {
            this.getLogger().error("Failed to process {}; will route to failure", new Object[]{flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        flowFile = session.putAllAttributes(flowFile, attributes);
        if (!includeZeroRecordFlowFiles && recordCount.get() == 0) {
            session.remove(flowFile);
        } else {
            session.transfer(flowFile, REL_SUCCESS);
        }
        int count = recordCount.get();
        session.adjustCounter("Records Processed", (long)count, false);
        this.getLogger().info("Successfully converted {} records for {}", new Object[]{count, flowFile});
    }

    protected abstract Record process(Record var1, FlowFile var2, ProcessContext var3, long var4);
}

