package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JmsProperties;

@CapabilityDescription("Caches a release signal identifier in the distributed cache, optionally along with the FlowFile's attributes.  Any flow files held at a corresponding Wait processor will be released once this signal in the cache is discovered.")
@SupportsBatching
@WritesAttribute(attribute = Notify.NOTIFIED_ATTRIBUTE_NAME, description = "All FlowFiles will have an attribute 'notified'. The value of this attribute is true, is the FlowFile is notified, otherwise false.")
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({JmsProperties.MSG_TYPE_MAP, "cache", "notify", "distributed", "signal", "release"})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.processors.standard.Wait"})
/* loaded from: input_file:org/apache/nifi/processors/standard/Notify.class */
public class Notify extends AbstractProcessor {
    public static final String NOTIFIED_ATTRIBUTE_NAME = "notified";
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("distributed-cache-service").displayName("Distributed Cache Service").description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor").required(true).identifiesControllerService(AtomicDistributedMapCacheClient.class).build();
    public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder().name("release-signal-id").displayName("Release Signal Identifier").description("A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the release signal cache key").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)).expressionLanguageSupported(true).build();
    public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder().name("signal-counter-name").displayName("Signal Counter Name").description("A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the signal counter name. Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences of different types of events, such as success or failure, or destination data source names, etc.").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)).expressionLanguageSupported(true).defaultValue("default").build();
    public static final PropertyDescriptor SIGNAL_COUNTER_DELTA = new PropertyDescriptor.Builder().name("signal-counter-delta").displayName("Signal Counter Delta").description("A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the signal counter delta. Specify how much the counter should increase. For example, if multiple signal events are processed at upstream flow in batch oriented way, the number of events processed can be notified with this property at once. Zero (0) has a special meaning, it clears target count back to 0, which is especially useful when used with Wait " + Wait.RELEASABLE_FLOWFILE_COUNT.getDisplayName() + " = Zero (0) mode, to provide 'open-close-gate' type of flow control. One (1) can open a corresponding Wait processor, and Zero (0) can negate it as if closing a gate.").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)).expressionLanguageSupported(true).defaultValue("1").build();
    public static final PropertyDescriptor SIGNAL_BUFFER_COUNT = new PropertyDescriptor.Builder().name("signal-buffer-count").displayName("Signal Buffer Count").description("Specify the maximum number of incoming flow files that can be buffered until signals are notified to cache service. The more buffer can provide the better performance, as it reduces the number of interactions with cache service by grouping signals by signal identifier when multiple incoming flow files share the same signal identifier.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("1").build();
    public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder().name("attribute-cache-regex").displayName("Attribute Cache Regex").description("Any attributes whose names match this regex will be stored in the distributed cache to be copied to any FlowFiles released from a corresponding Wait processor.  Note that the uuid attribute will not be cached regardless of this value.  If blank, no attributes will be cached.").addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).expressionLanguageSupported(false).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles where the release signal has been successfully entered in the cache will be routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship").build();
    private final Set<Relationship> relationships;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/standard/Notify$SignalBuffer.class */
    public class SignalBuffer {
        final Map<String, Integer> deltas;
        final Map<String, String> attributesToCache;
        final List<FlowFile> flowFiles;

        private SignalBuffer() {
            this.deltas = new HashMap();
            this.attributesToCache = new HashMap();
            this.flowFiles = new ArrayList();
        }

        int incrementDelta(String str, int i) {
            int intValue = i == 0 ? 0 : (this.deltas.containsKey(str) ? this.deltas.get(str).intValue() : 0) + i;
            this.deltas.put(str, Integer.valueOf(intValue));
            return intValue;
        }
    }

    public Notify() {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(RELEASE_SIGNAL_IDENTIFIER);
        arrayList.add(SIGNAL_COUNTER_NAME);
        arrayList.add(SIGNAL_COUNTER_DELTA);
        arrayList.add(SIGNAL_BUFFER_COUNT);
        arrayList.add(DISTRIBUTED_CACHE_SERVICE);
        arrayList.add(ATTRIBUTE_CACHE_REGEX);
        return arrayList;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile;
        ComponentLog logger = getLogger();
        PropertyValue property = processContext.getProperty(RELEASE_SIGNAL_IDENTIFIER);
        PropertyValue property2 = processContext.getProperty(SIGNAL_COUNTER_NAME);
        PropertyValue property3 = processContext.getProperty(SIGNAL_COUNTER_DELTA);
        String value = processContext.getProperty(ATTRIBUTE_CACHE_REGEX).getValue();
        Integer asInteger = processContext.getProperty(SIGNAL_BUFFER_COUNT).asInteger();
        WaitNotifyProtocol waitNotifyProtocol = new WaitNotifyProtocol(processContext.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class));
        HashMap hashMap = new HashMap();
        for (int i = 0; i < asInteger.intValue() && (flowFile = processSession.get()) != null; i++) {
            String value2 = property.evaluateAttributeExpressions(flowFile).getValue();
            if (StringUtils.isBlank(value2)) {
                logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[]{flowFile});
                processSession.transfer(processSession.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
            } else {
                String value3 = property2.evaluateAttributeExpressions(flowFile).getValue();
                if (StringUtils.isEmpty(value3)) {
                    value3 = "default";
                }
                int i2 = 1;
                if (property3.isSet()) {
                    try {
                        i2 = Integer.parseInt(property3.evaluateAttributeExpressions(flowFile).getValue());
                    } catch (NumberFormatException e) {
                        logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[]{flowFile, e}, e);
                        processSession.transfer(processSession.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
                    }
                }
                if (!hashMap.containsKey(value2)) {
                    hashMap.put(value2, new SignalBuffer());
                }
                SignalBuffer signalBuffer = (SignalBuffer) hashMap.get(value2);
                if (StringUtils.isNotEmpty(value)) {
                    flowFile.getAttributes().entrySet().stream().filter(entry -> {
                        return !((String) entry.getKey()).equals("uuid") && ((String) entry.getKey()).matches(value);
                    }).forEach(entry2 -> {
                    });
                }
                signalBuffer.incrementDelta(value3, i2);
                signalBuffer.flowFiles.add(flowFile);
                if (logger.isDebugEnabled()) {
                    logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[]{value2, value3, flowFile});
                }
            }
        }
        hashMap.forEach((str, signalBuffer2) -> {
            try {
                waitNotifyProtocol.notify(str, signalBuffer2.deltas, signalBuffer2.attributesToCache);
                signalBuffer2.flowFiles.forEach(flowFile2 -> {
                    processSession.transfer(processSession.putAttribute(flowFile2, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(true)), REL_SUCCESS);
                });
            } catch (IOException e2) {
                throw new RuntimeException(String.format("Unable to communicate with cache when processing %s due to %s", str, e2), e2);
            }
        });
    }
}
