package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JmsProperties;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

/**
 * Processor that forks the records of an incoming FlowFile into multiple records.
 *
 * <p>Each dynamic property holds a Record Path that is expected to select a field of type ARRAY.
 * One output record is produced per array element:
 * <ul>
 *   <li><b>split</b> mode rewrites the selected array so it holds a single element and writes the
 *       enclosing input record once per element;</li>
 *   <li><b>extract</b> mode writes each (non-null) array element, which must itself be a record,
 *       optionally enriched with the fields of its parent records.</li>
 * </ul>
 *
 * <p>All forked records are written to one new FlowFile routed to {@link #REL_FORK}; the incoming
 * FlowFile goes to {@link #REL_ORIGINAL} on success and to {@link #REL_FAILURE} on error.
 */
@CapabilityDescription("This processor allows the user to fork a record into multiple records. The user must specify at least one Record Path, as a dynamic property, pointing to a field of type ARRAY containing RECORD objects. The processor accepts two modes: 'split' and 'extract'. In both modes, there is one record generated per element contained in the designated array. In the 'split' mode, each generated record will preserve the same schema as given in the input but the array will contain only one element. In the 'extract' mode, the element of the array must be of record type and will be the generated record. Additionally, in the 'extract' mode, it is possible to specify if each generated record should contain all the fields of the parent records from the root level to the extracted record. This assumes that the fields to add in the record are defined in the schema of the Record Writer controller service. See examples in the additional details documentation of this processor.")
@DynamicProperty(name = "Record Path property", value = "The Record Path value", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "A Record Path value, pointing to a field of type ARRAY containing RECORD objects")
@SupportsBatching
@WritesAttributes({
        @WritesAttribute(attribute = ListenUDPRecord.RECORD_COUNT_ATTR, description = "The generated FlowFile will have a 'record.count' attribute indicating the number of records that were written to the FlowFile."),
        @WritesAttribute(attribute = "mime.type", description = "The MIME Type indicated by the Record Writer"),
        @WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"fork", "record", "content", "array", JmsProperties.MSG_TYPE_STREAM, "event"})
@SideEffectFree
public class ForkRecord extends AbstractProcessor {

    /** Mode in which every element of the selected array becomes an output record. */
    static final AllowableValue MODE_EXTRACT = new AllowableValue("extract", "Extract", "Generated records will be the elements of the array");

    /** Mode in which the input record is emitted once per array element, with a one-element array. */
    static final AllowableValue MODE_SPLIT = new AllowableValue("split", "Split", "Generated records will preserve the input schema and will contain a one-element array");

    /** Controller service used to parse the incoming FlowFile content into records. */
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .description("Specifies the Controller Service to use for reading incoming data")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    /** Controller service used to serialize the forked records. */
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("record-writer")
            .displayName("Record Writer")
            .description("Specifies the Controller Service to use for writing out the records")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

    /** Selects between {@link #MODE_EXTRACT} and {@link #MODE_SPLIT}; defaults to split. */
    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
            .name("fork-mode")
            .displayName("Mode")
            .description("Specifies the forking mode of the processor")
            .allowableValues(MODE_EXTRACT, MODE_SPLIT)
            .defaultValue(MODE_SPLIT.getValue())
            .required(true)
            .build();

    /** In extract mode, whether parent-record fields are copied into each extracted record. */
    public static final PropertyDescriptor INCLUDE_PARENT_FIELDS = new PropertyDescriptor.Builder()
            .name("include-parent-fields")
            .displayName("Include Parent Fields")
            .description("This parameter is only valid with the 'extract' mode. If set to true, all the fields from the root level to the given array will be added as fields of each element of the array to fork.")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .build();

    /** Destination of the FlowFile holding the forked records. */
    public static final Relationship REL_FORK = new Relationship.Builder()
            .name("fork")
            .description("The FlowFiles containing the forked records will be routed to this relationship")
            .build();

    /** Destination of the untouched incoming FlowFile after a successful fork. */
    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("The original FlowFiles will be routed to this relationship")
            .build();

    /** Destination of the incoming FlowFile when the fork operation fails. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("In case a FlowFile generates an error during the fork operation, it will be routed to this relationship")
            .build();

    /** Cache of compiled record paths (constructed with a size argument of 25); never reassigned. */
    private final RecordPathCache recordPathCache = new RecordPathCache(25);

    /**
     * Lists the static properties of this processor.
     *
     * @return a new mutable list holding reader, writer, mode and include-parent-fields descriptors
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(RECORD_READER);
        descriptors.add(RECORD_WRITER);
        descriptors.add(MODE);
        descriptors.add(INCLUDE_PARENT_FIELDS);
        return descriptors;
    }

    /**
     * Lists the relationships this processor can route to.
     *
     * @return a new mutable set holding the original, failure and fork relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_FAILURE);
        relationships.add(REL_FORK);
        return relationships;
    }

    /**
     * Builds the descriptor used for any user-defined (dynamic) property; its value is a Record
     * Path that may contain Expression Language evaluated against FlowFile attributes.
     *
     * @param str the dynamic property name
     * @return an optional, dynamic descriptor named {@code str}
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder()
                .name(str)
                .required(false)
                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .dynamic(true)
                .build();
    }

    /**
     * Validates every dynamic property whose value contains no Expression Language as a Record
     * Path. Values containing Expression Language are skipped here because they can only be
     * resolved against a FlowFile at run time.
     *
     * @param validationContext the context giving access to the configured properties
     * @return the parent class's validation results followed by one result per checked property
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
        final RecordPathValidator recordPathValidator = new RecordPathValidator();

        for (final PropertyDescriptor descriptor : validationContext.getProperties().keySet()) {
            if (!descriptor.isDynamic() || !descriptor.isExpressionLanguageSupported()) {
                continue;
            }
            final String rawValue = validationContext.getProperty(descriptor).getValue();
            if (!validationContext.isExpressionLanguagePresent(rawValue)) {
                results.add(recordPathValidator.validate(descriptor.getDisplayName(), rawValue, validationContext));
            }
        }
        return results;
    }

    /**
     * Forks the records of one incoming FlowFile.
     *
     * <p>Compiles the Record Paths given as dynamic properties, reads every record of the FlowFile,
     * writes the forked records into a child FlowFile routed to {@link #REL_FORK} and routes the
     * incoming FlowFile to {@link #REL_ORIGINAL}. If anything fails while reading or writing, the
     * child FlowFile is removed and the incoming FlowFile is routed to {@link #REL_FAILURE}.
     *
     * @param processContext gives access to the configured properties and controller services
     * @param processSession the session used to read, create and transfer FlowFiles
     * @throws ProcessException declared by the overridden method
     */
    @Override
    public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }

        final List<RecordPath> recordPaths = compileRecordPaths(processContext, flowFile);
        final RecordReaderFactory readerFactory = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final boolean includeParents = processContext.getProperty(INCLUDE_PARENT_FIELDS).asBoolean();
        final boolean splitMode = MODE_SPLIT.getValue().equals(processContext.getProperty(MODE).getValue());
        final Map<String, String> originalAttributes = flowFile.getAttributes();

        final FlowFile forkFlowFile = processSession.create(flowFile);
        final AtomicInteger readCount = new AtomicInteger(0);
        final AtomicInteger writeCount = new AtomicInteger(0);

        try {
            processSession.read(flowFile, new InputStreamCallback() {
                @Override
                public void process(final InputStream inputStream) throws IOException {
                    // try-with-resources closes writer then reader, and keeps the primary
                    // exception when a close() fails (the close failure is added as suppressed).
                    try (RecordReader reader = readerFactory.createRecordReader(originalAttributes, inputStream, flowFile.getSize(), getLogger())) {
                        final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());

                        try (RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, processSession.write(forkFlowFile), forkFlowFile)) {
                            writer.beginRecordSet();

                            Record record;
                            while ((record = reader.nextRecord()) != null) {
                                readCount.incrementAndGet();
                                for (final RecordPath recordPath : recordPaths) {
                                    // Explicit iterator: equivalent to for-each over an Iterable and
                                    // also valid should getSelectedFields() return a Stream.
                                    final Iterator<FieldValue> selected = recordPath.evaluate(record).getSelectedFields().iterator();
                                    while (selected.hasNext()) {
                                        forkField(recordPath, selected.next(), record, writer, writeSchema, splitMode, includeParents);
                                    }
                                }
                            }

                            final WriteResult writeResult = writer.finishRecordSet();

                            // The writer is closed explicitly before the fork FlowFile is updated and
                            // transferred; presumably the session needs the output stream closed
                            // first (TODO confirm). The enclosing try-with-resources closes it again.
                            try {
                                writer.close();
                            } catch (final IOException e) {
                                getLogger().warn("Failed to close Writer for {}", new Object[]{forkFlowFile, e});
                            }

                            final Map<String, String> forkAttributes = new HashMap<>();
                            writeCount.set(writeResult.getRecordCount());
                            forkAttributes.put(ListenUDPRecord.RECORD_COUNT_ATTR, String.valueOf(writeResult.getRecordCount()));
                            forkAttributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                            forkAttributes.putAll(writeResult.getAttributes());
                            processSession.transfer(processSession.putAllAttributes(forkFlowFile, forkAttributes), REL_FORK);
                        }
                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
                        throw new ProcessException("Could not parse incoming data: " + e.getLocalizedMessage(), e);
                    }
                }
            });

            processSession.adjustCounter("Records Processed", readCount.get(), false);
            processSession.adjustCounter("Records Generated", writeCount.get(), false);
            getLogger().debug("Successfully forked {} records into {} records in {}", new Object[]{readCount.get(), writeCount.get(), flowFile});
            processSession.transfer(flowFile, REL_ORIGINAL);
        } catch (final Exception e) {
            // Processor boundary: any failure discards the partial fork and routes the input to failure.
            getLogger().error("Failed to fork {}", new Object[]{flowFile, e});
            processSession.remove(forkFlowFile);
            processSession.transfer(flowFile, REL_FAILURE);
        }
    }

    /**
     * Compiles the Record Path of every dynamic, Expression-Language-enabled property whose
     * evaluated value is not blank. Each expression is evaluated exactly once per FlowFile.
     */
    private List<RecordPath> compileRecordPaths(final ProcessContext processContext, final FlowFile flowFile) {
        final List<RecordPath> recordPaths = new ArrayList<>();
        for (final PropertyDescriptor descriptor : processContext.getProperties().keySet()) {
            if (!descriptor.isDynamic() || !descriptor.isExpressionLanguageSupported()) {
                continue;
            }
            final String pathText = processContext.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
            if (StringUtils.isNotBlank(pathText)) {
                recordPaths.add(recordPathCache.getCompiled(pathText));
            }
        }
        return recordPaths;
    }

    /**
     * Writes the records forked from one field selected by a Record Path.
     *
     * <p>Fields that are not arrays (and, in extract mode, arrays whose elements are not records)
     * are skipped with a debug message. A {@code null} array value is skipped silently because
     * there is nothing to fork.
     *
     * @param recordPath     the path that selected {@code fieldValue}; used in log messages only
     * @param fieldValue     the selected field
     * @param record         the input record the field belongs to
     * @param writer         the destination of the forked records
     * @param writeSchema    the writer schema, incorporated into extracted records when
     *                       {@code includeParents} is set
     * @param splitMode      {@code true} for split mode, {@code false} for extract mode
     * @param includeParents extract mode only: copy parent fields into each extracted record
     * @throws IOException if the writer fails
     */
    private void forkField(final RecordPath recordPath, final FieldValue fieldValue, final Record record,
                           final RecordSetWriter writer, final RecordSchema writeSchema,
                           final boolean splitMode, final boolean includeParents) throws IOException {
        final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
        if (fieldType != RecordFieldType.ARRAY) {
            getLogger().debug("The record path " + recordPath.getPath() + " is matching a field of type " + fieldType + " when the type ARRAY is expected.");
            return;
        }

        if (splitMode) {
            final Object[] elements = (Object[]) fieldValue.getValue();
            if (elements == null) {
                return;
            }
            for (final Object element : elements) {
                // The input record is mutated in place: the array is narrowed to the current
                // element, then the whole record is written.
                fieldValue.updateValue(new Object[]{element});
                writer.write(record);
            }
            return;
        }

        // NOTE(review): getElementType() is called on the field's declared data type exactly as
        // before; confirm against the record API whether a cast to the array data type is needed.
        final DataType elementType = fieldValue.getField().getDataType().getElementType();
        if (elementType.getFieldType() != RecordFieldType.RECORD) {
            getLogger().debug("The record path " + recordPath.getPath() + " is matching an array field with values of type " + elementType.getFieldType() + " when the type RECORD is expected.");
            return;
        }

        final Object[] elements = (Object[]) fieldValue.getValue();
        if (elements == null) {
            return;
        }
        for (final Object element : elements) {
            if (element == null) {
                continue;
            }
            final Record extracted = (Record) element;
            if (includeParents) {
                extracted.incorporateSchema(writeSchema);
                copyMissingParentFields(extracted, fieldValue);
            }
            writer.write(extracted);
        }
    }

    /**
     * Walks from {@code fieldValue} up through its parents and copies into {@code target} every
     * parent-record field for which {@code target} currently has a {@code null} value. Because
     * nearer parents are visited first, their values win over those of more distant ancestors.
     * The walk stops as soon as a level has no parent or no parent record.
     */
    private static void copyMissingParentFields(final Record target, final FieldValue fieldValue) {
        FieldValue current = fieldValue;
        while (current.getParent().isPresent() && current.getParentRecord().isPresent()) {
            final Record parentRecord = current.getParentRecord().get();
            for (final String fieldName : parentRecord.getSchema().getFieldNames()) {
                if (target.getValue(fieldName) == null) {
                    target.setValue(fieldName, parentRecord.getValue(fieldName));
                }
            }
            current = current.getParent().get();
        }
    }
}
