/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;

@SideEffectFree
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"fork", "record", "content", "array", "stream", "event"})
@CapabilityDescription(value="This processor allows the user to fork a record into multiple records. The user must specify at least one Record Path, as a dynamic property, pointing to a field of type ARRAY containing RECORD objects. The processor accepts two modes: 'split' and 'extract'. In both modes, there is one record generated per element contained in the designated array. In the 'split' mode, each generated record will preserve the same schema as given in the input but the array will contain only one element. In the 'extract' mode, the element of the array must be of record type and will be the generated record. Additionally, in the 'extract' mode, it is possible to specify if each generated record should contain all the fields of the parent records from the root level to the extracted record. This assumes that the fields to add in the record are defined in the schema of the Record Writer controller service. See examples in the additional details documentation of this processor.")
@WritesAttributes(value={@WritesAttribute(attribute="record.count", description="The generated FlowFile will have a 'record.count' attribute indicating the number of records that were written to the FlowFile."), @WritesAttribute(attribute="mime.type", description="The MIME Type indicated by the Record Writer"), @WritesAttribute(attribute="<Attributes from Record Writer>", description="Any Attribute that the configured Record Writer returns will be added to the FlowFile.")})
public class ForkRecord
extends AbstractProcessor {
    private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
    static final AllowableValue MODE_EXTRACT = new AllowableValue("extract", "Extract", "Generated records will be the elements of the array");
    static final AllowableValue MODE_SPLIT = new AllowableValue("split", "Split", "Generated records will preserve the input schema and will contain a one-element array");
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder().name("fork-mode").displayName("Mode").description("Specifies the forking mode of the processor").allowableValues(new AllowableValue[]{MODE_EXTRACT, MODE_SPLIT}).defaultValue(MODE_SPLIT.getValue()).required(true).build();
    public static final PropertyDescriptor INCLUDE_PARENT_FIELDS = new PropertyDescriptor.Builder().name("include-parent-fields").displayName("Include Parent Fields").description("This parameter is only valid with the 'extract' mode. If set to true, all the fields from the root level to the given array will be added as fields of each element of the array to fork.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    public static final Relationship REL_FORK = new Relationship.Builder().name("fork").description("The FlowFiles containing the forked records will be routed to this relationship").build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFiles will be routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("In case a FlowFile generates an error during the fork operation, it will be routed to this relationship").build();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.add(MODE);
        properties.add(INCLUDE_PARENT_FIELDS);
        return properties;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_FAILURE);
        relationships.add(REL_FORK);
        return relationships;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().name(propertyDescriptorName).required(false).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dynamic(true).build();
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>(super.customValidate(validationContext));
        RecordPathValidator validator = new RecordPathValidator();
        Map processorProperties = validationContext.getProperties();
        for (Map.Entry entry : processorProperties.entrySet()) {
            String dynamicValue;
            PropertyDescriptor property = (PropertyDescriptor)entry.getKey();
            if (!property.isDynamic() || !property.isExpressionLanguageSupported() || validationContext.isExpressionLanguagePresent(dynamicValue = validationContext.getProperty(property).getValue())) continue;
            results.add(validator.validate(property.getDisplayName(), dynamicValue, validationContext));
        }
        return results;
    }

    public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final ArrayList<RecordPath> recordPaths = new ArrayList<RecordPath>();
        Map processorProperties = context.getProperties();
        for (Map.Entry entry : processorProperties.entrySet()) {
            String path;
            PropertyDescriptor property = (PropertyDescriptor)entry.getKey();
            if (!property.isDynamic() || !property.isExpressionLanguageSupported() || !StringUtils.isNotBlank((CharSequence)(path = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue()))) continue;
            recordPaths.add(this.recordPathCache.getCompiled(context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue()));
        }
        final RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final boolean addParentFields = context.getProperty(INCLUDE_PARENT_FIELDS).asBoolean();
        final boolean isSplitMode = context.getProperty(MODE).getValue().equals(MODE_SPLIT.getValue());
        final FlowFile original = flowFile;
        final Map originalAttributes = original.getAttributes();
        final FlowFile outFlowFile = session.create(original);
        final AtomicInteger readCount = new AtomicInteger(0);
        final AtomicInteger writeCount = new AtomicInteger(0);
        try {
            session.read(flowFile, new InputStreamCallback(){

                public void process(InputStream in) throws IOException {
                    try (RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), ForkRecord.this.getLogger());){
                        RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
                        OutputStream out = session.write(outFlowFile);
                        try (RecordSetWriter recordSetWriter = writerFactory.createWriter(ForkRecord.this.getLogger(), writeSchema, out, outFlowFile);){
                            Record record;
                            recordSetWriter.beginRecordSet();
                            while ((record = reader.nextRecord()) != null) {
                                readCount.incrementAndGet();
                                for (RecordPath recordPath : recordPaths) {
                                    Iterator it = recordPath.evaluate(record).getSelectedFields().iterator();
                                    while (it.hasNext()) {
                                        Object[] records;
                                        FieldValue fieldValue = (FieldValue)it.next();
                                        RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
                                        if (fieldType != RecordFieldType.ARRAY) {
                                            ForkRecord.this.getLogger().debug("The record path " + recordPath.getPath() + " is matching a field of type " + fieldType + " when the type ARRAY is expected.");
                                            continue;
                                        }
                                        if (isSplitMode) {
                                            Object[] items;
                                            for (Object item : items = (Object[])fieldValue.getValue()) {
                                                fieldValue.updateValue((Object)new Object[]{item});
                                                recordSetWriter.write(record);
                                            }
                                            continue;
                                        }
                                        ArrayDataType arrayDataType = (ArrayDataType)fieldValue.getField().getDataType();
                                        DataType elementType = arrayDataType.getElementType();
                                        if (elementType.getFieldType() != RecordFieldType.RECORD) {
                                            ForkRecord.this.getLogger().debug("The record path " + recordPath.getPath() + " is matching an array field with values of type " + elementType.getFieldType() + " when the type RECORD is expected.");
                                            continue;
                                        }
                                        for (Object elementRecord : records = (Object[])fieldValue.getValue()) {
                                            if (elementRecord == null) continue;
                                            Record recordToWrite = (Record)elementRecord;
                                            if (addParentFields) {
                                                recordToWrite.incorporateSchema(writeSchema);
                                                this.recursivelyAddParentFields(recordToWrite, fieldValue);
                                            }
                                            recordSetWriter.write(recordToWrite);
                                        }
                                    }
                                }
                            }
                            WriteResult writeResult = recordSetWriter.finishRecordSet();
                            try {
                                recordSetWriter.close();
                            }
                            catch (IOException ioe) {
                                ForkRecord.this.getLogger().warn("Failed to close Writer for {}", new Object[]{outFlowFile});
                            }
                            HashMap<String, String> attributes = new HashMap<String, String>();
                            writeCount.set(writeResult.getRecordCount());
                            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                            attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
                            attributes.putAll(writeResult.getAttributes());
                            session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK);
                        }
                    }
                    catch (SchemaNotFoundException | MalformedRecordException e) {
                        throw new ProcessException("Could not parse incoming data: " + e.getLocalizedMessage(), e);
                    }
                }

                private void recursivelyAddParentFields(Record recordToWrite, FieldValue fieldValue) {
                    try {
                        FieldValue parentField = (FieldValue)fieldValue.getParent().get();
                        Record parentRecord = (Record)fieldValue.getParentRecord().get();
                        for (String field : parentRecord.getSchema().getFieldNames()) {
                            if (recordToWrite.getValue(field) != null) continue;
                            recordToWrite.setValue(field, parentRecord.getValue(field));
                        }
                        this.recursivelyAddParentFields(recordToWrite, parentField);
                    }
                    catch (NoSuchElementException e) {
                        return;
                    }
                }
            });
        }
        catch (Exception e) {
            this.getLogger().error("Failed to fork {}", new Object[]{flowFile, e});
            session.remove(outFlowFile);
            session.transfer(original, REL_FAILURE);
            return;
        }
        session.adjustCounter("Records Processed", (long)readCount.get(), false);
        session.adjustCounter("Records Generated", (long)writeCount.get(), false);
        this.getLogger().debug("Successfully forked {} records into {} records in {}", new Object[]{readCount.get(), writeCount.get(), flowFile});
        session.transfer(original, REL_ORIGINAL);
    }
}

