package org.apache.nifi.jms.processors;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.nifi.jms.processors.MessageBodyToBytesConverter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/nifi/jms/processors/JMSConsumer.class */
/**
 * Worker that receives messages from a JMS destination through the configured
 * {@link JmsTemplate} and hands them to caller-supplied callbacks as {@link JMSResponse}s.
 */
public class JMSConsumer extends JMSWorker {
    /** Upper bound on the number of messages collected by a single {@code consumeMessageSet} call. */
    private static final int MAX_MESSAGES_PER_FLOW_FILE = 10000;

    /**
     * A received JMS message together with its extracted body, headers and properties.
     * The holder also carries the {@link MessageConsumer} that produced the message so that
     * {@link #acknowledge()} and {@link #reject()} can close it.
     */
    public static class JMSResponse {
        private final Message message;
        private final int acknowledgementMode;
        private final byte[] messageBody;
        private final String messageType;
        private final Map<String, String> messageHeaders;
        private final Map<String, String> messageProperties;
        private final MessageConsumer messageConsumer;
        private Integer batchOrder;

        /**
         * @param message the received JMS message
         * @param acknowledgementMode acknowledge mode of the session the message was received on
         * @param messageType simple name of the JMS message interface (for example {@code TextMessage})
         * @param messageBody message content as bytes
         * @param messageHeaders extracted JMS headers; exposed through an unmodifiable view
         * @param messageProperties extracted JMS properties; exposed through an unmodifiable view
         * @param messageConsumer consumer that delivered the message; closed on acknowledge/reject
         */
        JMSResponse(Message message, int acknowledgementMode, String messageType, byte[] messageBody,
                Map<String, String> messageHeaders, Map<String, String> messageProperties,
                MessageConsumer messageConsumer) {
            this.message = message;
            this.acknowledgementMode = acknowledgementMode;
            this.messageType = messageType;
            this.messageBody = messageBody;
            this.messageHeaders = Collections.unmodifiableMap(messageHeaders);
            this.messageProperties = Collections.unmodifiableMap(messageProperties);
            this.messageConsumer = messageConsumer;
        }

        /** @return simple name of the JMS message interface the message was recognised as */
        public String getMessageType() {
            return this.messageType;
        }

        /** @return the message content; the internal array is returned without copying */
        public byte[] getMessageBody() {
            return this.messageBody;
        }

        /** @return unmodifiable view of the extracted JMS headers */
        public Map<String, String> getMessageHeaders() {
            return this.messageHeaders;
        }

        /** @return unmodifiable view of the extracted JMS properties */
        public Map<String, String> getMessageProperties() {
            return this.messageProperties;
        }

        /**
         * Acknowledges the message when the session uses {@link Session#CLIENT_ACKNOWLEDGE}
         * and then closes the message consumer, even if the acknowledgement fails.
         *
         * @throws JMSException if the acknowledgement fails
         */
        public void acknowledge() throws JMSException {
            try {
                if (this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
                    this.message.acknowledge();
                }
            } finally {
                JmsUtils.closeMessageConsumer(this.messageConsumer);
            }
        }

        /** Closes the message consumer without acknowledging the message. */
        public void reject() {
            JmsUtils.closeMessageConsumer(this.messageConsumer);
        }

        /** @return position of this response within its batch, or {@code null} if it was never set */
        public Integer getBatchOrder() {
            return this.batchOrder;
        }

        /** @param batchOrder position of this response within its batch */
        public void setBatchOrder(Integer batchOrder) {
            this.batchOrder = batchOrder;
        }
    }

    /** Callback invoked with an open session and a freshly created message consumer. */
    @FunctionalInterface
    public interface MessageReceiver {
        void consume(Session session, MessageConsumer messageConsumer) throws JMSException;
    }

    /**
     * @param cachingConnectionFactory connection factory passed on to {@link JMSWorker}
     * @param jmsTemplate template used to open sessions and resolve destinations
     * @param componentLog logger passed on to {@link JMSWorker}
     */
    public JMSConsumer(CachingConnectionFactory cachingConnectionFactory, JmsTemplate jmsTemplate, ComponentLog componentLog) {
        super(cachingConnectionFactory, jmsTemplate, componentLog);
        componentLog.debug("Created Message Consumer for '{}'", new Object[]{jmsTemplate});
    }

    /**
     * Creates the consumer variant matching the template's domain and the requested
     * durable/shared flags.
     *
     * @param session session to create the consumer on
     * @param destinationName name resolved through the template's destination resolver
     * @param durable whether a durable subscription is requested (pub/sub domain only)
     * @param shared whether a shared subscription is requested (pub/sub domain only)
     * @param subscriptionName subscription name used for durable and shared consumers
     * @param messageSelector JMS message selector expression handed to the consumer
     * @return the created consumer
     * @throws JMSException if the destination cannot be resolved or the consumer cannot be created
     * @throws ProcessException if the JMS provider does not implement the shared-consumer methods
     */
    private MessageConsumer createMessageConsumer(Session session, String destinationName, boolean durable, boolean shared,
            String subscriptionName, String messageSelector) throws JMSException {
        final boolean isPubSubDomain = this.jmsTemplate.isPubSubDomain();
        final Destination destination =
                this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, destinationName, isPubSubDomain);

        // The pub/sub flag is also what gets passed as the consumer's noLocal argument.
        if (!isPubSubDomain) {
            return session.createConsumer(destination, messageSelector, isPubSubDomain);
        }
        if (!shared) {
            return durable
                    ? session.createDurableConsumer((Topic) destination, subscriptionName, messageSelector, isPubSubDomain)
                    : session.createConsumer(destination, messageSelector, isPubSubDomain);
        }
        try {
            return durable
                    ? session.createSharedDurableConsumer((Topic) destination, subscriptionName, messageSelector)
                    : session.createSharedConsumer((Topic) destination, subscriptionName, messageSelector);
        } catch (AbstractMethodError e) {
            // Raised when the provider's Session implementation lacks the shared-consumer methods.
            throw new ProcessException("Failed to create a shared consumer. Make sure the target broker is JMS 2.0 compliant.", e);
        }
    }

    /**
     * Receives at most one message and, if one was converted successfully, passes it to the callback.
     *
     * @param destinationName name of the destination to consume from
     * @param errorQueueName destination that unprocessable messages are re-sent to; may be {@code null}
     * @param durable whether to use a durable subscription
     * @param shared whether to use a shared subscription
     * @param subscriptionName subscription name for durable/shared consumers
     * @param messageSelector JMS message selector expression
     * @param charset name of the charset used to convert text messages to bytes
     * @param singleMessageConsumer callback receiving the response; not invoked when nothing was received
     */
    public void consumeSingleMessage(String destinationName, String errorQueueName, boolean durable, boolean shared,
            String subscriptionName, String messageSelector, String charset, Consumer<JMSResponse> singleMessageConsumer) {
        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
            final JMSResponse response = receiveMessage(session, messageConsumer, charset, errorQueueName);
            if (response != null) {
                singleMessageConsumer.accept(response);
            }
        });
    }

    /**
     * Receives messages until none is returned or {@value #MAX_MESSAGES_PER_FLOW_FILE} have been
     * collected, then passes the non-empty batch to the callback. Each response's batch order is
     * its index in the batch.
     *
     * @param destinationName name of the destination to consume from
     * @param errorQueueName destination that unprocessable messages are re-sent to; may be {@code null}
     * @param durable whether to use a durable subscription
     * @param shared whether to use a shared subscription
     * @param subscriptionName subscription name for durable/shared consumers
     * @param messageSelector JMS message selector expression
     * @param charset name of the charset used to convert text messages to bytes
     * @param messageSetConsumer callback receiving the batch; not invoked when the batch is empty
     */
    public void consumeMessageSet(String destinationName, final String errorQueueName, boolean durable, boolean shared,
            String subscriptionName, String messageSelector, final String charset,
            final Consumer<List<JMSResponse>> messageSetConsumer) {
        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
            final List<JMSResponse> responses = new ArrayList<>();
            // Check the limit BEFORE receiving so a message is never taken off the broker and then dropped.
            while (responses.size() < MAX_MESSAGES_PER_FLOW_FILE) {
                final JMSResponse response = receiveMessage(session, messageConsumer, charset, errorQueueName);
                if (response == null) {
                    break;
                }
                response.setBatchOrder(responses.size());
                responses.add(response);
            }
            if (!responses.isEmpty()) {
                messageSetConsumer.accept(responses);
            }
        });
    }

    /**
     * Runs the receiver inside a {@link JmsTemplate} session callback with a newly created consumer.
     * If the receiver fails, the session is recovered (best effort), the consumer is closed and the
     * original exception is rethrown.
     */
    private void doWithJmsTemplate(final String destinationName, final boolean durable, final boolean shared,
            final String subscriptionName, final String messageSelector, final MessageReceiver messageReceiver) {
        this.jmsTemplate.execute(new SessionCallback<Void>() {
            @Override
            public Void doInJms(Session session) throws JMSException {
                final MessageConsumer messageConsumer = JMSConsumer.this.createMessageConsumer(
                        session, destinationName, durable, shared, subscriptionName, messageSelector);
                try {
                    messageReceiver.consume(session, messageConsumer);
                    return null;
                } catch (Exception e) {
                    try {
                        session.recover();
                    } catch (Exception recoverFailure) {
                        // Only logged: the original failure is the one reported to the caller.
                        JMSConsumer.this.processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", recoverFailure);
                    }
                    JmsUtils.closeMessageConsumer(messageConsumer);
                    throw e;
                }
            }
        }, true);
    }

    /**
     * Receives one message within the template's receive timeout and converts it to a {@link JMSResponse}.
     * <p>
     * Returns {@code null}, after closing the consumer, when no message arrived, when the message is of
     * an unsupported type, or when its body could not be converted. In the latter two cases the message
     * is acknowledged (client-acknowledge sessions only) and re-sent to {@code errorQueueName} if one is given.
     *
     * @param session session the consumer belongs to
     * @param messageConsumer consumer to receive from
     * @param charset name of the charset used for text messages
     * @param errorQueueName destination for unprocessable messages; may be {@code null}
     * @return the response, or {@code null} if there is nothing to hand to the caller
     * @throws JMSException if receiving, acknowledging or header extraction fails
     */
    private JMSResponse receiveMessage(Session session, MessageConsumer messageConsumer, String charset, String errorQueueName) throws JMSException {
        final Message message = messageConsumer.receive(this.jmsTemplate.getReceiveTimeout());
        if (message == null) {
            JmsUtils.closeMessageConsumer(messageConsumer);
            return null;
        }

        try {
            final String messageType;
            final byte[] messageBody;
            if (message instanceof TextMessage) {
                messageType = TextMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset));
            } else if (message instanceof BytesMessage) {
                messageType = BytesMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
            } else if (message instanceof ObjectMessage) {
                messageType = ObjectMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
            } else if (message instanceof StreamMessage) {
                messageType = StreamMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message);
            } else if (message instanceof MapMessage) {
                messageType = MapMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message);
            } else {
                acknowledge(message, session);
                if (errorQueueName == null) {
                    this.processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[]{message});
                } else {
                    this.processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[]{message, errorQueueName});
                    this.jmsTemplate.send(errorQueueName, errorSession -> message);
                }
                // No JMSResponse will be created to close the consumer later, so close it here.
                JmsUtils.closeMessageConsumer(messageConsumer);
                return null;
            }
            return new JMSResponse(message, session.getAcknowledgeMode(), messageType, messageBody,
                    extractMessageHeaders(message), extractMessageProperties(message), messageConsumer);
        } catch (MessageBodyToBytesConverter.MessageConversionException e) {
            this.processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.", new Object[]{message, e});
            acknowledge(message, session);
            if (errorQueueName != null) {
                this.jmsTemplate.send(errorQueueName, errorSession -> message);
            }
            // No JMSResponse will be created to close the consumer later, so close it here.
            JmsUtils.closeMessageConsumer(messageConsumer);
            return null;
        }
    }

    /**
     * Acknowledges the message if it is non-null and the session uses {@link Session#CLIENT_ACKNOWLEDGE}.
     *
     * @throws JMSException if the acknowledge mode cannot be read or the acknowledgement fails
     */
    private void acknowledge(Message message, Session session) throws JMSException {
        if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
            message.acknowledge();
        }
    }

    /**
     * Copies every message property into a map, converting values with {@link String#valueOf(Object)}.
     * A {@link JMSException} is logged as a warning and the properties collected so far are returned.
     */
    private Map<String, String> extractMessageProperties(Message message) {
        final Map<String, String> properties = new HashMap<>();
        try {
            final Enumeration<?> propertyNames = message.getPropertyNames();
            while (propertyNames.hasMoreElements()) {
                final String propertyName = (String) propertyNames.nextElement();
                properties.put(propertyName, String.valueOf(message.getObjectProperty(propertyName)));
            }
        } catch (JMSException e) {
            this.processLog.warn("Failed to extract message properties", e);
        }
        return properties;
    }

    /**
     * Collects the standard JMS headers under {@code jms_*} keys. The reply-to and destination
     * entries are only added when a name could be determined for them.
     *
     * @throws JMSException if a header cannot be read from the message
     */
    private Map<String, String> extractMessageHeaders(Message message) throws JMSException {
        final Map<String, String> headers = new HashMap<>();
        headers.put("jms_deliveryMode", String.valueOf(message.getJMSDeliveryMode()));
        headers.put("jms_expiration", String.valueOf(message.getJMSExpiration()));
        headers.put("jms_priority", String.valueOf(message.getJMSPriority()));
        headers.put("jms_redelivered", String.valueOf(message.getJMSRedelivered()));
        headers.put("jms_timestamp", String.valueOf(message.getJMSTimestamp()));
        headers.put("jms_correlationId", message.getJMSCorrelationID());
        headers.put("jms_messageId", message.getJMSMessageID());
        headers.put("jms_type", message.getJMSType());

        final String replyToName = retrieveDestinationName(message.getJMSReplyTo(), "jms_replyTo");
        if (replyToName != null) {
            headers.put("jms_replyTo", replyToName);
        }
        final String destinationName = retrieveDestinationName(message.getJMSDestination(), "jms_destination");
        if (destinationName != null) {
            headers.put("jms_destination", destinationName);
        }
        return headers;
    }

    /**
     * Returns the queue or topic name of the destination.
     *
     * @param destination destination to name; may be {@code null}
     * @param headerName header the destination came from, used only in the warning message
     * @return the name, or {@code null} if the destination is {@code null}, is neither a
     *         {@link Queue} nor a {@link Topic}, or the provider failed to return the name
     */
    private String retrieveDestinationName(Destination destination, String headerName) {
        if (destination == null) {
            return null;
        }
        try {
            if (destination instanceof Queue) {
                return ((Queue) destination).getQueueName();
            }
            if (destination instanceof Topic) {
                return ((Topic) destination).getTopicName();
            }
        } catch (JMSException e) {
            this.processLog.warn("Failed to retrieve Destination name for '" + headerName + "' header", e);
        }
        return null;
    }
}
