package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.WaitNotifyProtocol;

/**
 * Processor that holds incoming FlowFiles until a release signal with a matching identifier
 * is found in the configured distributed map cache.
 *
 * <p>On each trigger the processor pulls up to 'Wait Buffer Count' FlowFiles that share a single
 * Release Signal Identifier, fetches the signal for that identifier once, and routes every pulled
 * FlowFile to one of {@link #REL_SUCCESS}, {@link #REL_WAIT}, {@link #REL_EXPIRED} or
 * {@link #REL_FAILURE}.</p>
 */
@CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal is stored in the distributed cache from a corresponding Notify processor. When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship. The release signal entry is then removed from the cache. The attributes of the FlowFile that produced the release signal are copied to the waiting FlowFile if the Attribute Cache Regex property of the corresponding Notify processor is set properly. If there are multiple release signals in the cache identified by the Release Signal Identifier, and the Notify processor is configured to copy the FlowFile attributes to the cache, then the FlowFile passing the Wait processor receives the union of the attributes of the FlowFiles that produced the release signals in the cache (identified by Release Signal Identifier). Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor.It is recommended to use a prioritizer (for instance First In First Out) when using the 'wait' relationship as a loop.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@WritesAttributes({@WritesAttribute(attribute = Wait.WAIT_START_TIMESTAMP, description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the initial epoch timestamp when the file first entered this processor.  This is used to determine the expiration time of the FlowFile.  This attribute is not written when the FlowFile is transferred to failure, expired or success"), @WritesAttribute(attribute = "wait.counter.<counterName>", description = "The name of each counter for which at least one signal has been present in the cache since the last time the cache was empty gets copied to the current FlowFile as an attribute.")})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.processors.standard.Notify"})
/* loaded from: input_file:org/apache/nifi/processors/standard/Wait.class */
public class Wait extends AbstractProcessor {
    /** Name of the FlowFile attribute holding the epoch-millis time at which the FlowFile started waiting. */
    public static final String WAIT_START_TIMESTAMP = "wait.start.timestamp";

    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("distributed-cache-service")
            .displayName("Distributed Cache Service")
            .description("The Controller Service that is used to check for release signals from a corresponding Notify processor")
            .required(true)
            .identifiesControllerService(AtomicDistributedMapCacheClient.class)
            .build();

    public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("release-signal-id")
            .displayName("Release Signal Identifier")
            .description("A value that specifies the key to a specific release signal cache. To decide whether the FlowFile that is being processed by the Wait processor should be sent to the 'success' or the 'wait' relationship, the processor checks the signals in the cache specified by this key.")
            .required(true)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
            .name("target-signal-count")
            .displayName("Target Signal Count")
            .description("The number of signals that need to be in the cache (specified by the Release Signal Identifier) in order for the FlowFile processed by the Wait processor to be sent to the ‘success’ relationship. If the number of signals in the cache has reached this number, the FlowFile is routed to the 'success' relationship and the number of signals in the cache is decreased by this value. If Signal Counter Name is specified, this processor checks a particular counter, otherwise checks against the total number of signals in the cache.")
            .required(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("1")
            .build();

    public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
            .name("signal-counter-name")
            .displayName("Signal Counter Name")
            .description("Within the cache (specified by the Release Signal Identifier) the signals may belong to different counters. If this property is specified, the processor checks the number of signals in the cache that belong to this particular counter. If not specified, the processor checks the total number of signals in the cache.")
            .required(false)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final PropertyDescriptor WAIT_BUFFER_COUNT = new PropertyDescriptor.Builder()
            .name("wait-buffer-count")
            .displayName("Wait Buffer Count")
            .description("Specify the maximum number of incoming FlowFiles that can be buffered to check whether it can move forward. The more buffer can provide the better performance, as it reduces the number of interactions with cache service by grouping FlowFiles by signal identifier. Only a signal identifier can be processed at a processor execution.")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("1")
            .build();

    public static final PropertyDescriptor RELEASABLE_FLOWFILE_COUNT = new PropertyDescriptor.Builder()
            .name("releasable-flowfile-count")
            .displayName("Releasable FlowFile Count")
            .description("A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the releasable FlowFile count. This specifies how many FlowFiles can be released when a target count reaches target signal count. Zero (0) has a special meaning, any number of FlowFiles can be released as long as signal count matches target.")
            .required(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("1")
            .build();

    public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
            .name("expiration-duration")
            .displayName("Expiration Duration")
            .description("Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship")
            .required(true)
            .defaultValue("10 min")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new AllowableValue(PutFile.REPLACE_RESOLUTION, "Replace if present", "When cached attributes are copied onto released FlowFiles, they replace any matching attributes.");
    public static final AllowableValue ATTRIBUTE_COPY_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original", "Attributes on released FlowFiles are not overwritten by copied cached attributes.");

    public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder()
            .name("attribute-copy-mode")
            .displayName("Attribute Copy Mode")
            .description("Specifies how to handle attributes copied from FlowFiles entering the Notify processor")
            .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
            .required(true)
            .allowableValues(new DescribedValue[]{ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL})
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final AllowableValue WAIT_MODE_TRANSFER_TO_WAIT = new AllowableValue("wait", "Transfer to wait relationship", "Transfer a FlowFile to the 'wait' relationship when whose release signal has not been notified yet. This mode allows other incoming FlowFiles to be enqueued by moving FlowFiles into the wait relationship. It is recommended to set a prioritizer (for instance First In First Out) on the 'wait' relationship.");
    public static final AllowableValue WAIT_MODE_KEEP_IN_UPSTREAM = new AllowableValue("keep", "Keep in the upstream connection", "Transfer a FlowFile to the upstream connection where it comes from when whose release signal has not been notified yet. This mode helps keeping upstream connection being full so that the upstream source processor will not be scheduled while back-pressure is active and limit incoming FlowFiles. ");

    public static final PropertyDescriptor WAIT_MODE = new PropertyDescriptor.Builder()
            .name("wait-mode")
            .displayName("Wait Mode")
            .description("Specifies how to handle a FlowFile waiting for a notify signal")
            .defaultValue(WAIT_MODE_TRANSFER_TO_WAIT.getValue())
            .required(true)
            .allowableValues(new DescribedValue[]{WAIT_MODE_TRANSFER_TO_WAIT, WAIT_MODE_KEEP_IN_UPSTREAM})
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final PropertyDescriptor WAIT_PENALTY_DURATION = new PropertyDescriptor.Builder()
            .name("wait-penalty-duration")
            .displayName("Wait Penalty Duration")
            .description("If configured, after a signal identifier got processed but did not meet the release criteria, the signal identifier is penalized and FlowFiles having the signal identifier will not be processed again for the specified period of time, so that the signal identifier will not block others to be processed. This can be useful for use cases where a Wait processor is expected to process multiple signal identifiers, and each signal identifier has multiple FlowFiles, and also the order of releasing FlowFiles is important within a signal identifier. The FlowFile order can be configured with Prioritizers. IMPORTANT: There is a limitation of number of queued signals can be processed, and Wait processor may not be able to check all queued signal ids. See additional details for the best practice.")
            .required(false)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("A FlowFile with a matching release signal in the cache will be routed to this relationship")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship")
            .build();

    public static final Relationship REL_WAIT = new Relationship.Builder()
            .name("wait")
            .description("A FlowFile with no matching release signal in the cache will be routed to this relationship")
            .build();

    public static final Relationship REL_EXPIRED = new Relationship.Builder()
            .name("expired")
            .description("A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship")
            .build();

    /** Immutable set of the four relationships this processor exposes. */
    private final Set<Relationship> relationships;

    /**
     * Penalized signal identifiers mapped to the epoch-millis time at which their penalty ends.
     * NOTE(review): this is a plain HashMap; presumably the processor is not expected to be
     * triggered by concurrent threads while penalties are in use - confirm.
     */
    private final Map<String, Long> signalIdPenalties = new HashMap<>();

    /** Creates the processor and fixes its set of relationships. */
    public Wait() {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_WAIT);
        rels.add(REL_EXPIRED);
        rels.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * @return a new list with the properties supported by this processor, in display order
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
        descriptors.add(TARGET_SIGNAL_COUNT);
        descriptors.add(SIGNAL_COUNTER_NAME);
        descriptors.add(WAIT_BUFFER_COUNT);
        descriptors.add(RELEASABLE_FLOWFILE_COUNT);
        descriptors.add(EXPIRATION_DURATION);
        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
        descriptors.add(ATTRIBUTE_COPY_MODE);
        descriptors.add(WAIT_MODE);
        descriptors.add(WAIT_PENALTY_DURATION);
        return descriptors;
    }

    /**
     * @return the unmodifiable set of relationships built in the constructor
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * Pulls a batch of FlowFiles sharing one signal identifier, checks the corresponding signal in
     * the cache and routes each FlowFile to success, wait (or back to the upstream connection),
     * expired or failure.
     *
     * @param processContext provides the configured property values and the cache client service
     * @param processSession session used to pull, modify and transfer FlowFiles
     * @throws ProcessException if the signal cannot be fetched from, or updated in, the cache
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final ComponentLog logger = getLogger();
        final PropertyValue signalIdProperty = processContext.getProperty(RELEASE_SIGNAL_IDENTIFIER);
        final int bufferCount = processContext.getProperty(WAIT_BUFFER_COUNT).asInteger();

        // FlowFiles are collected per destination first and transferred in one go at the end.
        final Map<Relationship, List<FlowFile>> routed = new HashMap<>();
        final Function<Relationship, List<FlowFile>> flowFilesFor =
                rel -> routed.computeIfAbsent(rel, key -> new ArrayList<>());

        final AtomicReference<String> targetSignalId = new AtomicReference<>();
        final AtomicInteger bufferedCount = new AtomicInteger(0);
        final List<FlowFile> failedFiltering = new ArrayList<>();
        // Accepts a FlowFile and stops the filter once the buffer count has been reached.
        final Supplier<FlowFileFilter.FlowFileFilterResult> acceptAndCount = () ->
                bufferedCount.incrementAndGet() == bufferCount
                        ? FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE
                        : FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;

        clearExpiredPenalties();

        final List<FlowFile> flowFiles = processSession.get(candidate -> {
            final String candidateSignalId = signalIdProperty.evaluateAttributeExpressions(candidate).getValue();
            if (StringUtils.isBlank(candidateSignalId)) {
                // Accepted so that it can be routed to failure below; remembered separately
                // because it must not take part in the signal evaluation.
                logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[]{candidate});
                failedFiltering.add(candidate);
                return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
            }
            if (this.signalIdPenalties.containsKey(candidateSignalId)) {
                return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
            }
            final String currentTarget = targetSignalId.get();
            if (currentTarget == null) {
                // The first usable FlowFile decides which signal id this execution handles.
                targetSignalId.set(candidateSignalId);
                return acceptAndCount.get();
            }
            return currentTarget.equals(candidateSignalId)
                    ? acceptAndCount.get()
                    : FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
        });

        final boolean replaceAttributes =
                ATTRIBUTE_COPY_REPLACE.getValue().equals(processContext.getProperty(ATTRIBUTE_COPY_MODE).getValue());
        final AtomicReference<WaitNotifyProtocol.Signal> signalRef = new AtomicReference<>();
        // Snapshot of the signal counts taken right after the signal is fetched.
        final Map<String, Long> originalCounts = new HashMap<>();

        final Consumer<FlowFile> transferToFailure = failed ->
                flowFilesFor.apply(REL_FAILURE).add(clearWaitState(processSession, processSession.penalize(failed)));

        final Consumer<Map.Entry<Relationship, List<FlowFile>>> transferRouted = routedEntry -> {
            final Relationship routedTo = routedEntry.getKey();
            final boolean keepInUpstream = REL_WAIT.equals(routedTo)
                    && WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(processContext.getProperty(WAIT_MODE).getValue());
            final Relationship destination = keepInUpstream ? Relationship.SELF : routedTo;
            // FlowFiles leaving via success or expired lose the wait start timestamp.
            final boolean leavingWait = REL_SUCCESS.equals(destination) || REL_EXPIRED.equals(destination);
            final List<FlowFile> outgoing = routedEntry.getValue().stream()
                    .map(ff -> copySignalAttributes(
                            processSession,
                            leavingWait ? clearWaitState(processSession, ff) : ff,
                            signalRef.get(),
                            originalCounts,
                            replaceAttributes))
                    .collect(Collectors.toList());
            processSession.transfer(outgoing, destination);
        };

        failedFiltering.forEach(rejected -> {
            flowFiles.remove(rejected);
            transferToFailure.accept(rejected);
        });

        if (flowFiles.isEmpty()) {
            // Nothing left to evaluate; only failed FlowFiles (if any) need transferring.
            routed.entrySet().forEach(transferRouted);
            return;
        }

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(
                processContext.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class));
        final String signalId = targetSignalId.get();

        final WaitNotifyProtocol.Signal signal;
        try {
            signal = protocol.getSignal(signalId);
        } catch (IOException e) {
            throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
        }
        if (signal != null) {
            originalCounts.putAll(signal.getCounts());
        }
        signalRef.set(signal);

        String targetCounterName = null;
        long targetCount = 1;
        int releasableCount = 1;
        final List<FlowFile> candidates = new ArrayList<>();
        // The property does not support expression language, so one lookup serves the whole batch.
        final long expirationMillis = processContext.getProperty(EXPIRATION_DURATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue();

        for (FlowFile flowFile : flowFiles) {
            // Stamp the FlowFile the first time it is seen so expiration can be tracked.
            String waitStart = flowFile.getAttribute(WAIT_START_TIMESTAMP);
            if (waitStart == null) {
                waitStart = String.valueOf(System.currentTimeMillis());
                flowFile = processSession.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStart);
            }

            final long waitStartMillis;
            try {
                waitStartMillis = Long.parseLong(waitStart);
            } catch (NumberFormatException e) {
                logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[]{WAIT_START_TIMESTAMP, waitStart, flowFile});
                transferToFailure.accept(flowFile);
                continue;
            }

            final long now = System.currentTimeMillis();
            if (now > waitStartMillis + expirationMillis) {
                logger.info("FlowFile {} expired after {}ms", new Object[]{flowFile, Long.valueOf(now - waitStartMillis)});
                flowFilesFor.apply(REL_EXPIRED).add(flowFile);
                continue;
            }

            if (signal == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("No release signal found for {} on FlowFile {} yet", new Object[]{signalId, flowFile});
                }
                flowFilesFor.apply(REL_WAIT).add(flowFile);
                continue;
            }

            if (candidates.isEmpty()) {
                // The first valid candidate fixes counter name, target count and releasable count
                // for the whole batch.
                targetCounterName = processContext.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
                try {
                    targetCount = Long.valueOf(processContext.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue()).longValue();
                } catch (NumberFormatException e) {
                    transferToFailure.accept(flowFile);
                    logger.error("Failed to parse targetCount when processing {} due to {}", new Object[]{flowFile, e, e});
                    // Already routed to failure: it must not also become a release candidate,
                    // otherwise the same FlowFile would be transferred twice.
                    continue;
                }
                try {
                    releasableCount = Integer.valueOf(processContext.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue()).intValue();
                } catch (NumberFormatException e) {
                    transferToFailure.accept(flowFile);
                    logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", new Object[]{flowFile, e, e});
                    continue;
                }
            }

            candidates.add(flowFile);
        }

        boolean waitCompleted = false;
        boolean waitProgressed = false;
        if (signal != null && !candidates.isEmpty()) {
            if (releasableCount > 0) {
                signal.releaseCandidates(targetCounterName, targetCount, releasableCount, candidates,
                        released -> flowFilesFor.apply(REL_SUCCESS).addAll(released),
                        waiting -> flowFilesFor.apply(REL_WAIT).addAll(waiting));
                waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0;
                waitProgressed = !flowFilesFor.apply(REL_SUCCESS).isEmpty();
            } else {
                // Releasable count of zero: release every candidate once the target is reached.
                // Both flags stay false, so the cached signal is left untouched.
                final boolean targetReached = StringUtils.isBlank(targetCounterName)
                        ? signal.isTotalCountReached(targetCount)
                        : signal.isCountReached(targetCounterName, targetCount);
                flowFilesFor.apply(targetReached ? REL_SUCCESS : REL_WAIT).addAll(candidates);
            }
        }

        routed.entrySet().forEach(transferRouted);

        // Penalize the signal id when nothing was released so other ids get a chance.
        final PropertyValue penaltyDuration = processContext.getProperty(WAIT_PENALTY_DURATION);
        if (penaltyDuration.isSet() && flowFilesFor.apply(REL_SUCCESS).isEmpty()) {
            this.signalIdPenalties.put(signalId,
                    Long.valueOf(System.currentTimeMillis() + penaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
        }

        try {
            if (waitCompleted) {
                // Every count has been consumed: the cache entry can go.
                protocol.complete(signalId);
            } else if (waitProgressed) {
                // Some FlowFiles were released: persist the updated signal state.
                protocol.replace(signal);
            }
            // Otherwise the cached signal is unchanged and must stay in the cache.
        } catch (IOException e) {
            processSession.rollback();
            throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
        }
    }

    /** Drops every signal id penalty whose expiry time lies before the current time. */
    private void clearExpiredPenalties() {
        if (this.signalIdPenalties.isEmpty()) {
            return;
        }
        final long now = System.currentTimeMillis();
        final Iterator<Map.Entry<String, Long>> penalties = this.signalIdPenalties.entrySet().iterator();
        while (penalties.hasNext()) {
            if (penalties.next().getValue().longValue() < now) {
                penalties.remove();
            }
        }
    }

    /**
     * Removes the wait start timestamp attribute from the given FlowFile.
     *
     * @param processSession session owning the FlowFile
     * @param flowFile FlowFile to update
     * @return the updated FlowFile
     */
    private FlowFile clearWaitState(ProcessSession processSession, FlowFile flowFile) {
        return processSession.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
    }

    /**
     * Copies the signal's attributes and the given counter values onto a FlowFile.
     *
     * @param processSession session owning the FlowFile
     * @param flowFile FlowFile receiving the attributes
     * @param signal signal to copy attributes from; when {@code null} the FlowFile is returned unchanged
     * @param map counter name to count; each entry becomes a {@code wait.counter.<name>} attribute
     *            and their sum becomes {@code wait.counter.total}
     * @param z {@code true} to overwrite attributes already present on the FlowFile (except
     *          {@code uuid}); {@code false} to copy only signal attributes the FlowFile lacks
     * @return the updated FlowFile
     */
    private FlowFile copySignalAttributes(ProcessSession processSession, FlowFile flowFile, WaitNotifyProtocol.Signal signal, Map<String, Long> map, boolean z) {
        if (signal == null) {
            return flowFile;
        }

        final Map<String, String> attributesToCopy;
        if (z) {
            attributesToCopy = new HashMap<>(signal.getAttributes());
            // The FlowFile keeps its own identity.
            attributesToCopy.remove("uuid");
        } else {
            attributesToCopy = signal.getAttributes().entrySet().stream()
                    .filter(attr -> flowFile.getAttribute(attr.getKey()) == null)
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }

        // Counter attributes are written regardless of the copy mode.
        long total = 0L;
        for (Map.Entry<String, Long> counter : map.entrySet()) {
            final Long count = counter.getValue();
            attributesToCopy.put("wait.counter." + counter.getKey(), String.valueOf(count));
            total += count.longValue();
        }
        attributesToCopy.put("wait.counter.total", String.valueOf(total));

        return processSession.putAllAttributes(flowFile, attributesToCopy);
    }

    /**
     * Forgets all signal id penalties when the processor is stopped.
     *
     * @param processContext not used
     */
    @OnStopped
    public void onStopped(ProcessContext processContext) {
        this.signalIdPenalties.clear();
    }

    /**
     * @return the live (not copied) map of penalized signal ids to penalty expiry times
     */
    Map<String, Long> getSignalIdPenalties() {
        return this.signalIdPenalties;
    }
}
