/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.jms.processors;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.nifi.jms.processors.JMSWorker;
import org.apache.nifi.jms.processors.MessageBodyToBytesConverter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;

/**
 * Receives messages from a JMS destination through the {@link JmsTemplate} held by
 * {@link JMSWorker} and hands them to a caller-supplied callback as {@link JMSResponse}
 * objects (message body as bytes plus headers and properties as string maps).
 *
 * <p>The {@link MessageConsumer} created for a receive is handed to the resulting
 * {@link JMSResponse}; calling {@link JMSResponse#acknowledge()} or
 * {@link JMSResponse#reject()} closes it.
 */
class JMSConsumer extends JMSWorker {
    /** Upper bound on the number of messages collected by one {@link #consumeMessageSet} call. */
    private static final int MAX_MESSAGES_PER_FLOW_FILE = 10000;

    JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) {
        super(connectionFactory, jmsTemplate, logger);
        logger.debug("Created Message Consumer for '{}'", new Object[]{jmsTemplate});
    }

    /**
     * Resolves {@code destinationName} and creates the matching kind of consumer on {@code session}.
     *
     * <p>For a pub/sub template the consumer is shared and/or durable as requested; otherwise a
     * plain consumer is created. The template's pub/sub flag is also what gets passed as the
     * trailing boolean ({@code noLocal} in the JMS API) of the non-shared factory methods.
     *
     * @throws JMSException     if the destination cannot be resolved or the consumer cannot be created
     * @throws ProcessException if a shared consumer is requested but the provider's {@link Session}
     *                          does not implement the shared-consumer methods
     */
    private MessageConsumer createMessageConsumer(Session session, String destinationName, boolean durable, boolean shared, String subscriptionName, String messageSelector) throws JMSException {
        final boolean isPubSub = this.jmsTemplate.isPubSubDomain();
        final Destination destination = this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, destinationName, isPubSub);

        if (!isPubSub) {
            return session.createConsumer(destination, messageSelector, isPubSub);
        }

        if (shared) {
            try {
                return durable
                        ? session.createSharedDurableConsumer((Topic) destination, subscriptionName, messageSelector)
                        : session.createSharedConsumer((Topic) destination, subscriptionName, messageSelector);
            } catch (AbstractMethodError e) {
                throw new ProcessException("Failed to create a shared consumer. Make sure the target broker is JMS 2.0 compliant.", (Throwable) e);
            }
        }

        return durable
                ? session.createDurableConsumer((Topic) destination, subscriptionName, messageSelector, isPubSub)
                : session.createConsumer(destination, messageSelector, isPubSub);
    }

    /**
     * Receives at most one message and, if one was converted successfully, passes it to
     * {@code singleMessageConsumer}. The callback is not invoked when nothing usable was received.
     */
    public void consumeSingleMessage(String destinationName, String errorQueueName, boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset, Consumer<JMSResponse> singleMessageConsumer) {
        this.doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
            final JMSResponse response = this.receiveMessage(session, messageConsumer, charset, errorQueueName);
            if (response != null) {
                singleMessageConsumer.accept(response);
            }
        });
    }

    /**
     * Receives messages until {@link #receiveMessage} yields {@code null} or
     * {@link #MAX_MESSAGES_PER_FLOW_FILE} responses have been collected, then passes the
     * non-empty batch to {@code messageSetConsumer}. Each response's batch order is its
     * zero-based position in the batch.
     */
    public void consumeMessageSet(String destinationName, final String errorQueueName, boolean durable, boolean shared, String subscriptionName, String messageSelector, final String charset, final Consumer<List<JMSResponse>> messageSetConsumer) {
        this.doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
            final List<JMSResponse> jmsResponses = new ArrayList<>();

            // The size limit is checked BEFORE receiving: pulling one more message off the
            // destination once the batch is full would discard that message.
            while (jmsResponses.size() < MAX_MESSAGES_PER_FLOW_FILE) {
                final JMSResponse response = this.receiveMessage(session, messageConsumer, charset, errorQueueName);
                if (response == null) {
                    break;
                }
                response.setBatchOrder(jmsResponses.size());
                jmsResponses.add(response);
            }

            if (!jmsResponses.isEmpty()) {
                messageSetConsumer.accept(jmsResponses);
            }
        });
    }

    /**
     * Runs {@code messageReceiver} against a freshly created consumer inside
     * {@link JmsTemplate#execute(SessionCallback, boolean)}.
     *
     * <p>If the receiver throws, the session is asked to recover (a failure of the recovery
     * itself is only logged at debug level), the consumer is closed and the original
     * exception is rethrown. On success the consumer is intentionally left open here.
     */
    private void doWithJmsTemplate(final String destinationName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, final MessageReceiver messageReceiver) {
        this.jmsTemplate.execute(new SessionCallback<Void>() {

            @Override
            public Void doInJms(Session session) throws JMSException {
                final MessageConsumer messageConsumer = JMSConsumer.this.createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector);
                try {
                    messageReceiver.consume(session, messageConsumer);
                } catch (Exception e) {
                    try {
                        session.recover();
                    } catch (Exception e1) {
                        // Best effort only: the original failure below is the one that matters.
                        JMSConsumer.this.processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", (Throwable) e1);
                    }
                    JmsUtils.closeMessageConsumer(messageConsumer);
                    throw e;
                }
                return null;
            }
        }, true);
    }

    /**
     * Waits up to the template's receive timeout for one message and converts it.
     *
     * @return the converted message, or {@code null} when no message arrived (the consumer is
     *         closed in that case), when the message type is unsupported, or when its body
     *         could not be converted. In the latter two cases the message is acknowledged
     *         (client-acknowledge sessions only) and, if {@code errorQueueName} is non-null,
     *         forwarded to that queue.
     */
    private JMSResponse receiveMessage(Session session, MessageConsumer msgConsumer, String charset, String errorQueueName) throws JMSException {
        final Message message = msgConsumer.receive(this.jmsTemplate.getReceiveTimeout());
        if (message == null) {
            JmsUtils.closeMessageConsumer(msgConsumer);
            return null;
        }

        String messageType;
        byte[] messageBody;
        try {
            if (message instanceof TextMessage) {
                messageType = TextMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset));
            } else if (message instanceof BytesMessage) {
                messageType = BytesMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
            } else if (message instanceof ObjectMessage) {
                messageType = ObjectMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
            } else if (message instanceof StreamMessage) {
                messageType = StreamMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message);
            } else if (message instanceof MapMessage) {
                messageType = MapMessage.class.getSimpleName();
                messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message);
            } else {
                // NOTE(review): unlike the "no message" path above, the consumer is not closed
                // here or in the catch block below - confirm whether that is intended.
                this.acknowledge(message, session);
                if (errorQueueName != null) {
                    this.processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[]{message, errorQueueName});
                    this.jmsTemplate.send(errorQueueName, errorSession -> message);
                } else {
                    this.processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[]{message});
                }
                return null;
            }
        } catch (MessageBodyToBytesConverter.MessageConversionException mce) {
            this.processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.", new Object[]{message, mce});
            this.acknowledge(message, session);
            if (errorQueueName != null) {
                this.jmsTemplate.send(errorQueueName, errorSession -> message);
            }
            return null;
        }

        final Map<String, String> messageHeaders = this.extractMessageHeaders(message);
        final Map<String, String> messageProperties = this.extractMessageProperties(message);
        return new JMSResponse(message, session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, messageProperties, msgConsumer);
    }

    /** Acknowledges {@code message} only when the session uses {@link Session#CLIENT_ACKNOWLEDGE}. */
    private void acknowledge(Message message, Session session) throws JMSException {
        if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
            message.acknowledge();
        }
    }

    /**
     * Copies every message property into a map, stringifying values with
     * {@link String#valueOf(Object)}. A {@link JMSException} is logged as a warning and whatever
     * was collected up to that point is returned.
     */
    private Map<String, String> extractMessageProperties(Message message) {
        final Map<String, String> properties = new HashMap<>();
        try {
            final Enumeration<?> propertyNames = message.getPropertyNames();
            while (propertyNames.hasMoreElements()) {
                final String propertyName = (String) propertyNames.nextElement();
                properties.put(propertyName, String.valueOf(message.getObjectProperty(propertyName)));
            }
        } catch (JMSException e) {
            this.processLog.warn("Failed to extract message properties", (Throwable) e);
        }
        return properties;
    }

    /**
     * Builds the {@code jms_*} header map for {@code message}. Correlation id, message id and
     * type are stored as returned by the message (so may be {@code null}); the reply-to and
     * destination entries are only added when a name could be determined.
     */
    private Map<String, String> extractMessageHeaders(Message message) throws JMSException {
        final Map<String, String> messageHeaders = new HashMap<>();
        messageHeaders.put("jms_deliveryMode", String.valueOf(message.getJMSDeliveryMode()));
        messageHeaders.put("jms_expiration", String.valueOf(message.getJMSExpiration()));
        messageHeaders.put("jms_priority", String.valueOf(message.getJMSPriority()));
        messageHeaders.put("jms_redelivered", String.valueOf(message.getJMSRedelivered()));
        messageHeaders.put("jms_timestamp", String.valueOf(message.getJMSTimestamp()));
        messageHeaders.put("jms_correlationId", message.getJMSCorrelationID());
        messageHeaders.put("jms_messageId", message.getJMSMessageID());
        messageHeaders.put("jms_type", message.getJMSType());

        final String replyToDestinationName = this.retrieveDestinationName(message.getJMSReplyTo(), "jms_replyTo");
        if (replyToDestinationName != null) {
            messageHeaders.put("jms_replyTo", replyToDestinationName);
        }

        final String destinationName = this.retrieveDestinationName(message.getJMSDestination(), "jms_destination");
        if (destinationName != null) {
            messageHeaders.put("jms_destination", destinationName);
        }
        return messageHeaders;
    }

    /**
     * Returns the queue or topic name of {@code destination}.
     *
     * @param headerName header the destination came from; used in log messages only
     * @return the name, or {@code null} if {@code destination} is {@code null}, is neither a
     *         {@link Queue} nor a {@link Topic}, or the provider failed to return its name
     */
    private String retrieveDestinationName(Destination destination, String headerName) {
        if (destination == null) {
            return null;
        }
        try {
            if (destination instanceof Queue) {
                return ((Queue) destination).getQueueName();
            }
            if (destination instanceof Topic) {
                return ((Topic) destination).getTopicName();
            }
            // An unconditional cast to Topic would fail here with a ClassCastException and
            // abort the whole receive; omit the header instead.
            this.processLog.warn("Destination for '{}' header is neither a Queue nor a Topic; the header will be omitted", new Object[]{headerName});
        } catch (JMSException e) {
            this.processLog.warn("Failed to retrieve Destination name for '" + headerName + "' header", (Throwable) e);
        }
        return null;
    }

    /** Callback that performs the actual receive work on an open session and consumer. */
    @FunctionalInterface
    static interface MessageReceiver {
        public void consume(Session var1, MessageConsumer var2) throws JMSException;
    }

    /**
     * A received message in converted form, together with what is needed to acknowledge it
     * and to close the consumer it came from.
     */
    static class JMSResponse {
        private final Message message;
        private final int acknowledgementMode;
        private final byte[] messageBody;
        private final String messageType;
        private final Map<String, String> messageHeaders;
        private final Map<String, String> messageProperties;
        private final MessageConsumer messageConsumer;
        // Position within a batch; stays null unless setBatchOrder is called.
        private Integer batchOrder;

        JMSResponse(Message message, int acknowledgementMode, String messageType, byte[] messageBody, Map<String, String> messageHeaders, Map<String, String> messageProperties, MessageConsumer msgConsumer) {
            this.message = message;
            this.acknowledgementMode = acknowledgementMode;
            this.messageType = messageType;
            this.messageBody = messageBody;
            this.messageHeaders = Collections.unmodifiableMap(messageHeaders);
            this.messageProperties = Collections.unmodifiableMap(messageProperties);
            this.messageConsumer = msgConsumer;
        }

        public String getMessageType() {
            return this.messageType;
        }

        /** Returns the internal body array itself (no copy is made). */
        public byte[] getMessageBody() {
            return this.messageBody;
        }

        /** Returns an unmodifiable view of the headers. */
        public Map<String, String> getMessageHeaders() {
            return this.messageHeaders;
        }

        /** Returns an unmodifiable view of the properties. */
        public Map<String, String> getMessageProperties() {
            return this.messageProperties;
        }

        /**
         * Acknowledges the message when the session was in {@link Session#CLIENT_ACKNOWLEDGE}
         * mode, and always closes the consumer afterwards - even if acknowledging throws.
         */
        public void acknowledge() throws JMSException {
            try {
                if (this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
                    this.message.acknowledge();
                }
            } finally {
                JmsUtils.closeMessageConsumer(this.messageConsumer);
            }
        }

        /** Closes the consumer without acknowledging the message. */
        public void reject() {
            JmsUtils.closeMessageConsumer(this.messageConsumer);
        }

        public Integer getBatchOrder() {
            return this.batchOrder;
        }

        public void setBatchOrder(Integer batchOrder) {
            this.batchOrder = batchOrder;
        }
    }
}

