package org.wso2.mb.integration.common.clients;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.wso2.mb.integration.common.clients.configurations.AndesJMSConsumerClientConfiguration;
import org.wso2.mb.integration.common.clients.exceptions.AndesClientException;
import org.wso2.mb.integration.common.clients.operations.utils.AndesClientConstants;
import org.wso2.mb.integration.common.clients.operations.utils.AndesClientUtils;
import org.wso2.mb.integration.common.clients.operations.utils.ExchangeType;
import org.wso2.mb.integration.common.clients.operations.utils.JMSDeliveryStatus;

/* loaded from: input_file:org/wso2/mb/integration/common/clients/AndesJMSConsumer.class */
public class AndesJMSConsumer extends AndesJMSBase implements Runnable, MessageListener {
    private static Logger log = Logger.getLogger(AndesJMSConsumer.class);
    private final AndesJMSConsumerClientConfiguration consumerConfig;
    private long firstMessageConsumedTimestamp;
    private long lastMessageConsumedTimestamp;
    private AtomicLong receivedMessageCount;
    private long totalLatency;
    private Connection connection;
    private Session session;
    private MessageConsumer receiver;

    public AndesJMSConsumer(AndesJMSConsumerClientConfiguration andesJMSConsumerClientConfiguration, boolean z) throws NamingException, JMSException {
        super(andesJMSConsumerClientConfiguration);
        this.receivedMessageCount = new AtomicLong(0L);
        this.consumerConfig = andesJMSConsumerClientConfiguration;
        if (z) {
            if (ExchangeType.QUEUE == this.consumerConfig.getExchangeType()) {
                createQueueConnection();
            } else if (ExchangeType.TOPIC == this.consumerConfig.getExchangeType()) {
                createTopicConnection();
            }
        }
    }

    private void createTopicConnection() throws NamingException, JMSException {
        TopicConnection createTopicConnection = ((TopicConnectionFactory) super.getInitialContext().lookup(AndesClientConstants.CF_NAME)).createTopicConnection();
        createTopicConnection.setClientID(this.consumerConfig.getSubscriptionID());
        createTopicConnection.start();
        TopicSession createTopicSession = 0 == this.consumerConfig.getAcknowledgeMode().getType() ? createTopicConnection.createTopicSession(true, this.consumerConfig.getAcknowledgeMode().getType()) : createTopicConnection.createTopicSession(false, this.consumerConfig.getAcknowledgeMode().getType());
        Topic topic = (Topic) super.getInitialContext().lookup(this.consumerConfig.getDestinationName());
        this.connection = createTopicConnection;
        this.session = createTopicSession;
        if (this.consumerConfig.isDurable()) {
            if (null != this.consumerConfig.getSelectors()) {
                this.receiver = createTopicSession.createDurableSubscriber(topic, this.consumerConfig.getSubscriptionID(), this.consumerConfig.getSelectors(), false);
                return;
            } else {
                this.receiver = createTopicSession.createDurableSubscriber(topic, this.consumerConfig.getSubscriptionID());
                return;
            }
        }
        if (null != this.consumerConfig.getSelectors()) {
            this.receiver = createTopicSession.createSubscriber(topic, this.consumerConfig.getSelectors(), false);
        } else {
            this.receiver = createTopicSession.createSubscriber(topic);
        }
    }

    private void createQueueConnection() throws NamingException, JMSException {
        QueueConnection createQueueConnection = ((QueueConnectionFactory) super.getInitialContext().lookup(AndesClientConstants.CF_NAME)).createQueueConnection();
        createQueueConnection.start();
        QueueSession createQueueSession = 0 == this.consumerConfig.getAcknowledgeMode().getType() ? createQueueConnection.createQueueSession(true, this.consumerConfig.getAcknowledgeMode().getType()) : createQueueConnection.createQueueSession(false, this.consumerConfig.getAcknowledgeMode().getType());
        Queue queue = (Queue) super.getInitialContext().lookup(this.consumerConfig.getDestinationName());
        this.connection = createQueueConnection;
        this.session = createQueueSession;
        if (null != this.consumerConfig.getSelectors()) {
            this.receiver = createQueueSession.createReceiver(queue, this.consumerConfig.getSelectors());
        } else {
            this.receiver = createQueueSession.createReceiver(queue);
        }
    }

    @Override // org.wso2.mb.integration.common.clients.AndesJMSBase
    public void startClient() throws AndesClientException, JMSException {
        if (null == this.connection || null == this.session || null == this.receiver) {
            throw new AndesClientException("The connection, session and message receiver is not assigned.");
        }
        log.info("Starting Consumer");
        if (this.consumerConfig.isAsync()) {
            this.receiver.setMessageListener(this);
        } else {
            new Thread(this).start();
        }
    }

    @Override // org.wso2.mb.integration.common.clients.AndesJMSBase
    public void stopClient() {
        Thread thread = new Thread(new Runnable() { // from class: org.wso2.mb.integration.common.clients.AndesJMSConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                if (null == AndesJMSConsumer.this.connection || null == AndesJMSConsumer.this.session || null == AndesJMSConsumer.this.receiver) {
                    return;
                }
                try {
                    AndesJMSConsumer.log.info("Closing Consumer");
                    if (ExchangeType.TOPIC == AndesJMSConsumer.this.consumerConfig.getExchangeType()) {
                        if (null != AndesJMSConsumer.this.receiver) {
                            AndesJMSConsumer.this.receiver.close();
                        }
                        if (null != AndesJMSConsumer.this.session) {
                            AndesJMSConsumer.this.session.close();
                        }
                        if (null != AndesJMSConsumer.this.connection) {
                            AndesJMSConsumer.this.connection.close();
                        }
                    } else if (ExchangeType.QUEUE == AndesJMSConsumer.this.consumerConfig.getExchangeType()) {
                        if (null != AndesJMSConsumer.this.receiver) {
                            AndesJMSConsumer.this.receiver.close();
                        }
                        if (null != AndesJMSConsumer.this.session) {
                            AndesJMSConsumer.this.session.close();
                        }
                        if (null != AndesJMSConsumer.this.connection) {
                            QueueConnection queueConnection = AndesJMSConsumer.this.connection;
                            queueConnection.stop();
                            queueConnection.close();
                        }
                    }
                    AndesJMSConsumer.this.receiver = null;
                    AndesJMSConsumer.this.session = null;
                    AndesJMSConsumer.this.connection = null;
                    AndesJMSConsumer.log.info("Consumer Closed");
                } catch (JMSException e) {
                    AndesJMSConsumer.log.error("Error in stopping client.", e);
                    throw new RuntimeException("Error in stopping client.", e);
                }
            }
        });
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error waiting for subscriber to stop", e);
        }
    }

    public void stopClientSync() {
        if (null == this.connection || null == this.session || null == this.receiver) {
            return;
        }
        try {
            log.info("Closing Consumer");
            if (ExchangeType.TOPIC == this.consumerConfig.getExchangeType()) {
                if (null != this.receiver) {
                    this.receiver.close();
                }
                if (null != this.session) {
                    this.session.close();
                }
                if (null != this.connection) {
                    this.connection.close();
                }
            } else if (ExchangeType.QUEUE == this.consumerConfig.getExchangeType()) {
                if (null != this.receiver) {
                    this.receiver.close();
                }
                if (null != this.session) {
                    this.session.close();
                }
                if (null != this.connection) {
                    QueueConnection queueConnection = this.connection;
                    queueConnection.stop();
                    queueConnection.close();
                }
            }
            this.receiver = null;
            this.session = null;
            this.connection = null;
            log.info("Consumer Closed");
        } catch (JMSException e) {
            log.error("Error in stopping client.", e);
            throw new RuntimeException("Error in stopping client.", e);
        }
    }

    public void unSubscribe(final boolean z) throws JMSException {
        Thread thread = new Thread(new Runnable() { // from class: org.wso2.mb.integration.common.clients.AndesJMSConsumer.2
            @Override // java.lang.Runnable
            public void run() {
                if (null == AndesJMSConsumer.this.connection || null == AndesJMSConsumer.this.session || null == AndesJMSConsumer.this.receiver) {
                    AndesClientException andesClientException = new AndesClientException("The connection, session and message receiver is not assigned.");
                    AndesJMSConsumer.log.error("The connection, session and message receiver is not assigned.", andesClientException);
                    throw new RuntimeException("The connection, session and message receiver is not assigned.", andesClientException);
                }
                try {
                    AndesJMSConsumer.log.info("Un-subscribing Subscriber");
                    AndesJMSConsumer.this.session.unsubscribe(AndesJMSConsumer.this.consumerConfig.getSubscriptionID());
                    AndesJMSConsumer.log.info("Subscriber Un-Subscribed");
                    if (z) {
                        AndesJMSConsumer.this.stopClient();
                    }
                } catch (JMSException e) {
                    AndesJMSConsumer.log.error("Error in removing subscription(un-subscribing).", e);
                    throw new RuntimeException("JMSException : Error in removing subscription(un-subscribing).", e);
                }
            }
        });
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error waiting for consumer to unsubscribe", e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        boolean z = false;
        while (true) {
            try {
                Message receive = this.receiver.receive();
                if (null == receive) {
                    z = true;
                    break;
                } else if (processReceivedMessage(receive)) {
                    break;
                }
            } catch (JMSException e) {
                log.error("Error while receiving messages ", e);
                throw new RuntimeException("JMSException : Error while listening to messages", e);
            } catch (IOException e2) {
                log.error("Error while writing message to file", e2);
                throw new RuntimeException("IOException : Error while writing message to file\"", e2);
            }
        }
        if (!z) {
            stopClientSync();
        }
    }

    public void onMessage(Message message) {
        try {
            if (processReceivedMessage(message)) {
                stopClient();
            }
        } catch (JMSException e) {
            log.error("Error while listening to messages", e);
            throw new RuntimeException("Error while listening to messages", e);
        } catch (IOException e2) {
            log.error("Error while writing message to file", e2);
            throw new RuntimeException("Error while listening to messages", e2);
        }
    }

    private boolean processReceivedMessage(Message message) throws JMSException, IOException {
        if (null == message) {
            return false;
        }
        long id = Thread.currentThread().getId();
        long currentTimeMillis = System.currentTimeMillis();
        this.totalLatency += currentTimeMillis - message.getJMSTimestamp();
        if (0 == this.firstMessageConsumedTimestamp) {
            this.firstMessageConsumedTimestamp = currentTimeMillis;
        }
        this.lastMessageConsumedTimestamp = currentTimeMillis;
        this.receivedMessageCount.incrementAndGet();
        JMSDeliveryStatus jMSDeliveryStatus = message.getJMSRedelivered() ? JMSDeliveryStatus.REDELIVERED : JMSDeliveryStatus.ORIGINAL;
        if (0 == this.receivedMessageCount.get() % this.consumerConfig.getPrintsPerMessageCount()) {
            log.info("[RECEIVE] ThreadID:" + id + " Destination(" + this.consumerConfig.getExchangeType().getType() + "):" + this.consumerConfig.getDestinationName() + " ReceivedMessageCount:" + this.receivedMessageCount + " MessageToReceive:" + this.consumerConfig.getMaximumMessagesToReceived() + " Original/Redelivered:" + jMSDeliveryStatus.getStatus());
        }
        if (null != this.consumerConfig.getFilePathToWriteStatistics()) {
            AndesClientUtils.writeStatisticsToFile(Long.toString(currentTimeMillis) + "," + Double.toString(getConsumerTPS()) + "," + Double.toString(getAverageLatency()), this.consumerConfig.getFilePathToWriteStatistics());
        }
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            if (null != this.consumerConfig.getFilePathToWriteReceivedMessages()) {
                AndesClientUtils.writeReceivedMessagesToFile(textMessage.getText(), this.consumerConfig.getFilePathToWriteReceivedMessages());
            } else {
                this.consumerConfig.addReceivedMessage(textMessage.getText());
            }
        }
        if (0 == this.receivedMessageCount.get() % this.consumerConfig.getAcknowledgeAfterEachMessageCount() && 2 == this.session.getAcknowledgeMode()) {
            message.acknowledge();
            log.info("Acknowledging message : " + message.getJMSMessageID());
        }
        if (0 == this.receivedMessageCount.get() % this.consumerConfig.getCommitAfterEachMessageCount()) {
            this.session.commit();
            log.info("Committed session");
        } else if (0 == this.receivedMessageCount.get() % this.consumerConfig.getRollbackAfterEachMessageCount()) {
            this.session.rollback();
            log.info("Roll-backed session");
        } else if (0 == this.receivedMessageCount.get() % this.consumerConfig.getRecoverAfterEachMessageCount()) {
            log.info("Recovering session");
            this.session.recover();
        }
        if (this.receivedMessageCount.get() >= this.consumerConfig.getUnSubscribeAfterEachMessageCount()) {
            unSubscribe(true);
            AndesClientUtils.sleepForInterval(1000L);
            return true;
        }
        if (this.receivedMessageCount.get() >= this.consumerConfig.getMaximumMessagesToReceived()) {
            return true;
        }
        if (0 >= this.consumerConfig.getRunningDelay()) {
            return false;
        }
        try {
            Thread.sleep(this.consumerConfig.getRunningDelay());
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public AtomicLong getReceivedMessageCount() {
        return this.receivedMessageCount;
    }

    public double getConsumerTPS() {
        return 0 == this.lastMessageConsumedTimestamp - this.firstMessageConsumedTimestamp ? this.receivedMessageCount.doubleValue() / 0.001d : this.receivedMessageCount.doubleValue() / ((this.lastMessageConsumedTimestamp - this.firstMessageConsumedTimestamp) / 1000.0d);
    }

    public double getAverageLatency() {
        if (0.0d != this.receivedMessageCount.doubleValue()) {
            return (this.totalLatency / 1000.0d) / this.receivedMessageCount.doubleValue();
        }
        log.warn("No messages were received to calculate average latency.");
        return 0.0d;
    }

    @Override // org.wso2.mb.integration.common.clients.AndesJMSBase
    public AndesJMSConsumerClientConfiguration getConfig() {
        return this.consumerConfig;
    }

    public Connection getConnection() {
        return this.connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public Session getSession() {
        return this.session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public MessageConsumer getReceiver() {
        return this.receiver;
    }

    public void setReceiver(MessageConsumer messageConsumer) {
        this.receiver = messageConsumer;
    }
}
