package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.annotation.CapabilityDescription;
import org.apache.nifi.processor.annotation.EventDriven;
import org.apache.nifi.processor.annotation.SupportsBatching;
import org.apache.nifi.processor.annotation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@SupportsBatching
@Tags({"experimental", "hash", "dupe", "duplicate", "dedupe"})
@EventDriven
@CapabilityDescription("Caches a value, computed from FlowFile attributes, for each incoming FlowFile and determines if the cached value has already been seen. If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's\"description\", which is specified in the <FlowFile Description> property. If the FlowFile is not determined to be a duplicate, the Processor routes the FlowFile to 'non-duplicate'")
/* loaded from: input_file:org/apache/nifi/processors/standard/DetectDuplicate.class */
public class DetectDuplicate extends AbstractProcessor {
    /** Name of the attribute written onto a duplicate FlowFile; its value is the cached description of the original. */
    public static final String ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME = "original.flowfile.description";

    /** Controller service used as the backing cache for identifiers. */
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("Distributed Cache Service")
            .description("The Controller Service that is used to cache unique identifiers, used to determine duplicates")
            .required(true)
            .identifiesControllerService(DistributedMapCacheClient.class)
            .build();

    /** Expression evaluated against each FlowFile to produce the cache key. */
    public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("Cache Entry Identifier")
            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the value used to identify duplicates; it is this value that is cached")
            .required(true)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
            .defaultValue("${hash.value}")
            .expressionLanguageSupported(true)
            .build();

    /** Expression evaluated against each FlowFile to produce the description stored alongside the cache key. */
    public static final PropertyDescriptor FLOWFILE_DESCRIPTION = new PropertyDescriptor.Builder()
            .name("FlowFile Description")
            .description("When a FlowFile is added to the cache, this value is stored along with it so that if a duplicate is found, this description of the original FlowFile will be added to the duplicate's \"original.flowfile.description\" attribute")
            .required(true)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
            .expressionLanguageSupported(true)
            .defaultValue("")
            .build();

    /** Optional time period after which a cached entry is treated as expired by {@code onTrigger}. */
    public static final PropertyDescriptor AGE_OFF_DURATION = new PropertyDescriptor.Builder()
            .name("Age Off Duration")
            .description("Time interval to age off cached FlowFiles")
            .required(false)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    /** Destination for FlowFiles whose identifier was already cached. */
    public static final Relationship REL_DUPLICATE = new Relationship.Builder()
            .name("duplicate")
            .description("If a FlowFile has been detected to be a duplicate, it will be routed to this relationship")
            .build();

    /** Destination for FlowFiles whose identifier was not yet cached. */
    public static final Relationship REL_NON_DUPLICATE = new Relationship.Builder()
            .name("non-duplicate")
            .description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship")
            .build();

    /** Destination for FlowFiles that could not be checked against the cache. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If unable to communicate with the cache, the FlowFile will be penalized and routed to this relationship")
            .build();

    /** Unmodifiable set of the three relationships; populated once in the constructor. */
    private final Set<Relationship> relationships;

    /** Encodes cache keys as UTF-8 bytes. */
    private final Serializer<String> keySerializer = new StringSerializer();

    /** Encodes a {@link CacheValue} for storage in the cache. */
    private final Serializer<CacheValue> valueSerializer = new CacheValueSerializer();

    /** Decodes a {@link CacheValue} read back from the cache. */
    private final Deserializer<CacheValue> valueDeserializer = new CacheValueDeserializer();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/standard/DetectDuplicate$CacheValue.class */
    /**
     * Immutable pair stored in the cache for each identifier: the description of
     * the FlowFile that created the entry and the time the entry was created.
     */
    public static class CacheValue {
        /** Description of the FlowFile that first produced this entry. */
        private final String description;

        /** Entry creation time, in milliseconds. */
        private final long entryTimeMS;

        /**
         * @param description description of the originating FlowFile
         * @param entryTimeMS creation time of the entry, in milliseconds
         */
        public CacheValue(String description, long entryTimeMS) {
            this.entryTimeMS = entryTimeMS;
            this.description = description;
        }

        /** @return the description supplied at construction */
        public String getDescription() {
            return description;
        }

        /** @return the entry time, in milliseconds, supplied at construction */
        public long getEntryTimeMS() {
            return entryTimeMS;
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/DetectDuplicate$CacheValueDeserializer.class */
    private static class CacheValueDeserializer implements Deserializer<CacheValue> {
        private CacheValueDeserializer() {
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public CacheValue m6deserialize(byte[] bArr) throws DeserializationException, IOException {
            if (bArr.length == 0) {
                return null;
            }
            return new CacheValue(new String(bArr, 8, bArr.length - 8, StandardCharsets.UTF_8), (bArr[0] << 56) + ((bArr[1] & 255) << 48) + ((bArr[2] & 255) << 40) + ((bArr[3] & 255) << 32) + ((bArr[4] & 255) << 24) + ((bArr[5] & 255) << 16) + ((bArr[6] & 255) << 8) + (bArr[7] & 255));
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/DetectDuplicate$CacheValueSerializer.class */
    /**
     * Encodes a {@link CacheValue} as an 8-byte big-endian entry time followed by
     * the UTF-8 encoded description.
     */
    private static class CacheValueSerializer implements Serializer<CacheValue> {
        private CacheValueSerializer() {
        }

        /**
         * Writes the entry time and description of {@code cacheValue} to {@code outputStream}.
         *
         * @param cacheValue the value to encode
         * @param outputStream the stream that receives the encoded bytes
         * @throws SerializationException declared by the interface contract; not thrown by this implementation
         * @throws IOException if writing to {@code outputStream} fails
         */
        public void serialize(CacheValue cacheValue, OutputStream outputStream) throws SerializationException, IOException {
            final long entryTimeMS = cacheValue.getEntryTimeMS();

            // Most significant byte first.
            final byte[] timeBytes = new byte[]{
                (byte) (entryTimeMS >>> 56),
                (byte) (entryTimeMS >>> 48),
                (byte) (entryTimeMS >>> 40),
                (byte) (entryTimeMS >>> 32),
                (byte) (entryTimeMS >>> 24),
                (byte) (entryTimeMS >>> 16),
                (byte) (entryTimeMS >>> 8),
                (byte) entryTimeMS
            };
            outputStream.write(timeBytes, 0, 8);
            outputStream.write(cacheValue.getDescription().getBytes(StandardCharsets.UTF_8));
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/DetectDuplicate$StringSerializer.class */
    /** Encodes a cache key by writing its UTF-8 bytes to the stream. */
    private static class StringSerializer implements Serializer<String> {
        private StringSerializer() {
        }

        /**
         * Writes {@code str}, encoded as UTF-8, to {@code outputStream}.
         *
         * @param str the string to encode
         * @param outputStream the stream that receives the encoded bytes
         * @throws SerializationException declared by the interface contract; not thrown by this implementation
         * @throws IOException if writing to {@code outputStream} fails
         */
        public void serialize(String str, OutputStream outputStream) throws SerializationException, IOException {
            final byte[] utf8 = str.getBytes(StandardCharsets.UTF_8);
            outputStream.write(utf8);
        }
    }

    public DetectDuplicate() {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_DUPLICATE);
        hashSet.add(REL_NON_DUPLICATE);
        hashSet.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    /**
     * Lists the properties this processor supports, in the order: cache entry
     * identifier, FlowFile description, age-off duration, distributed cache service.
     *
     * @return a new mutable list of the four descriptors
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // Parameterized rather than raw, so the return needs no unchecked conversion.
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(CACHE_ENTRY_IDENTIFIER);
        descriptors.add(FLOWFILE_DESCRIPTION);
        descriptors.add(AGE_OFF_DURATION);
        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
        return descriptors;
    }

    /**
     * @return the unmodifiable set of relationships built in the constructor
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Takes one FlowFile from the session, looks its evaluated identifier up in the
     * distributed cache and routes it to {@code duplicate}, {@code non-duplicate}
     * or {@code failure}.
     *
     * <p>A blank identifier, or an {@link IOException} from the cache, penalizes
     * the FlowFile and routes it to {@code failure}. When an age-off duration is
     * configured and the cached entry is at least that old, the entry is removed
     * and re-inserted; the FlowFile then counts as a duplicate only if that
     * re-insertion does not succeed.</p>
     *
     * @param processContext supplies the configured property values
     * @param processSession supplies the FlowFile and receives the transfer
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final FlowFile incoming = processSession.get();
        if (incoming == null) {
            return;
        }

        final ProcessorLog log = getLogger();
        final String cacheKey = processContext.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(incoming).getValue();
        if (StringUtils.isBlank(cacheKey)) {
            log.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{incoming});
            final FlowFile penalized = processSession.penalize(incoming);
            processSession.transfer(penalized, REL_FAILURE);
            return;
        }

        final DistributedMapCacheClient cache = processContext.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        final Long ageOffMillis = processContext.getProperty(AGE_OFF_DURATION).asTimePeriod(TimeUnit.MILLISECONDS);
        final long now = System.currentTimeMillis();

        try {
            final String newDescription = processContext.getProperty(FLOWFILE_DESCRIPTION).evaluateAttributeExpressions(incoming).getValue();
            final CacheValue candidate = new CacheValue(newDescription, now);
            final CacheValue existing = (CacheValue) cache.getAndPutIfAbsent(cacheKey, candidate, this.keySerializer, this.valueSerializer, this.valueDeserializer);

            boolean duplicate = existing != null;
            if (duplicate && ageOffMillis != null && now >= existing.getEntryTimeMS() + ageOffMillis.longValue()) {
                // The cached entry has aged off: drop it and try to claim the key for this FlowFile.
                final boolean removed = cache.remove(cacheKey, this.keySerializer);
                log.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, Boolean.valueOf(removed)});
                duplicate = !cache.putIfAbsent(cacheKey, candidate, this.keySerializer, this.valueSerializer);
            }

            if (!duplicate) {
                processSession.getProvenanceReporter().route(incoming, REL_NON_DUPLICATE);
                processSession.transfer(incoming, REL_NON_DUPLICATE);
                log.info("Could not find a duplicate entry in cache for {}; routing to non-duplicate", new Object[]{incoming});
                processSession.adjustCounter("Non-Duplicate Files Processed", 1L, false);
                return;
            }

            // The provenance event is reported before the attribute is added to the FlowFile.
            processSession.getProvenanceReporter().route(incoming, REL_DUPLICATE, "Duplicate of: original.flowfile.description");
            final String originalDescription = existing.getDescription();
            final FlowFile tagged = processSession.putAttribute(incoming, ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME, originalDescription);
            processSession.transfer(tagged, REL_DUPLICATE);
            log.info("Found {} to be a duplicate of FlowFile with description {}", new Object[]{tagged, originalDescription});
            processSession.adjustCounter("Duplicates Detected", 1L, false);
        } catch (IOException e) {
            final FlowFile penalized = processSession.penalize(incoming);
            processSession.transfer(penalized, REL_FAILURE);
            log.error("Unable to communicate with cache when processing {} due to {}", new Object[]{penalized, e});
        }
    }
}
