package org.apache.nifi.processors.standard;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JmsProperties;

@CapabilityDescription("Gets the content of a FlowFile and puts it to a distributed map cache, using a cache key computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is 'keep original' the entry is not replaced.'")
@SupportsBatching
@WritesAttribute(attribute = PutDistributedMapCache.CACHED_ATTRIBUTE_NAME, description = "All FlowFiles will have an attribute 'cached'. The value of this attribute is true, is the FlowFile is cached, otherwise false.")
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({JmsProperties.MSG_TYPE_MAP, "cache", "put", "distributed"})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.processors.standard.FetchDistributedMapCache"})
/* loaded from: input_file:org/apache/nifi/processors/standard/PutDistributedMapCache.class */
public class PutDistributedMapCache extends AbstractProcessor {
    public static final String CACHED_ATTRIBUTE_NAME = "cached";
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("The Controller Service that is used to cache flow files").required(true).identifiesControllerService(DistributedMapCacheClient.class).build();
    public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder().name("Cache Entry Identifier").description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the cache key").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final AllowableValue CACHE_UPDATE_REPLACE = new AllowableValue(PutFile.REPLACE_RESOLUTION, "Replace if present", "Adds the specified entry to the cache, replacing any value that is currently set.");
    public static final AllowableValue CACHE_UPDATE_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original", "Adds the specified entry to the cache, if the key does not exist.");
    public static final PropertyDescriptor CACHE_UPDATE_STRATEGY = new PropertyDescriptor.Builder().name("Cache update strategy").description("Determines how the cache is updated if the cache already contains the entry").required(true).allowableValues(new AllowableValue[]{CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL}).defaultValue(CACHE_UPDATE_REPLACE.getValue()).build();
    public static final PropertyDescriptor CACHE_ENTRY_MAX_BYTES = new PropertyDescriptor.Builder().name("Max cache entry size").description("The maximum amount of data to put into cache").required(false).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully inserted into cache will be routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship").build();
    private final Set<Relationship> relationships;
    private final Serializer<String> keySerializer = new StringSerializer();
    private final Serializer<byte[]> valueSerializer = new CacheValueSerializer();
    private final Deserializer<byte[]> valueDeserializer = new CacheValueDeserializer();

    /* loaded from: input_file:org/apache/nifi/processors/standard/PutDistributedMapCache$CacheValueDeserializer.class */
    public static class CacheValueDeserializer implements Deserializer<byte[]> {
        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public byte[] m103deserialize(byte[] bArr) throws DeserializationException, IOException {
            if (bArr == null || bArr.length == 0) {
                return null;
            }
            return bArr;
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/PutDistributedMapCache$CacheValueSerializer.class */
    public static class CacheValueSerializer implements Serializer<byte[]> {
        public void serialize(byte[] bArr, OutputStream outputStream) throws SerializationException, IOException {
            outputStream.write(bArr);
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/PutDistributedMapCache$StringSerializer.class */
    public static class StringSerializer implements Serializer<String> {
        public void serialize(String str, OutputStream outputStream) throws SerializationException, IOException {
            outputStream.write(str.getBytes(StandardCharsets.UTF_8));
        }
    }

    public PutDistributedMapCache() {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(CACHE_ENTRY_IDENTIFIER);
        arrayList.add(DISTRIBUTED_CACHE_SERVICE);
        arrayList.add(CACHE_UPDATE_STRATEGY);
        arrayList.add(CACHE_ENTRY_MAX_BYTES);
        return arrayList;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        ComponentLog logger = getLogger();
        String value = processContext.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
        if (StringUtils.isBlank(value)) {
            logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
            return;
        }
        DistributedMapCacheClient asControllerService = processContext.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        try {
            long longValue = processContext.getProperty(CACHE_ENTRY_MAX_BYTES).asDataSize(DataUnit.B).longValue();
            long size = flowFile.getSize();
            if (size > longValue) {
                logger.warn("Flow file {} size {} exceeds the max cache entry size ({} B).", new Object[]{flowFile, Long.valueOf(size), Long.valueOf(longValue)});
                processSession.transfer(flowFile, REL_FAILURE);
                return;
            }
            if (size == 0) {
                logger.warn("Flow file {} is empty, there is nothing to cache.", new Object[]{flowFile});
                processSession.transfer(flowFile, REL_FAILURE);
                return;
            }
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            processSession.exportTo(flowFile, byteArrayOutputStream);
            byte[] byteArray = byteArrayOutputStream.toByteArray();
            String value2 = processContext.getProperty(CACHE_UPDATE_STRATEGY).getValue();
            boolean z = false;
            if (value2.equals(CACHE_UPDATE_REPLACE.getValue())) {
                asControllerService.put(value, byteArray, this.keySerializer, this.valueSerializer);
                z = true;
            } else if (value2.equals(CACHE_UPDATE_KEEP_ORIGINAL.getValue()) && ((byte[]) asControllerService.getAndPutIfAbsent(value, byteArray, this.keySerializer, this.valueSerializer, this.valueDeserializer)) == null) {
                z = true;
            }
            FlowFile putAttribute = processSession.putAttribute(flowFile, CACHED_ATTRIBUTE_NAME, String.valueOf(z));
            if (z) {
                processSession.transfer(putAttribute, REL_SUCCESS);
            } else {
                processSession.transfer(putAttribute, REL_FAILURE);
            }
        } catch (IOException e) {
            FlowFile penalize = processSession.penalize(flowFile);
            processSession.transfer(penalize, REL_FAILURE);
            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{penalize, e});
        }
    }
}
