package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.bin.Bin;
import org.apache.nifi.processor.util.bin.BinFiles;
import org.apache.nifi.processor.util.bin.BinManager;
import org.apache.nifi.processor.util.bin.BinProcessingResult;
import org.apache.nifi.processor.util.bin.EvictionReason;
import org.apache.nifi.processors.standard.enrichment.EnrichmentRole;
import org.apache.nifi.processors.standard.enrichment.InsertRecordFieldsJoinStrategy;
import org.apache.nifi.processors.standard.enrichment.RecordJoinInput;
import org.apache.nifi.processors.standard.enrichment.RecordJoinResult;
import org.apache.nifi.processors.standard.enrichment.RecordJoinStrategy;
import org.apache.nifi.processors.standard.enrichment.SqlJoinCache;
import org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy;
import org.apache.nifi.processors.standard.enrichment.WrapperJoinStrategy;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.db.JdbcProperties;

@CapabilityDescription("Joins together Records from two different FlowFiles where one FlowFile, the 'original' contains arbitrary records and the second FlowFile, the 'enrichment' contains additional data that should be used to enrich the first. See Additional Details for more information on how to configure this processor and the different use cases that it aims to accomplish.")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "This Processor will load into heap all FlowFiles that are on its incoming queues. While it loads the FlowFiles themselves, and not their content, the FlowFile attributes can be very memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL engine may require buffering the entire contents of the enrichment FlowFile for each concurrent task. See Processor's Additional Details for more details and for steps on how to mitigate these concerns.")
@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")})
@TriggerWhenEmpty
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"fork", "join", "enrichment", "record", "sql", "wrap", "recordpath", "merge", "combine", "streams"})
@SeeAlso({ForkEnrichment.class})
@SideEffectFree
/* loaded from: input_file:org/apache/nifi/processors/standard/JoinEnrichment.class */
public class JoinEnrichment extends BinFiles {
    static final String GROUP_ID_ATTRIBUTE = "enrichment.group.id";
    static final String ENRICHMENT_ROLE_ATTRIBUTE = "enrichment.role";
    static final String RECORD_COUNT_ATTRIBUTE = "record.count";
    static final AllowableValue JOIN_WRAPPER = new AllowableValue("Wrapper", "Wrapper", "The output is a Record that contains two fields: (1) 'original', containing the Record from the original FlowFile and (2) 'enrichment' containing the corresponding Record from the enrichment FlowFile. Records will be correlated based on their index in the FlowFile. If one FlowFile has more Records than the other, a null value will be used.");
    static final AllowableValue JOIN_SQL = new AllowableValue(PutDatabaseRecord.SQL_TYPE, PutDatabaseRecord.SQL_TYPE, "The output is derived by evaluating a SQL SELECT statement that allows for two tables: 'original' and 'enrichment'. This allows for SQL JOIN statements to be used in order to correlate the Records of the two FlowFiles, so the index in which the Record is encountered in the FlowFile does not matter.");
    static final AllowableValue JOIN_INSERT_ENRICHMENT_FIELDS = new AllowableValue("Insert Enrichment Fields", "Insert Enrichment Fields", "The enrichment is joined together with the original FlowFile by placing all fields of the enrichment Record into the corresponding Record from the original FlowFile. Records will be correlated based on their index in the FlowFile.");
    static final PropertyDescriptor ORIGINAL_RECORD_READER = new PropertyDescriptor.Builder().name("Original Record Reader").displayName("Original Record Reader").description("The Record Reader for reading the 'original' FlowFile").required(true).identifiesControllerService(RecordReaderFactory.class).build();
    static final PropertyDescriptor ENRICHMENT_RECORD_READER = new PropertyDescriptor.Builder().name("Enrichment Record Reader").displayName("Enrichment Record Reader").description("The Record Reader for reading the 'enrichment' FlowFile").required(true).identifiesControllerService(RecordReaderFactory.class).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").displayName("Record Writer").description("The Record Writer to use for writing the results. If the Record Writer is configured to inherit the schema from the Record, the schema that it will inherit will be the result of merging both the 'original' record schema and the 'enrichment' record schema.").required(true).identifiesControllerService(RecordSetWriterFactory.class).build();
    static final PropertyDescriptor JOIN_STRATEGY = new PropertyDescriptor.Builder().name("Join Strategy").displayName("Join Strategy").description("Specifies how to join the two FlowFiles into a single FlowFile").required(true).allowableValues(new DescribedValue[]{JOIN_WRAPPER, JOIN_SQL, JOIN_INSERT_ENRICHMENT_FIELDS}).defaultValue(JOIN_WRAPPER.getValue()).build();
    static final PropertyDescriptor SQL = new PropertyDescriptor.Builder().name(PutDatabaseRecord.SQL_TYPE).displayName(PutDatabaseRecord.SQL_TYPE).description("The SQL SELECT statement to evaluate. Expression Language may be provided, but doing so may result in poorer performance. Because this Processor is dealing with two FlowFiles at a time, it's also important to understand how attributes will be referenced. If both FlowFiles have an attribute with the same name but different values, the Expression Language will resolve to the value provided by the 'enrichment' FlowFile.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(JOIN_STRATEGY, new AllowableValue[]{JOIN_SQL}).defaultValue("SELECT original.*, enrichment.* \nFROM original \nLEFT OUTER JOIN enrichment \nON original.id = enrichment.id").build();
    static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder().fromPropertyDescriptor(JdbcProperties.DEFAULT_PRECISION).required(false).dependsOn(JOIN_STRATEGY, new AllowableValue[]{JOIN_SQL}).build();
    static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder().fromPropertyDescriptor(JdbcProperties.DEFAULT_SCALE).required(false).dependsOn(JOIN_STRATEGY, new AllowableValue[]{JOIN_SQL}).build();
    static final PropertyDescriptor INSERTION_RECORD_PATH = new PropertyDescriptor.Builder().name("Insertion Record Path").displayName("Insertion Record Path").description("Specifies where in the 'original' Record the 'enrichment' Record's fields should be inserted. Note that if the RecordPath does not point to any existing field in the original Record, the enrichment will not be inserted.").required(true).addValidator(new RecordPathValidator()).defaultValue("/").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(JOIN_STRATEGY, new AllowableValue[]{JOIN_INSERT_ENRICHMENT_FIELDS}).build();
    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder().name("Timeout").displayName("Timeout").description("Specifies the maximum amount of time to wait for the second FlowFile once the first arrives at the processor, after which point the first FlowFile will be routed to the 'timeout' relationship.").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("10 min").build();
    static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder().fromPropertyDescriptor(BinFiles.MAX_BIN_COUNT).defaultValue("10000").build();
    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(ORIGINAL_RECORD_READER, ENRICHMENT_RECORD_READER, RECORD_WRITER, JOIN_STRATEGY, SQL, DEFAULT_PRECISION, DEFAULT_SCALE, INSERTION_RECORD_PATH, MAX_BIN_COUNT, TIMEOUT));
    static final Relationship REL_JOINED = new Relationship.Builder().name("joined").description("The resultant FlowFile with Records joined together from both the original and enrichment FlowFiles will be routed to this relationship").build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("Both of the incoming FlowFiles ('original' and 'enrichment') will be routed to this Relationship. I.e., this is the 'original' version of both of these FlowFiles.").autoTerminateDefault(true).build();
    static final Relationship REL_TIMEOUT = new Relationship.Builder().name("timeout").description("If one of the incoming FlowFiles (i.e., the 'original' FlowFile or the 'enrichment' FlowFile) arrives to this Processor but the other does not arrive within the configured Timeout period, the FlowFile that did arrive is routed to this relationship.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If both the 'original' and 'enrichment' FlowFiles arrive at the processor but there was a failure in joining the records, both of those FlowFiles will be routed to this relationship.").build();
    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet(Arrays.asList(REL_JOINED, REL_ORIGINAL, REL_TIMEOUT, REL_FAILURE)));
    private final SqlJoinCache sqlJoinCache = new SqlJoinCache(getLogger());

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @OnStopped
    public synchronized void cleanup() throws Exception {
        this.sqlJoinCache.close();
    }

    protected FlowFile preprocessFlowFile(ProcessContext processContext, ProcessSession processSession, FlowFile flowFile) {
        return flowFile;
    }

    protected String getGroupId(ProcessContext processContext, FlowFile flowFile, ProcessSession processSession) {
        return flowFile.getAttribute(GROUP_ID_ATTRIBUTE);
    }

    protected void setUpBinManager(BinManager binManager, ProcessContext processContext) {
    }

    protected int getMinEntries(PropertyContext propertyContext) {
        return 2;
    }

    protected int getMaxEntries(PropertyContext propertyContext) {
        return 2;
    }

    protected long getMinBytes(PropertyContext propertyContext) {
        return 0L;
    }

    protected long getMaxBytes(PropertyContext propertyContext) {
        return Long.MAX_VALUE;
    }

    protected int getMaxBinAgeSeconds(PropertyContext propertyContext) {
        return propertyContext.getProperty(TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    }

    protected BinProcessingResult processBin(Bin bin, ProcessContext processContext) throws ProcessException {
        ProcessSession session = bin.getSession();
        List<FlowFile> contents = bin.getContents();
        if (contents.size() != 2) {
            if (bin.getEvictionReason() == EvictionReason.BIN_MANAGER_FULL) {
                session.transfer(contents);
                session.commitAsync();
                return new BinProcessingResult(true);
            }
            session.transfer(contents, REL_TIMEOUT);
            EnrichmentRole enrichmentRole = getEnrichmentRole(contents.get(0));
            getLogger().warn("Timed out waiting for the {} FlowFile to match {}; routing to {}", new Object[]{enrichmentRole == null ? "other" : getOtherEnrichmentRole(enrichmentRole).name(), contents.get(0), REL_TIMEOUT.getName()});
            session.commitAsync();
            return new BinProcessingResult(true);
        }
        FlowFile flowFileWithRole = getFlowFileWithRole(contents, EnrichmentRole.ORIGINAL);
        FlowFile flowFileWithRole2 = getFlowFileWithRole(contents, EnrichmentRole.ENRICHMENT);
        if (flowFileWithRole == null || flowFileWithRole2 == null) {
            getLogger().error("Received two FlowFiles {} but could not find both an 'original' and an 'enrichment' FlowFile. The FlowFiles do not appear to have the proper attributes necessary for use with this Processor. Routing to {}", new Object[]{contents, REL_FAILURE});
            return transferFailure(contents, session);
        }
        Map<String, String> hashMap = new HashMap<>((Map<? extends String, ? extends String>) flowFileWithRole.getAttributes());
        hashMap.putAll(flowFileWithRole2.getAttributes());
        RecordReaderFactory asControllerService = processContext.getProperty(ORIGINAL_RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordReaderFactory asControllerService2 = processContext.getProperty(ENRICHMENT_RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory asControllerService3 = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        try {
            RecordSchema readerSchema = getReaderSchema(asControllerService, flowFileWithRole, session);
            try {
                RecordSchema readerSchema2 = getReaderSchema(asControllerService2, flowFileWithRole2, session);
                try {
                    try {
                        RecordJoinResult join = getJoinStrategy(processContext, hashMap).join(new RecordJoinInput(flowFileWithRole, asControllerService, readerSchema), new RecordJoinInput(flowFileWithRole2, asControllerService2, readerSchema2), hashMap, session, asControllerService3.getSchema(hashMap, DataTypeUtils.merge(readerSchema, readerSchema2)));
                        try {
                            FlowFile create = session.create(contents);
                            OutputStream write = session.write(create);
                            try {
                                RecordSet recordSet = join.getRecordSet();
                                RecordSetWriter createWriter = asControllerService3.createWriter(getLogger(), recordSet.getSchema(), write, hashMap);
                                try {
                                    WriteResult write2 = createWriter.write(recordSet);
                                    String mimeType = createWriter.getMimeType();
                                    if (createWriter != null) {
                                        createWriter.close();
                                    }
                                    if (write != null) {
                                        write.close();
                                    }
                                    if (join != null) {
                                        join.close();
                                    }
                                    HashMap hashMap2 = new HashMap(write2.getAttributes());
                                    int recordCount = write2.getRecordCount();
                                    hashMap2.put("record.count", String.valueOf(recordCount));
                                    hashMap2.put(CoreAttributes.MIME_TYPE.key(), mimeType);
                                    session.putAllAttributes(create, hashMap2);
                                    session.adjustCounter("Records Written", recordCount, false);
                                    session.transfer(create, REL_JOINED);
                                    return new BinProcessingResult(false);
                                } catch (Throwable th) {
                                    if (createWriter != null) {
                                        try {
                                            createWriter.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    }
                                    throw th;
                                }
                            } catch (Throwable th3) {
                                if (write != null) {
                                    try {
                                        write.close();
                                    } catch (Throwable th4) {
                                        th3.addSuppressed(th4);
                                    }
                                }
                                throw th3;
                            }
                        } finally {
                        }
                    } catch (Exception e) {
                        getLogger().error("Failed to join 'original' FlowFile {} and 'enrichment' FlowFile {}; routing to failure", new Object[]{flowFileWithRole, flowFileWithRole2, e});
                        return transferFailure(contents, session);
                    }
                } catch (Exception e2) {
                    getLogger().error("Failed to determine Record Schema for the Record Writer from 'original' FlowFile {} and 'enrichment' FLowFile {}; routing to failure", new Object[]{flowFileWithRole, flowFileWithRole2, e2});
                    return transferFailure(contents, session);
                }
            } catch (Exception e3) {
                getLogger().error("Failed to determine Record Schema from 'enrichment' FlowFile {}; routing to failure", new Object[]{flowFileWithRole, e3});
                return transferFailure(contents, session);
            }
        } catch (Exception e4) {
            getLogger().error("Failed to determine Record Schema from 'original' FlowFile {}; routing to failure", new Object[]{flowFileWithRole, e4});
            return transferFailure(contents, session);
        }
    }

    private RecordSchema getReaderSchema(RecordReaderFactory recordReaderFactory, FlowFile flowFile, ProcessSession processSession) throws IOException, MalformedRecordException, SchemaNotFoundException {
        InputStream read = processSession.read(flowFile);
        try {
            RecordSchema schema = recordReaderFactory.createRecordReader(flowFile.getAttributes(), read, flowFile.getSize(), getLogger()).getSchema();
            if (read != null) {
                read.close();
            }
            return schema;
        } catch (Throwable th) {
            if (read != null) {
                try {
                    read.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private BinProcessingResult transferFailure(List<FlowFile> list, ProcessSession processSession) {
        processSession.transfer(list, REL_FAILURE);
        processSession.commitAsync();
        return new BinProcessingResult(true);
    }

    private RecordJoinStrategy getJoinStrategy(ProcessContext processContext, Map<String, String> map) {
        String value = processContext.getProperty(JOIN_STRATEGY).getValue();
        if (value.equalsIgnoreCase(JOIN_SQL.getValue())) {
            return new SqlJoinStrategy(this.sqlJoinCache, processContext.getProperty(SQL), getLogger(), processContext.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(map).asInteger().intValue(), processContext.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(map).asInteger().intValue());
        }
        if (value.equalsIgnoreCase(JOIN_WRAPPER.getValue())) {
            return new WrapperJoinStrategy(getLogger());
        }
        if (!value.equalsIgnoreCase(JOIN_INSERT_ENRICHMENT_FIELDS.getValue())) {
            throw new ProcessException("Invalid Join Strategy: " + value);
        }
        return new InsertRecordFieldsJoinStrategy(getLogger(), processContext.getProperty(INSERTION_RECORD_PATH).evaluateAttributeExpressions(map).getValue());
    }

    private FlowFile getFlowFileWithRole(Collection<FlowFile> collection, EnrichmentRole enrichmentRole) {
        for (FlowFile flowFile : collection) {
            if (getEnrichmentRole(flowFile) == enrichmentRole) {
                return flowFile;
            }
        }
        return null;
    }

    private EnrichmentRole getEnrichmentRole(FlowFile flowFile) {
        String attribute = flowFile.getAttribute(ENRICHMENT_ROLE_ATTRIBUTE);
        if (attribute == null) {
            return EnrichmentRole.UNKNOWN;
        }
        try {
            return EnrichmentRole.valueOf(attribute);
        } catch (Exception e) {
            return EnrichmentRole.UNKNOWN;
        }
    }

    private EnrichmentRole getOtherEnrichmentRole(EnrichmentRole enrichmentRole) {
        if (enrichmentRole == null) {
            return null;
        }
        switch (enrichmentRole) {
            case ENRICHMENT:
                return EnrichmentRole.ORIGINAL;
            case ORIGINAL:
                return EnrichmentRole.ENRICHMENT;
            case UNKNOWN:
                return EnrichmentRole.UNKNOWN;
            default:
                return null;
        }
    }

    protected int processBins(ProcessContext processContext, ProcessSessionFactory processSessionFactory) {
        Bin bin;
        Queue readyBins = getReadyBins();
        if (readyBins.size() <= 1) {
            return super.processBins(processContext, processSessionFactory);
        }
        ComponentLog logger = getLogger();
        int i = 0;
        long j = 0;
        ProcessSession createSession = processSessionFactory.createSession();
        while (isScheduled() && (bin = (Bin) readyBins.poll()) != null) {
            long j2 = 0;
            Iterator it = bin.getContents().iterator();
            while (it.hasNext()) {
                j2 += ((FlowFile) it.next()).getSize();
            }
            try {
                BinProcessingResult processBin = processBin(bin, processContext);
                if (!processBin.isCommitted()) {
                    ProcessSession session = bin.getSession();
                    bin.getContents().forEach(flowFile -> {
                        session.putAllAttributes(flowFile, processBin.getAttributes());
                    });
                    session.transfer(bin.getContents(), REL_ORIGINAL);
                    session.migrate(createSession);
                    session.commitAsync();
                }
                i++;
                j += j2;
            } catch (Exception e) {
                logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[]{Integer.valueOf(bin.getContents().size()), e});
                bin.getSession().rollback();
            } catch (ProcessException e2) {
                logger.error("Failed to process bundle of {} files due to {}", new Object[]{Integer.valueOf(bin.getContents().size()), e2});
                ProcessSession session2 = bin.getSession();
                Iterator it2 = bin.getContents().iterator();
                while (it2.hasNext()) {
                    session2.transfer((FlowFile) it2.next(), REL_FAILURE);
                }
                session2.commitAsync();
            }
            if (i % 100 == 0 || j > 5000000) {
                createSession.commitAsync();
                break;
            }
        }
        createSession.commitAsync();
        return i;
    }
}
