/*
 * Decompiled with CFR 0.152.
 */
package org.activemq;

import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQDispatcher;
import org.activemq.ActiveMQMessageConsumer;
import org.activemq.ActiveMQMessageProducer;
import org.activemq.ActiveMQMessageTransformation;
import org.activemq.ActiveMQQueueBrowser;
import org.activemq.ActiveMQQueueReceiver;
import org.activemq.ActiveMQQueueSender;
import org.activemq.ActiveMQSessionExecutor;
import org.activemq.ActiveMQTopicPublisher;
import org.activemq.ActiveMQTopicSubscriber;
import org.activemq.MessageDispatchChannel;
import org.activemq.TransactionContext;
import org.activemq.command.ActiveMQBytesMessage;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQMapMessage;
import org.activemq.command.ActiveMQMessage;
import org.activemq.command.ActiveMQObjectMessage;
import org.activemq.command.ActiveMQQueue;
import org.activemq.command.ActiveMQStreamMessage;
import org.activemq.command.ActiveMQTextMessage;
import org.activemq.command.ActiveMQTopic;
import org.activemq.command.Command;
import org.activemq.command.ConsumerId;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageDispatch;
import org.activemq.command.MessageId;
import org.activemq.command.ProducerId;
import org.activemq.command.RedeliveryPolicy;
import org.activemq.command.Response;
import org.activemq.command.SessionId;
import org.activemq.command.SessionInfo;
import org.activemq.command.TransactionId;
import org.activemq.management.JMSSessionStatsImpl;
import org.activemq.management.StatsCapable;
import org.activemq.management.StatsImpl;
import org.activemq.thread.Scheduler;
import org.activemq.transaction.Synchronization;
import org.activemq.util.Callback;
import org.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ActiveMQSession
implements Session,
QueueSession,
TopicSession,
StatsCapable,
ActiveMQDispatcher {
    private static final Log log = LogFactory.getLog((Class)ActiveMQSession.class);
    protected int acknowledgementMode;
    private MessageListener messageListener;
    private JMSSessionStatsImpl stats;
    private TransactionContext transactionContext;
    private DeliveryListener deliveryListener;
    protected final ActiveMQConnection connection;
    protected final SessionInfo info;
    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
    protected final ActiveMQSessionExecutor executor = new ActiveMQSessionExecutor(this);
    protected final AtomicBoolean started = new AtomicBoolean(false);
    protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
    protected final CopyOnWriteArrayList producers = new CopyOnWriteArrayList();
    protected boolean closed;
    protected boolean asyncDispatch;

    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
        this.connection = connection;
        this.acknowledgementMode = acknowledgeMode;
        this.asyncDispatch = asyncDispatch;
        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getSessionId());
        this.setTransactionContext(new TransactionContext(connection));
        connection.addSession(this);
        this.stats = new JMSSessionStatsImpl((List)this.producers, (List)this.consumers);
        this.connection.asyncSendPacket(this.info);
        if (connection.isStarted()) {
            this.start();
        }
    }

    public void setTransactionContext(TransactionContext transactionContext) {
        this.transactionContext = transactionContext;
    }

    public TransactionContext getTransactionContext() {
        return this.transactionContext;
    }

    public StatsImpl getStats() {
        return this.stats;
    }

    public JMSSessionStatsImpl getSessionStats() {
        return this.stats;
    }

    public BytesMessage createBytesMessage() throws JMSException {
        this.checkClosed();
        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
        message.setConnection(this.connection);
        return message;
    }

    public MapMessage createMapMessage() throws JMSException {
        this.checkClosed();
        ActiveMQMapMessage message = new ActiveMQMapMessage();
        message.setConnection(this.connection);
        return message;
    }

    public Message createMessage() throws JMSException {
        this.checkClosed();
        ActiveMQMessage message = new ActiveMQMessage();
        message.setConnection(this.connection);
        return message;
    }

    public ObjectMessage createObjectMessage() throws JMSException {
        this.checkClosed();
        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        message.setConnection(this.connection);
        return message;
    }

    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
        this.checkClosed();
        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        message.setConnection(this.connection);
        message.setObject(object);
        return message;
    }

    public StreamMessage createStreamMessage() throws JMSException {
        this.checkClosed();
        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
        message.setConnection(this.connection);
        return message;
    }

    public TextMessage createTextMessage() throws JMSException {
        this.checkClosed();
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setConnection(this.connection);
        return message;
    }

    public TextMessage createTextMessage(String text) throws JMSException {
        this.checkClosed();
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText(text);
        message.setConnection(this.connection);
        return message;
    }

    public boolean getTransacted() throws JMSException {
        this.checkClosed();
        return this.acknowledgementMode == 0 || this.transactionContext.isInXATransaction();
    }

    public int getAcknowledgeMode() throws JMSException {
        this.checkClosed();
        return this.acknowledgementMode;
    }

    public void commit() throws JMSException {
        this.checkClosed();
        if (!this.getTransacted()) {
            throw new IllegalStateException("Not a transacted session");
        }
        this.transactionContext.commit();
    }

    public void rollback() throws JMSException {
        this.checkClosed();
        if (!this.getTransacted()) {
            throw new IllegalStateException("Not a transacted session");
        }
        this.transactionContext.rollback();
    }

    public void close() throws JMSException {
        if (!this.closed) {
            this.dispose();
            this.connection.asyncSendPacket(this.info.createRemoveCommand());
        }
    }

    public void dispose() throws JMSException {
        if (!this.closed) {
            Iterator iter = this.consumers.iterator();
            while (iter.hasNext()) {
                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)iter.next();
                consumer.dispose();
            }
            this.consumers.clear();
            iter = this.producers.iterator();
            while (iter.hasNext()) {
                ActiveMQMessageProducer producer = (ActiveMQMessageProducer)iter.next();
                producer.dispose();
            }
            this.producers.clear();
            try {
                if (this.getTransactionContext().isInLocalTransaction()) {
                    this.rollback();
                }
            }
            catch (JMSException jMSException) {
                // empty catch block
            }
            this.connection.removeSession(this);
            this.transactionContext = null;
            this.closed = true;
        }
    }

    protected void checkClosed() throws IllegalStateException {
        if (this.closed) {
            throw new IllegalStateException("The Session is closed");
        }
    }

    public void recover() throws JMSException {
        this.checkClosed();
        if (this.getTransacted()) {
            throw new IllegalStateException("This session is transacted");
        }
        Iterator iter = this.consumers.iterator();
        while (iter.hasNext()) {
            ActiveMQMessageConsumer c = (ActiveMQMessageConsumer)iter.next();
            c.rollback();
        }
    }

    public MessageListener getMessageListener() throws JMSException {
        this.checkClosed();
        return this.messageListener;
    }

    public void setMessageListener(MessageListener listener) throws JMSException {
        this.checkClosed();
        this.messageListener = listener;
        if (listener != null) {
            this.executor.setDispatchedBySessionPool(true);
        }
    }

    public void run() {
        MessageDispatch messageDispatch;
        while ((messageDispatch = this.executor.dequeueNoWait()) != null) {
            final MessageDispatch md = messageDispatch;
            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
            if (message.isExpired()) continue;
            if (this.isClientAcknowledge()) {
                message.setAcknowledgeCallback(new Callback(){

                    public void execute() throws Throwable {
                    }
                });
            }
            if (this.deliveryListener != null) {
                this.deliveryListener.beforeDelivery(this, message);
            }
            md.setDeliverySequenceId(this.getNextDeliveryId());
            try {
                this.messageListener.onMessage((Message)message);
            }
            catch (Throwable e) {
                // empty catch block
            }
            try {
                MessageAck ack = new MessageAck(md, 2, 1);
                ack.setFirstMessageId(md.getMessage().getMessageId());
                this.doStartTransaction();
                ack.setTransactionId(this.getTransactionContext().getTransactionId());
                if (ack.getTransactionId() != null) {
                    this.getTransactionContext().addSynchronization(new Synchronization(){

                        public void afterRollback() throws Throwable {
                            md.getMessage().incrementRedeliveryCounter();
                            RedeliveryPolicy redeliveryPolicy = ActiveMQSession.this.connection.getRedeliveryPolicy();
                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                            if (redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
                                MessageAck ack = new MessageAck(md, 1, 1);
                                ack.setFirstMessageId(md.getMessage().getMessageId());
                                ActiveMQSession.this.asyncSendPacket(ack);
                            } else {
                                long redeliveryDelay = 0L;
                                for (int i = 0; i < redeliveryCounter; ++i) {
                                    if (redeliveryDelay == 0L) {
                                        redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                        continue;
                                    }
                                    if (!redeliveryPolicy.isUseExponentialBackOff()) continue;
                                    redeliveryDelay *= (long)redeliveryPolicy.getBackOffMultiplier();
                                }
                                Scheduler.executeAfterDelay(new Runnable(){

                                    public void run() {
                                        ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
                                    }
                                }, redeliveryDelay);
                            }
                        }
                    });
                }
                this.asyncSendPacket(ack);
            }
            catch (Throwable e) {
                this.connection.onAsyncException(e);
            }
            if (this.deliveryListener == null) continue;
            this.deliveryListener.afterDelivery(this, message);
        }
    }

    public MessageProducer createProducer(Destination destination) throws JMSException {
        this.checkClosed();
        return new ActiveMQMessageProducer(this, this.getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination));
    }

    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        this.checkClosed();
        return this.createConsumer(destination, null);
    }

    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        this.checkClosed();
        int prefetch = 0;
        prefetch = destination instanceof Topic ? this.connection.getPrefetchPolicy().getTopicPrefetch() : this.connection.getPrefetchPolicy().getQueuePrefetch();
        return new ActiveMQMessageConsumer(this, this.getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetch, false, false, this.asyncDispatch);
    }

    protected ConsumerId getNextConsumerId() {
        return new ConsumerId(this.info.getSessionId(), this.consumerIdGenerator.getNextSequenceId());
    }

    protected ProducerId getNextProducerId() {
        return new ProducerId(this.info.getSessionId(), this.producerIdGenerator.getNextSequenceId());
    }

    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException {
        this.checkClosed();
        return new ActiveMQMessageConsumer(this, this.getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, this.connection.getPrefetchPolicy().getTopicPrefetch(), NoLocal, false, this.asyncDispatch);
    }

    public Queue createQueue(String queueName) throws JMSException {
        this.checkClosed();
        return new ActiveMQQueue(queueName);
    }

    public Topic createTopic(String topicName) throws JMSException {
        this.checkClosed();
        return new ActiveMQTopic(topicName);
    }

    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        this.checkClosed();
        return this.createDurableSubscriber(topic, name, null, false);
    }

    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        this.checkClosed();
        this.connection.checkClientIDWasManuallySpecified();
        return new ActiveMQTopicSubscriber(this, this.getNextConsumerId(), ActiveMQMessageTransformation.transformDestination((Destination)topic), name, messageSelector, this.connection.getPrefetchPolicy().getDurableTopicPrefetch(), noLocal, false, this.asyncDispatch);
    }

    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        this.checkClosed();
        return this.createBrowser(queue, null);
    }

    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
        this.checkClosed();
        return new ActiveMQQueueBrowser(this, this.getNextConsumerId(), ActiveMQMessageTransformation.transformDestination((Destination)queue), messageSelector, this.asyncDispatch);
    }

    public TemporaryQueue createTemporaryQueue() throws JMSException {
        this.checkClosed();
        return (TemporaryQueue)this.connection.createTempDestination(false);
    }

    public TemporaryTopic createTemporaryTopic() throws JMSException {
        this.checkClosed();
        return (TemporaryTopic)this.connection.createTempDestination(true);
    }

    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        this.checkClosed();
        return this.createReceiver(queue, null);
    }

    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
        this.checkClosed();
        return new ActiveMQQueueReceiver(this, this.getNextConsumerId(), ActiveMQMessageTransformation.transformDestination((Destination)queue), messageSelector, this.connection.getPrefetchPolicy().getQueuePrefetch(), this.asyncDispatch);
    }

    public QueueSender createSender(Queue queue) throws JMSException {
        this.checkClosed();
        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination((Destination)queue));
    }

    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        this.checkClosed();
        return this.createSubscriber(topic, null, false);
    }

    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
        this.checkClosed();
        return new ActiveMQTopicSubscriber(this, this.getNextConsumerId(), ActiveMQMessageTransformation.transformDestination((Destination)topic), null, messageSelector, this.connection.getPrefetchPolicy().getTopicPrefetch(), noLocal, false, this.asyncDispatch);
    }

    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        this.checkClosed();
        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination((Destination)topic));
    }

    public void unsubscribe(String name) throws JMSException {
        this.checkClosed();
        this.connection.unsubscribe(name);
    }

    public void dispatch(MessageDispatch messageDispatch) {
        try {
            this.executor.execute(messageDispatch);
        }
        catch (InterruptedException e) {
            this.connection.onAsyncException(e);
        }
    }

    public void acknowledge() throws JMSException {
        Iterator iter = this.consumers.iterator();
        while (iter.hasNext()) {
            ActiveMQMessageConsumer c = (ActiveMQMessageConsumer)iter.next();
            c.acknowledge();
        }
    }

    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
        this.consumers.add((Object)consumer);
        if (consumer.isDurableSubscriber()) {
            this.stats.onCreateDurableSubscriber();
        }
        this.connection.addDispatcher(consumer.getConsumerId(), this);
    }

    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
        this.connection.removeDispatcher(consumer.getConsumerId());
        if (consumer.isDurableSubscriber()) {
            this.stats.onRemoveDurableSubscriber();
        }
        this.consumers.remove((Object)consumer);
    }

    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
        this.producers.add((Object)producer);
    }

    protected void removeProducer(ActiveMQMessageProducer producer) {
        this.producers.remove((Object)producer);
    }

    protected void start() throws JMSException {
        this.started.set(true);
        Iterator iter = this.consumers.iterator();
        while (iter.hasNext()) {
            ActiveMQMessageConsumer c = (ActiveMQMessageConsumer)iter.next();
            c.start();
        }
        this.executor.start();
    }

    protected void stop() throws JMSException {
        Iterator iter = this.consumers.iterator();
        while (iter.hasNext()) {
            ActiveMQMessageConsumer c = (ActiveMQMessageConsumer)iter.next();
            c.stop();
        }
        this.started.set(false);
        this.executor.stop();
    }

    protected SessionId getSessionId() {
        return this.info.getSessionId();
    }

    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
        this.checkClosed();
        if (destination.isTemporary() && this.connection.isDeleted(destination)) {
            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
        }
        this.doStartTransaction();
        TransactionId txid = this.transactionContext.getTransactionId();
        message.setJMSDestination((Destination)destination);
        message.setJMSDeliveryMode(deliveryMode);
        long expiration = 0L;
        if (!producer.getDisableMessageTimestamp()) {
            long timeStamp = System.currentTimeMillis();
            message.setJMSTimestamp(timeStamp);
            if (timeToLive > 0L) {
                expiration = timeToLive + timeStamp;
            }
        }
        message.setJMSExpiration(expiration);
        message.setJMSPriority(priority);
        long sequenceNumber = producer.getMessageSequence();
        message.setJMSRedelivered(false);
        ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, this.connection);
        if (msg == message) {
            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
        } else {
            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
            message.setJMSMessageID(msg.getMessageId().toString());
        }
        msg.setTransactionId(txid);
        if (this.connection.isCopyMessageOnSend()) {
            msg = (ActiveMQMessage)msg.copy();
        }
        msg.onSend();
        msg.setProducerId(msg.getMessageId().getProducerId());
        if (log.isDebugEnabled()) {
            log.debug((Object)("Sending message: " + msg));
        }
        if (!msg.isPersistent() || this.connection.isUseAsyncSend() || txid != null) {
            this.connection.asyncSendPacket(msg);
        } else {
            this.connection.syncSendPacket(msg);
        }
    }

    protected void doStartTransaction() throws JMSException {
        if (this.getTransacted() && !this.transactionContext.isInXATransaction()) {
            this.transactionContext.begin();
        }
    }

    public boolean hasUncomsumedMessages() {
        return !this.executor.isEmpty();
    }

    public boolean isTransacted() {
        return this.acknowledgementMode == 0;
    }

    protected boolean isClientAcknowledge() {
        return this.acknowledgementMode == 2;
    }

    public boolean isAutoAcknowledge() {
        return this.acknowledgementMode == 1;
    }

    public boolean isDupsOkAcknowledge() {
        return this.acknowledgementMode == 3;
    }

    public DeliveryListener getDeliveryListener() {
        return this.deliveryListener;
    }

    public void setDeliveryListener(DeliveryListener deliveryListener) {
        this.deliveryListener = deliveryListener;
    }

    protected SessionInfo getSessionInfo() throws JMSException {
        SessionInfo info = new SessionInfo(this.connection.getConnectionInfo(), this.getSessionId().getSessionId());
        return info;
    }

    public void asyncSendPacket(Command command) throws JMSException {
        this.connection.asyncSendPacket(command);
    }

    public Response syncSendPacket(Command command) throws JMSException {
        return this.connection.syncSendPacket(command);
    }

    public long getNextDeliveryId() {
        return this.deliveryIdGenerator.getNextSequenceId();
    }

    public void redispatch(MessageDispatchChannel unconsumedMessages) throws JMSException {
        List c = unconsumedMessages.removeAll();
        Collections.reverse(c);
        Iterator iter = c.iterator();
        while (iter.hasNext()) {
            MessageDispatch md = (MessageDispatch)iter.next();
            this.executor.executeFirst(md);
        }
    }

    public boolean isRunning() {
        return this.started.get();
    }

    public boolean isAsyncDispatch() {
        return this.asyncDispatch;
    }

    public void setAsyncDispatch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }

    public List getUnconsumedMessages() {
        return this.executor.getUnconsumedMessages();
    }

    public static interface DeliveryListener {
        public void beforeDelivery(ActiveMQSession var1, Message var2);

        public void afterDelivery(ActiveMQSession var1, Message var2);
    }
}

