/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * Processor that detects duplicate FlowFiles by caching a per-FlowFile key in a
 * {@link DistributedMapCacheClient}.
 *
 * <p>For each incoming FlowFile the "Cache Entry Identifier" property is evaluated to obtain a
 * cache key. The key is atomically inserted into the cache together with the evaluated
 * "FlowFile Description" and the current time. If the key was already present (and has not aged
 * off), the FlowFile is routed to {@link #REL_DUPLICATE} with the original description stored in
 * the {@value #ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME} attribute; otherwise it is routed to
 * {@link #REL_NON_DUPLICATE}. A blank key or an {@link IOException} from the cache routes the
 * penalized FlowFile to {@link #REL_FAILURE}.
 */
@EventDriven
@SupportsBatching
@Tags(value={"hash", "dupe", "duplicate", "dedupe"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Caches a value, computed from FlowFile attributes, for each incoming FlowFile and determines if the cached value has already been seen. If so, routes the FlowFile to 'duplicate' with an attribute named 'original.flowfile.description' that specifies the original FlowFile's \"description\", which is specified in the <FlowFile Description> property. If the FlowFile is not determined to be a duplicate, the Processor routes the FlowFile to 'non-duplicate'")
@WritesAttribute(attribute="original.flowfile.description", description="All FlowFiles routed to the duplicate relationship will have an attribute added named original.flowfile.description. The value of this attribute is determined by the attributes of the original copy of the data and by the FlowFile Description property.")
@SeeAlso(classNames={"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
public class DetectDuplicate
extends AbstractProcessor {
    /** Name of the attribute added to FlowFiles that are routed to {@link #REL_DUPLICATE}. */
    public static final String ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME = "original.flowfile.description";
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("The Controller Service that is used to cache unique identifiers, used to determine duplicates").required(true).identifiesControllerService(DistributedMapCacheClient.class).build();
    public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder().name("Cache Entry Identifier").description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the value used to identify duplicates; it is this value that is cached").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)).defaultValue("${hash.value}").expressionLanguageSupported(true).build();
    public static final PropertyDescriptor FLOWFILE_DESCRIPTION = new PropertyDescriptor.Builder().name("FlowFile Description").description("When a FlowFile is added to the cache, this value is stored along with it so that if a duplicate is found, this description of the original FlowFile will be added to the duplicate's \"original.flowfile.description\" attribute").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)).expressionLanguageSupported(true).defaultValue("").build();
    public static final PropertyDescriptor AGE_OFF_DURATION = new PropertyDescriptor.Builder().name("Age Off Duration").description("Time interval to age off cached FlowFiles").required(false).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    public static final Relationship REL_DUPLICATE = new Relationship.Builder().name("duplicate").description("If a FlowFile has been detected to be a duplicate, it will be routed to this relationship").build();
    public static final Relationship REL_NON_DUPLICATE = new Relationship.Builder().name("non-duplicate").description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If unable to communicate with the cache, the FlowFile will be penalized and routed to this relationship").build();

    /** Number of leading bytes in a serialized {@link CacheValue} that hold the big-endian entry time. */
    private static final int TIMESTAMP_LENGTH = 8;

    private final Set<Relationship> relationships;
    private final Serializer<String> keySerializer = new StringSerializer();
    private final Serializer<CacheValue> valueSerializer = new CacheValueSerializer();
    private final Deserializer<CacheValue> valueDeserializer = new CacheValueDeserializer();

    /** Creates the processor and fixes its (unmodifiable) set of relationships. */
    public DetectDuplicate() {
        final Set<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_DUPLICATE);
        rels.add(REL_NON_DUPLICATE);
        rels.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * Returns the properties supported by this processor.
     *
     * @return a new list containing the four property descriptors, in display order
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(CACHE_ENTRY_IDENTIFIER);
        descriptors.add(FLOWFILE_DESCRIPTION);
        descriptors.add(AGE_OFF_DURATION);
        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
        return descriptors;
    }

    /**
     * Returns the relationships this processor can route to.
     *
     * @return an unmodifiable set of duplicate, non-duplicate and failure
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * Pulls one FlowFile from the session and routes it to duplicate, non-duplicate or failure
     * depending on whether its cache key is already present in the distributed cache.
     *
     * @param context provides the configured property values and the cache controller service
     * @param session the session the FlowFile is taken from and transferred through
     * @throws ProcessException declared by the processor contract; not thrown directly here
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ProcessorLog logger = this.getLogger();
        final String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
        if (StringUtils.isBlank(cacheKey)) {
            logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        // Null when the optional "Age Off Duration" property is not set; entries then never expire here.
        final Long durationMS = context.getProperty(AGE_OFF_DURATION).asTimePeriod(TimeUnit.MILLISECONDS);
        final long now = System.currentTimeMillis();

        try {
            final String flowFileDescription = context.getProperty(FLOWFILE_DESCRIPTION).evaluateAttributeExpressions(flowFile).getValue();
            final CacheValue cacheValue = new CacheValue(flowFileDescription, now);
            final CacheValue originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, this.keySerializer, this.valueSerializer, this.valueDeserializer);
            boolean duplicate = originalCacheValue != null;

            if (duplicate && durationMS != null && now >= originalCacheValue.getEntryTimeMS() + durationMS) {
                // The existing entry has aged off: drop it and try to claim the key for this FlowFile.
                final boolean status = cache.remove(cacheKey, this.keySerializer);
                logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status});
                // If the key is present again by the time we re-insert, this FlowFile is still
                // treated as a duplicate. Note that originalCacheValue then still refers to the
                // expired entry read above, so its description is what gets reported below.
                duplicate = !cache.putIfAbsent(cacheKey, cacheValue, this.keySerializer, this.valueSerializer);
            }

            if (duplicate) {
                session.getProvenanceReporter().route(flowFile, REL_DUPLICATE, "Duplicate of: " + ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME);
                final String originalFlowFileDescription = originalCacheValue.getDescription();
                flowFile = session.putAttribute(flowFile, ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME, originalFlowFileDescription);
                session.transfer(flowFile, REL_DUPLICATE);
                logger.info("Found {} to be a duplicate of FlowFile with description {}", new Object[]{flowFile, originalFlowFileDescription});
                session.adjustCounter("Duplicates Detected", 1L, false);
            } else {
                session.getProvenanceReporter().route(flowFile, REL_NON_DUPLICATE);
                session.transfer(flowFile, REL_NON_DUPLICATE);
                logger.info("Could not find a duplicate entry in cache for {}; routing to non-duplicate", new Object[]{flowFile});
                session.adjustCounter("Non-Duplicate Files Processed", 1L, false);
            }
        }
        catch (IOException e) {
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
        }
    }

    /** Serializes cache keys as their UTF-8 bytes. */
    private static class StringSerializer
    implements Serializer<String> {
        private StringSerializer() {
        }

        /**
         * Writes the UTF-8 encoding of {@code value} to {@code out}.
         *
         * @param value the key to serialize
         * @param out the stream to write to
         * @throws IOException if writing to {@code out} fails
         */
        @Override
        public void serialize(String value, OutputStream out) throws SerializationException, IOException {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Reads a {@link CacheValue} from the layout written by {@link CacheValueSerializer}:
     * an 8-byte big-endian entry time followed by the UTF-8 description.
     */
    private static class CacheValueDeserializer
    implements Deserializer<CacheValue> {
        private CacheValueDeserializer() {
        }

        /**
         * Decodes a cached value.
         *
         * @param input the raw bytes read from the cache
         * @return the decoded value, or {@code null} if {@code input} is {@code null} or empty
         * @throws DeserializationException if {@code input} is non-empty but too short to hold
         *         the 8-byte entry time
         */
        @Override
        public CacheValue deserialize(byte[] input) throws DeserializationException, IOException {
            if (input == null || input.length == 0) {
                return null;
            }
            if (input.length < TIMESTAMP_LENGTH) {
                // Fail with a descriptive, declared exception instead of an index-out-of-bounds error.
                throw new DeserializationException("Cached value is " + input.length + " bytes long; expected at least " + TIMESTAMP_LENGTH + " bytes for the entry time");
            }

            // Big-endian decode of the leading 8 bytes.
            long time = 0L;
            for (int i = 0; i < TIMESTAMP_LENGTH; i++) {
                time = (time << 8) | (input[i] & 0xFFL);
            }
            final String description = new String(input, TIMESTAMP_LENGTH, input.length - TIMESTAMP_LENGTH, StandardCharsets.UTF_8);
            return new CacheValue(description, time);
        }
    }

    /**
     * Writes a {@link CacheValue} as an 8-byte big-endian entry time followed by the UTF-8
     * bytes of its description.
     */
    private static class CacheValueSerializer
    implements Serializer<CacheValue> {
        private CacheValueSerializer() {
        }

        /**
         * Serializes {@code entry} to {@code out}.
         *
         * @param entry the value to serialize
         * @param out the stream to write to
         * @throws IOException if writing to {@code out} fails
         */
        @Override
        public void serialize(CacheValue entry, OutputStream out) throws SerializationException, IOException {
            final long time = entry.getEntryTimeMS();
            final byte[] writeBuffer = new byte[TIMESTAMP_LENGTH];
            // Most significant byte first.
            for (int i = 0; i < TIMESTAMP_LENGTH; i++) {
                writeBuffer[i] = (byte)(time >>> (8 * (TIMESTAMP_LENGTH - 1 - i)));
            }
            out.write(writeBuffer, 0, TIMESTAMP_LENGTH);
            out.write(entry.getDescription().getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Immutable pair of a FlowFile description and the time (epoch millis) it was cached. */
    private static class CacheValue {
        private final String description;
        private final long entryTimeMS;

        /**
         * @param description the description of the FlowFile that created the entry
         * @param entryTimeMS the time the entry was created, in milliseconds
         */
        public CacheValue(String description, long entryTimeMS) {
            this.description = description;
            this.entryTimeMS = entryTimeMS;
        }

        /** @return the description stored with the entry */
        public String getDescription() {
            return this.description;
        }

        /** @return the entry creation time in milliseconds */
        public long getEntryTimeMS() {
            return this.entryTimeMS;
        }
    }
}

