package org.apache.nifi.jms.processors;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.processors.JMSConsumer;
import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
import org.apache.nifi.jms.processors.ioconcept.writer.record.RecordWriter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

@CapabilityDescription("Consumes JMS Message of type BytesMessage, TextMessage, ObjectMessage, MapMessage or StreamMessage transforming its content to a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes. MapMessages will be transformed into JSONs and then into byte arrays. The other types will have their raw contents as byte array transferred into the flowfile.")
@DynamicProperty(name = "The name of a Connection Factory configuration property.", value = "The value of a given Connection Factory configuration property.", description = "Additional configuration property for the Connection Factory. It can be used when the Connection Factory is being configured via the 'JNDI *' or the 'JMS *'properties of the processor. For more information, see the Additional Details page.", expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
@Restricted(restrictions = {@Restriction(requiredPermission = RequiredPermission.REFERENCE_REMOTE_RESOURCES, explanation = "Client Library Location can reference resources over HTTP")})
@WritesAttributes({@WritesAttribute(attribute = "jms_deliveryMode", description = "The JMSDeliveryMode from the message header."), @WritesAttribute(attribute = "jms_expiration", description = "The JMSExpiration from the message header."), @WritesAttribute(attribute = "jms_priority", description = "The JMSPriority from the message header."), @WritesAttribute(attribute = "jms_redelivered", description = "The JMSRedelivered from the message header."), @WritesAttribute(attribute = "jms_timestamp", description = "The JMSTimestamp from the message header."), @WritesAttribute(attribute = "jms_correlationId", description = "The JMSCorrelationID from the message header."), @WritesAttribute(attribute = "jms_messageId", description = "The JMSMessageID from the message header."), @WritesAttribute(attribute = "jms_type", description = "The JMSType from the message header."), @WritesAttribute(attribute = "jms_replyTo", description = "The JMSReplyTo from the message header."), @WritesAttribute(attribute = "jms_destination", description = "The JMSDestination from the message header."), @WritesAttribute(attribute = ConsumeJMS.JMS_MESSAGETYPE, description = "The JMS message type, can be TextMessage, BytesMessage, ObjectMessage, MapMessage or StreamMessage)."), @WritesAttribute(attribute = "other attributes", description = "Each message property is written to an attribute.")})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"jms", "get", "message", "receive", "consume"})
@SeeAlso({PublishJMS.class, JMSConnectionFactoryProvider.class})
/* loaded from: input_file:org/apache/nifi/jms/processors/ConsumeJMS.class */
public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
    /** FlowFile attribute holding the JMS message type of the consumed message. */
    public static final String JMS_MESSAGETYPE = "jms.messagetype";

    /** Counter names reported through the process session when a Record Reader is configured. */
    private static final String COUNTER_PARSE_FAILURES = "Parse Failures";
    private static final String COUNTER_RECORDS_RECEIVED = "Records Received";
    private static final String COUNTER_RECORDS_PROCESSED = "Records Processed";

    /** FlowFile attribute holding the name of the destination the message was consumed from. */
    public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination";

    /** Immutable relationship set; assigned once in the static initializer of this class. */
    private static final Set<Relationship> relationships;

    /** Immutable supported-property list; assigned once in the static initializer of this class. */
    private static final List<PropertyDescriptor> propertyDescriptors;

    // The allowable values below carry the integer acknowledge mode as their value;
    // finishBuildingJmsWorker reads it back with asInteger() and hands it to the JmsTemplate.
    static final AllowableValue AUTO_ACK = new AllowableValue(
            String.valueOf(1),
            "AUTO_ACKNOWLEDGE (1)",
            "Automatically acknowledges a client's receipt of a message, regardless if NiFi session has been commited. Can result in data loss in the event where NiFi abruptly stopped before session was commited.");

    static final AllowableValue CLIENT_ACK = new AllowableValue(
            String.valueOf(2),
            "CLIENT_ACKNOWLEDGE (2)",
            "(DEFAULT) Manually acknowledges a client's receipt of a message after NiFi Session was commited, thus ensuring no data loss");

    static final AllowableValue DUPS_OK = new AllowableValue(
            String.valueOf(3),
            "DUPS_OK_ACKNOWLEDGE (3)",
            "This acknowledgment mode instructs the session to lazily acknowledge the delivery of messages. May result in both data duplication and data loss while achieving the best throughput.");

    /** Optional JMS message selector used to filter the messages received. */
    static final PropertyDescriptor MESSAGE_SELECTOR = new PropertyDescriptor.Builder()
            .name("Message Selector")
            .displayName("Message Selector")
            .description("The JMS Message Selector to filter the messages that the processor will receive")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Session acknowledge mode; defaults to {@link #CLIENT_ACK}. */
    static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder()
            .name("Acknowledgement Mode")
            .description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide better performance than Client Acknowledge.")
            .required(true)
            .allowableValues(new DescribedValue[]{AUTO_ACK, CLIENT_ACK, DUPS_OK})
            .defaultValue(CLIENT_ACK.getValue())
            .build();

    /** Whether the topic consumer is durable ("true"/"false", default "false"). */
    static final PropertyDescriptor DURABLE_SUBSCRIBER = new PropertyDescriptor.Builder()
            .name("Durable subscription")
            .displayName("Durable Subscription")
            .description("If destination is Topic if present then make it the consumer durable. @see https://jakarta.ee/specifications/platform/9/apidocs/jakarta/jms/session#createDurableConsumer-jakarta.jms.Topic-java.lang.String-")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .defaultValue("false")
            .allowableValues(new String[]{"true", "false"})
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Whether the topic consumer is shared ("true"/"false", default "false"). */
    static final PropertyDescriptor SHARED_SUBSCRIBER = new PropertyDescriptor.Builder()
            .name("Shared subscription")
            .displayName("Shared Subscription")
            .description("If destination is Topic if present then make it the consumer shared. @see https://jakarta.ee/specifications/platform/9/apidocs/jakarta/jms/session#createSharedConsumer-jakarta.jms.Topic-java.lang.String-")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .defaultValue("false")
            .allowableValues(new String[]{"true", "false"})
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Subscription name used for shared or durable topic subscriptions. */
    static final PropertyDescriptor SUBSCRIPTION_NAME = new PropertyDescriptor.Builder()
            .name("Subscription Name")
            .description("The name of the subscription to use if destination is Topic and is shared or durable.")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();

    /** Receive timeout; converted to milliseconds in finishBuildingJmsWorker. */
    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
            .name("Timeout")
            .description("How long to wait to consume a message from the remote broker before giving up.")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("1 sec")
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();

    /**
     * Optional queue for unprocessed messages; customValidate only accepts it for the QUEUE destination type.
     *
     * The scope is ENVIRONMENT: this processor forbids input and evaluates the property without a
     * FlowFile, so FlowFile attributes can never contribute to the expression.
     */
    static final PropertyDescriptor ERROR_QUEUE = new PropertyDescriptor.Builder()
            .name("Error Queue Name")
            .description("The name of a JMS Queue where - if set - unprocessed messages will be routed. Usually provided by the administrator (e.g., 'queue://myErrorQueue' or 'myErrorQueue').Only applicable if 'Destination Type' is set to 'QUEUE'")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();

    /** Record Reader; when set, messages are consumed in sets and written as records. */
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(BASE_RECORD_READER)
            .description("The Record Reader to use for parsing received JMS Messages into Records.")
            .build();

    /** Record Writer used together with {@link #RECORD_READER}. */
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(BASE_RECORD_WRITER)
            .description("The Record Writer to use for serializing Records before writing them to a FlowFile.")
            .build();

    /** Output format of the records; depends on {@link #RECORD_READER} being configured. */
    static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
            .name("output-strategy")
            .displayName("Output Strategy")
            .description("The format used to output the JMS message into a FlowFile record.")
            .dependsOn(RECORD_READER, new AllowableValue[0])
            .required(true)
            .defaultValue(OutputStrategy.USE_VALUE.getValue())
            .allowableValues(OutputStrategy.class)
            .build();

    /** Destination for every FlowFile created from consumed messages. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles that are received from the JMS Destination are routed to this relationship")
            .build();

    /** Destination for messages the Record Reader could not parse; auto-terminated by default. */
    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
            .name("parse.failure")
            .description("If a message cannot be parsed using the configured Record Reader, the contents of the message will be routed to this Relationship as its own individual FlowFile.")
            .autoTerminateDefault(true)
            .build();

    /**
     * Evaluates the {@link #DURABLE_SUBSCRIBER} property.
     *
     * @param processContext context the property is read from
     * @return {@code true} only when the property evaluates to a true boolean; an unset (null) value counts as {@code false}
     */
    private static boolean isDurableSubscriber(ProcessContext processContext) {
        final Boolean durable = processContext.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
        return Boolean.TRUE.equals(durable);
    }

    /**
     * Evaluates the {@link #SHARED_SUBSCRIBER} property.
     *
     * @param processContext context the property is read from
     * @return {@code true} only when the property evaluates to a true boolean; an unset (null) value counts as {@code false}
     */
    private static boolean isShared(ProcessContext processContext) {
        final Boolean shared = processContext.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
        return shared != null && shared;
    }

    /**
     * Refuses to schedule the processor when more than one concurrent task is configured
     * for a durable subscription that is not shared.
     *
     * @param processContext context providing the concurrency setting and subscription properties
     * @throws ProcessException if the configuration combines multiple tasks with a durable, non-shared subscription
     */
    @OnScheduled
    public void onSchedule(ProcessContext processContext) {
        // Checks are evaluated in the same order as a short-circuiting conjunction would evaluate them.
        if (processContext.getMaxConcurrentTasks() <= 1) {
            return;
        }
        if (!isDurableSubscriber(processContext)) {
            return;
        }
        if (isShared(processContext)) {
            return;
        }
        throw new ProcessException("Durable non shared subscriptions cannot work on multiple threads. Check javax/jms/Session#createDurableConsumer API doc.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Extends the inherited validation with a check that {@link #ERROR_QUEUE} is only
     * configured when the destination type is {@code QUEUE}.
     *
     * @param validationContext context the property values are read from
     * @return the inherited validation results plus, when applicable, one invalid result for the error queue
     */
    @Override // org.apache.nifi.jms.processors.AbstractJMSProcessor
    public Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
        final String destinationType = validationContext.getProperty(DESTINATION_TYPE).getValue();
        final String errorQueue = validationContext.getProperty(ERROR_QUEUE).getValue();
        if (errorQueue != null && !"QUEUE".equals(destinationType)) {
            final String explanation = "'" + ERROR_QUEUE.getDisplayName() + "' is applicable only when '" + DESTINATION_TYPE.getDisplayName() + "'='QUEUE'";
            results.add(new ValidationResult.Builder()
                    .valid(false)
                    .subject(ERROR_QUEUE.getDisplayName())
                    .explanation(explanation)
                    .build());
        }
        return results;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the consumer-related properties and consumes from JMS, either as a record set
     * (when {@link #RECORD_READER} is set) or as a single message.
     *
     * On any exception the error is logged, the consumer is marked invalid, the context
     * yields and the exception is rethrown.
     *
     * @param processContext context providing the property values
     * @param processSession session used to create and transfer FlowFiles
     * @param jMSConsumer    consumer used to receive the messages
     * @throws ProcessException propagated from message processing
     */
    @Override // org.apache.nifi.jms.processors.AbstractJMSProcessor
    public void rendezvousWithJms(ProcessContext processContext, ProcessSession processSession, JMSConsumer jMSConsumer) throws ProcessException {
        final String destinationName = processContext.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
        final String errorQueueName = processContext.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue();
        final boolean durable = isDurableSubscriber(processContext);
        final boolean shared = isShared(processContext);
        final String subscriptionName = processContext.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
        final String messageSelector = processContext.getProperty(MESSAGE_SELECTOR).evaluateAttributeExpressions().getValue();
        final String charset = processContext.getProperty(CHARSET).evaluateAttributeExpressions().getValue();

        try {
            final boolean recordOriented = processContext.getProperty(RECORD_READER).isSet();
            if (!recordOriented) {
                processSingleMessage(processSession, jMSConsumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
            } else {
                processMessageSet(processContext, processSession, jMSConsumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
            }
        } catch (Exception failure) {
            getLogger().error("Error while trying to process JMS message", failure);
            jMSConsumer.setValid(false);
            processContext.yield();
            throw failure;
        }
    }

    /**
     * Consumes a single JMS message and turns it into one FlowFile routed to {@link #REL_SUCCESS}.
     *
     * The message is acknowledged after the session commit succeeds and rejected when the
     * commit fails; any throwable raised while building the FlowFile also rejects the message
     * before being rethrown. A {@code null} response is ignored.
     */
    private void processSingleMessage(ProcessSession processSession, JMSConsumer jMSConsumer, String destinationName, String errorQueueName,
                                      boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
        jMSConsumer.consumeSingleMessage(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, response -> {
            if (response != null) {
                try {
                    final FlowFile flowFile = createFlowFileFromMessage(processSession, destinationName, response);
                    processSession.getProvenanceReporter().receive(flowFile, destinationName);
                    processSession.transfer(flowFile, REL_SUCCESS);
                    processSession.commitAsync(
                            () -> withLog(() -> acknowledge(response)),
                            commitFailure -> withLog(() -> response.reject()));
                } catch (Throwable failure) {
                    response.reject();
                    throw failure;
                }
            }
        });
    }

    /**
     * Creates a FlowFile whose content is the message body and whose attributes are the merged
     * JMS headers and properties plus {@link #JMS_SOURCE_DESTINATION_NAME} and {@link #JMS_MESSAGETYPE}.
     *
     * A {@code null} message body is written as empty content, matching how the record-oriented
     * path treats a missing body; without this the stream write would fail with a
     * {@link NullPointerException}, notably when a body-less message is routed to parse failure.
     *
     * @param processSession  session used to create and populate the FlowFile
     * @param destinationName name of the destination the message was consumed from
     * @param jMSResponse     the consumed message
     * @return the populated FlowFile
     */
    private FlowFile createFlowFileFromMessage(ProcessSession processSession, String destinationName, JMSConsumer.JMSResponse jMSResponse) {
        FlowFile flowFile = processSession.create();

        final byte[] messageBody = jMSResponse.getMessageBody();
        final byte[] content = messageBody == null ? new byte[0] : messageBody;
        flowFile = processSession.write(flowFile, out -> out.write(content));

        final Map<String, String> attributes = mergeJmsAttributes(jMSResponse.getMessageHeaders(), jMSResponse.getMessageProperties());
        attributes.put(JMS_SOURCE_DESTINATION_NAME, destinationName);
        attributes.put(JMS_MESSAGETYPE, jMSResponse.getMessageType());
        return processSession.putAllAttributes(flowFile, attributes);
    }

    /**
     * Consumes a set of JMS messages and writes them as records into a FlowFile using the
     * configured Record Reader, Record Writer and Output Strategy.
     *
     * Successfully written FlowFiles go to {@link #REL_SUCCESS}; the batch is acknowledged after
     * the session commit succeeds and rejected when it fails. Messages that cannot be parsed are
     * routed individually to {@link #REL_PARSE_FAILURE}. Any other failure rejects the batch and
     * is rethrown wrapped in a {@link ProcessException}.
     */
    private void processMessageSet(ProcessContext processContext, ProcessSession processSession, JMSConsumer jMSConsumer, String destinationName, String errorQueueName,
                                   boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
        final RecordReaderFactory readerFactory = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final OutputStrategy outputStrategy = OutputStrategy.valueOf(processContext.getProperty(OUTPUT_STRATEGY).getValue());

        // Parameterized so the extractor lambdas and the callback are checked against JMSResponse.
        final RecordWriter<JMSConsumer.JMSResponse> recordWriter = new RecordWriter<>(
                readerFactory,
                writerFactory,
                message -> {
                    // A message without a body is handed to the reader as empty content.
                    final byte[] body = message.getMessageBody();
                    return body == null ? new byte[0] : body;
                },
                message -> mergeJmsAttributes(message.getMessageHeaders(), message.getMessageProperties()),
                outputStrategy,
                getLogger());

        jMSConsumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, responses -> {
            recordWriter.write(processSession, responses, new FlowFileWriterCallback<JMSConsumer.JMSResponse>() {
                @Override // org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback
                public void onSuccess(FlowFile flowFile, List<JMSConsumer.JMSResponse> processedMessages, List<JMSConsumer.JMSResponse> failedMessages) {
                    processSession.getProvenanceReporter().receive(flowFile, destinationName);
                    processSession.adjustCounter(COUNTER_RECORDS_RECEIVED, processedMessages.size() + failedMessages.size(), false);
                    processSession.adjustCounter(COUNTER_RECORDS_PROCESSED, processedMessages.size(), false);
                    processSession.transfer(flowFile, REL_SUCCESS);
                    processSession.commitAsync(
                            () -> withLog(() -> acknowledge(processedMessages, failedMessages)),
                            commitFailure -> withLog(() -> reject(processedMessages, failedMessages)));
                }

                @Override // org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback
                public void onParseFailure(FlowFile flowFile, JMSConsumer.JMSResponse message, Exception exc) {
                    processSession.adjustCounter(COUNTER_PARSE_FAILURES, 1L, false);
                    final FlowFile failedMessage = createFlowFileFromMessage(processSession, destinationName, message);
                    processSession.transfer(failedMessage, REL_PARSE_FAILURE);
                }

                @Override // org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback
                public void onFailure(FlowFile flowFile, List<JMSConsumer.JMSResponse> processedMessages, List<JMSConsumer.JMSResponse> failedMessages, Exception exc) {
                    reject(processedMessages, failedMessages);
                    // Rethrown so the caller applies the same error handling as for single messages.
                    throw new ProcessException(exc);
                }
            });
        });
    }

    /**
     * Acknowledges a single consumed message.
     *
     * @param jMSResponse the message to acknowledge
     * @throws ProcessException wrapping any exception raised by the acknowledgement, after it has been logged
     */
    private void acknowledge(JMSConsumer.JMSResponse jMSResponse) {
        try {
            jMSResponse.acknowledge();
        } catch (Exception ackFailure) {
            getLogger().error("Failed to acknowledge JMS Message that was received", ackFailure);
            throw new ProcessException(ackFailure);
        }
    }

    /**
     * Acknowledges a batch by acknowledging the message with the highest batch order
     * among the processed and failed messages.
     *
     * Does nothing when both lists are empty, since there is no message to acknowledge
     * and looking up the last message would otherwise throw.
     *
     * @param list  messages that were processed
     * @param list2 messages that failed
     */
    private void acknowledge(List<JMSConsumer.JMSResponse> list, List<JMSConsumer.JMSResponse> list2) {
        if (list.isEmpty() && list2.isEmpty()) {
            return;
        }
        acknowledge(findLastBatchedJmsResponse(list, list2));
    }

    /**
     * Rejects a batch by rejecting the message with the highest batch order
     * among the processed and failed messages.
     *
     * Does nothing when both lists are empty. This matters on failure paths: throwing from
     * here because no message was recorded yet would replace the exception that actually
     * caused the failure.
     *
     * @param list  messages that were processed
     * @param list2 messages that failed
     */
    private void reject(List<JMSConsumer.JMSResponse> list, List<JMSConsumer.JMSResponse> list2) {
        if (list.isEmpty() && list2.isEmpty()) {
            return;
        }
        findLastBatchedJmsResponse(list, list2).reject();
    }

    /**
     * Runs the given action and logs any exception it throws before rethrowing it unchanged.
     * Used for the callbacks passed to {@code commitAsync}.
     *
     * @param runnable the action to run
     */
    private void withLog(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception callbackFailure) {
            getLogger().error("An error happened during commitAsync callback", callbackFailure);
            throw callbackFailure;
        }
    }

    /**
     * Finds the message with the highest batch order across both lists.
     *
     * @param list  messages that were processed
     * @param list2 messages that failed
     * @return the message whose {@code getBatchOrder()} is the greatest
     * @throws IllegalStateException if both lists are empty
     */
    private JMSConsumer.JMSResponse findLastBatchedJmsResponse(List<JMSConsumer.JMSResponse> list, List<JMSConsumer.JMSResponse> list2) {
        // A typed stream keeps the element type, so no Object[] cast or result cast is needed.
        return Stream.of(list, list2)
                .flatMap(Collection::stream)
                .max(Comparator.comparing(JMSConsumer.JMSResponse::getBatchOrder))
                .orElseThrow(() -> new IllegalStateException("Cannot determine the last batched JMS message: no messages in the batch"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Applies the configured acknowledge mode and receive timeout (in milliseconds) to the
     * template and creates the consumer.
     *
     * @param cachingConnectionFactory connection factory handed to the new consumer
     * @param jmsTemplate              template to configure and hand to the new consumer
     * @param processContext           context providing {@link #ACKNOWLEDGEMENT_MODE} and {@link #TIMEOUT}
     * @return a new {@link JMSConsumer}
     */
    @Override // org.apache.nifi.jms.processors.AbstractJMSProcessor
    public JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory cachingConnectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
        final int acknowledgeMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger().intValue();
        jmsTemplate.setSessionAcknowledgeMode(acknowledgeMode);

        final long receiveTimeoutMillis = processContext.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).longValue();
        jmsTemplate.setReceiveTimeout(receiveTimeoutMillis);

        return new JMSConsumer(cachingConnectionFactory, jmsTemplate, getLogger());
    }

    /**
     * Returns the relationships of this processor.
     *
     * @return the unmodifiable set assigned in the static initializer, containing
     *         {@link #REL_SUCCESS} and {@link #REL_PARSE_FAILURE}
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Returns the property descriptors supported by this processor.
     *
     * @return the unmodifiable list assigned in the static initializer
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sets the client id on the connection factory.
     *
     * For a durable subscription that is not shared, the value of {@code getClientId(processContext)}
     * is set directly on the factory; in every other case the inherited implementation is used.
     *
     * @param processContext          context providing the subscription properties
     * @param singleConnectionFactory factory receiving the client id
     */
    @Override // org.apache.nifi.jms.processors.AbstractJMSProcessor
    public void setClientId(ProcessContext processContext, SingleConnectionFactory singleConnectionFactory) {
        final boolean durableNonShared = isDurableSubscriber(processContext) && !isShared(processContext);
        if (durableNonShared) {
            singleConnectionFactory.setClientId(getClientId(processContext));
            return;
        }
        super.setClientId(processContext, singleConnectionFactory);
    }

    /**
     * Merges JMS headers and JMS properties into one attribute map.
     *
     * When a property has the same name as a header, the property value wins and a warning
     * naming the attribute is logged.
     *
     * @param map  the JMS headers; copied, not modified
     * @param map2 the JMS properties, applied on top of the headers
     * @return a new mutable map containing the merged attributes
     */
    private Map<String, String> mergeJmsAttributes(Map<String, String> map, Map<String, String> map2) {
        final Map<String, String> merged = new HashMap<>(map);
        map2.forEach((name, value) -> {
            if (merged.containsKey(name)) {
                getLogger().warn("JMS Header and Property name collides as an attribute. JMS Property will override the JMS Header attribute. attributeName=[{}]", new Object[]{name});
            }
            merged.put(name, value);
        });
        return merged;
    }

    // Builds the immutable property list and relationship set exposed by
    // getSupportedPropertyDescriptors() and getRelationships().
    static {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(CF_SERVICE);
        descriptors.add(DESTINATION);
        descriptors.add(DESTINATION_TYPE);
        descriptors.add(MESSAGE_SELECTOR);
        descriptors.add(USER);
        descriptors.add(PASSWORD);
        descriptors.add(CLIENT_ID);
        // CHARSET is re-declared here with an additional character-set validator.
        descriptors.add(new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(CHARSET)
                .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR_WITH_EVALUATION)
                .build());
        descriptors.add(ACKNOWLEDGEMENT_MODE);
        descriptors.add(DURABLE_SUBSCRIBER);
        descriptors.add(SHARED_SUBSCRIBER);
        descriptors.add(SUBSCRIPTION_NAME);
        descriptors.add(TIMEOUT);
        descriptors.add(ERROR_QUEUE);
        descriptors.add(RECORD_READER);
        descriptors.add(RECORD_WRITER);
        descriptors.add(OUTPUT_STRATEGY);
        descriptors.addAll(JNDI_JMS_CF_PROPERTIES);
        descriptors.addAll(JMS_CF_PROPERTIES);
        propertyDescriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_PARSE_FAILURE);
        relationships = Collections.unmodifiableSet(rels);
    }
}
