/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.activemq;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.j2ee.statistics.Stats;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnection;
import org.codehaus.activemq.ActiveMQMessageConsumer;
import org.codehaus.activemq.ActiveMQMessageDispatcher;
import org.codehaus.activemq.ActiveMQMessageProducer;
import org.codehaus.activemq.ActiveMQMessageTransformation;
import org.codehaus.activemq.ActiveMQQueueBrowser;
import org.codehaus.activemq.ActiveMQQueueReceiver;
import org.codehaus.activemq.ActiveMQQueueSender;
import org.codehaus.activemq.ActiveMQSessionExecutor;
import org.codehaus.activemq.ActiveMQTopicPublisher;
import org.codehaus.activemq.ActiveMQTopicSubscriber;
import org.codehaus.activemq.LocalTransactionEventListener;
import org.codehaus.activemq.management.JMSSessionStatsImpl;
import org.codehaus.activemq.management.StatsCapable;
import org.codehaus.activemq.message.ActiveMQBytesMessage;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMapMessage;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQObjectMessage;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ActiveMQStreamMessage;
import org.codehaus.activemq.message.ActiveMQTemporaryQueue;
import org.codehaus.activemq.message.ActiveMQTemporaryTopic;
import org.codehaus.activemq.message.ActiveMQTextMessage;
import org.codehaus.activemq.message.ActiveMQTopic;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.DurableUnsubscribe;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.MessageAcknowledge;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.TransactionInfo;
import org.codehaus.activemq.service.impl.DefaultQueueList;
import org.codehaus.activemq.util.IdGenerator;

/**
 * Client-side JMS session bound to a single {@link ActiveMQConnection}.
 *
 * <p>The session is the factory for messages, producers, consumers, browsers
 * and destinations; it tracks the producers/consumers it created, drives the
 * local transaction life-cycle (start / commit / rollback packets sent through
 * the connection) and keeps the list of delivered-but-unacknowledged messages
 * used for client acknowledgement, {@link #recover()} and rollback replay.
 *
 * <p>Incoming messages are handed to an {@link ActiveMQSessionExecutor} by
 * {@link #dispatch(ActiveMQMessage)}.
 */
public class ActiveMQSession
        implements Session, QueueSession, TopicSession, ActiveMQMessageDispatcher, MessageAcknowledge, StatsCapable {
    /** Dispatch state: no consumer has chosen a dispatch mode yet. */
    protected static final int CONSUMER_DISPATCH_UNSET = 1;
    /** Dispatch state: consumers on this session are dispatched asynchronously. */
    protected static final int CONSUMER_DISPATCH_ASYNC = 2;
    /** Dispatch state: consumers on this session are dispatched synchronously. */
    protected static final int CONSUMER_DISPATCH_SYNC = 3;
    private static final Log log = LogFactory.getLog(ActiveMQSession.class);

    /** Owning connection; every packet produced by this session is sent through it. */
    protected ActiveMQConnection connection;
    /** Acknowledge mode passed to the constructor (compared against the {@link Session} constants). */
    protected int acknowledgeMode;
    /** Consumers registered through {@link #addConsumer(ActiveMQMessageConsumer)}. */
    protected CopyOnWriteArrayList consumers;
    /** Producers registered through {@link #addProducer(ActiveMQMessageProducer)}. */
    protected CopyOnWriteArrayList producers;
    private IdGenerator transactionIdGenerator;
    private IdGenerator temporaryDestinationGenerator;
    /** Generates the id stamped on every packet this session sends. */
    protected IdGenerator packetIdGenerator;
    private IdGenerator consumerIdGenerator;
    private MessageListener messageListener;
    /** Set to {@code true} once {@link #close()} has completed. */
    protected SynchronizedBoolean closed;
    /** {@code true} while a local transaction has been started and not yet committed/rolled back. */
    private SynchronizedBoolean startTransaction;
    private String sessionId;
    /** Id of the local transaction in progress, or {@code null} when none is. */
    protected String currentTransactionId;
    private long startTime;
    private LocalTransactionEventListener localTransactionEventListener;
    /** Messages delivered but not yet acknowledged/committed; replayed by recover/rollback. */
    private DefaultQueueList deliveredMessages;
    private ActiveMQSessionExecutor messageExecutor;
    private JMSSessionStatsImpl stats;
    /** One of the {@code CONSUMER_DISPATCH_*} constants. */
    private int consumerDispatchState;

    /**
     * Creates a session on the given connection and registers it with that connection.
     *
     * @param theConnection      the owning connection
     * @param theAcknowledgeMode the JMS acknowledge mode for this session
     * @throws JMSException declared for callers/subclasses; propagated from the connection calls
     */
    protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
        this.connection = theConnection;
        this.acknowledgeMode = theAcknowledgeMode;
        this.consumers = new CopyOnWriteArrayList();
        this.producers = new CopyOnWriteArrayList();
        this.consumerIdGenerator = new IdGenerator();
        this.transactionIdGenerator = new IdGenerator();
        this.temporaryDestinationGenerator = new IdGenerator();
        this.packetIdGenerator = new IdGenerator();
        this.closed = new SynchronizedBoolean(false);
        this.startTransaction = new SynchronizedBoolean(false);
        this.sessionId = this.connection.generateSessionId();
        this.startTime = System.currentTimeMillis();
        this.deliveredMessages = new DefaultQueueList();
        this.messageExecutor = new ActiveMQSessionExecutor(this, this.connection.getMemoryBoundedQueue(this.sessionId));
        this.stats = new JMSSessionStatsImpl((List)this.producers, (List)this.consumers);
        this.consumerDispatchState = CONSUMER_DISPATCH_UNSET;
        // Register with the connection only after every field is assigned, so the
        // connection never observes a session whose stats/dispatch state are unset.
        this.connection.addSession(this);
    }

    /**
     * @return the statistics object for this session
     */
    public Stats getStats() {
        return this.stats;
    }

    /**
     * @return the same statistics object as {@link #getStats()}, with its concrete type
     */
    public JMSSessionStatsImpl getSessionStats() {
        return this.stats;
    }

    /**
     * Creates an empty bytes message.
     *
     * @throws JMSException if the session is closed
     */
    public BytesMessage createBytesMessage() throws JMSException {
        this.checkClosed();
        return new ActiveMQBytesMessage();
    }

    /**
     * Creates an empty map message.
     *
     * @throws JMSException if the session is closed
     */
    public MapMessage createMapMessage() throws JMSException {
        this.checkClosed();
        return new ActiveMQMapMessage();
    }

    /**
     * Creates a plain message without a body.
     *
     * @throws JMSException if the session is closed
     */
    public Message createMessage() throws JMSException {
        this.checkClosed();
        return new ActiveMQMessage();
    }

    /**
     * Creates an empty object message.
     *
     * @throws JMSException if the session is closed
     */
    public ObjectMessage createObjectMessage() throws JMSException {
        this.checkClosed();
        return new ActiveMQObjectMessage();
    }

    /**
     * Creates an object message carrying the given payload.
     *
     * @param object the payload to set on the new message
     * @throws JMSException if the session is closed or the payload cannot be set
     */
    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
        this.checkClosed();
        ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
        msg.setObject(object);
        return msg;
    }

    /**
     * Creates an empty stream message.
     *
     * @throws JMSException if the session is closed
     */
    public StreamMessage createStreamMessage() throws JMSException {
        this.checkClosed();
        return new ActiveMQStreamMessage();
    }

    /**
     * Creates an empty text message.
     *
     * @throws JMSException if the session is closed
     */
    public TextMessage createTextMessage() throws JMSException {
        this.checkClosed();
        return new ActiveMQTextMessage();
    }

    /**
     * Creates a text message carrying the given text.
     *
     * @param text the text to set on the new message
     * @throws JMSException if the session is closed or the text cannot be set
     */
    public TextMessage createTextMessage(String text) throws JMSException {
        this.checkClosed();
        ActiveMQTextMessage msg = new ActiveMQTextMessage();
        msg.setText(text);
        return msg;
    }

    /**
     * @return {@code true} if the acknowledge mode is {@link Session#SESSION_TRANSACTED}
     * @throws JMSException if the session is closed
     */
    public boolean getTransacted() throws JMSException {
        this.checkClosed();
        return this.acknowledgeMode == Session.SESSION_TRANSACTED;
    }

    /**
     * @return the acknowledge mode this session was created with
     * @throws JMSException if the session is closed
     */
    public int getAcknowledgeMode() throws JMSException {
        this.checkClosed();
        return this.acknowledgeMode;
    }

    /**
     * Commits the current local transaction; delegates to {@link #commitLocalTransaction()}.
     *
     * @throws JMSException if the session is closed, not transacted, or the commit packet fails
     */
    public void commit() throws JMSException {
        this.commitLocalTransaction();
    }

    /**
     * Commits the local transaction if one has been started, then clears the
     * delivered-message list.
     *
     * @throws IllegalStateException if the session is closed or not transacted
     * @throws JMSException          if sending the commit packet fails
     */
    protected void commitLocalTransaction() throws IllegalStateException, JMSException {
        this.checkClosed();
        if (!this.getTransacted()) {
            throw new IllegalStateException("Not a transacted session");
        }
        // commit(true, false): atomically flips "started" to false; true only if a transaction was open.
        if (this.startTransaction.commit(true, false)) {
            TransactionInfo info = new TransactionInfo();
            info.setId(this.packetIdGenerator.generateId());
            info.setTransactionId(this.currentTransactionId);
            info.setType(103); // type code this session sends for a commit
            this.currentTransactionId = null;
            this.connection.syncSendPacket(info);
            if (this.localTransactionEventListener != null) {
                this.localTransactionEventListener.commitEvent();
            }
        }
        this.deliveredMessages.clear();
    }

    /**
     * Rolls back the current local transaction; delegates to {@link #rollbackLocalTransaction()}.
     *
     * @throws JMSException if the session is closed, not transacted, or the rollback packet fails
     */
    public void rollback() throws JMSException {
        this.rollbackLocalTransaction();
    }

    /**
     * Rolls back the local transaction if one has been started, replays the
     * transient-consumed delivered messages and clears the delivered-message list.
     *
     * @throws IllegalStateException if the session is closed or not transacted
     * @throws JMSException          if sending the rollback packet fails
     */
    protected void rollbackLocalTransaction() throws IllegalStateException, JMSException {
        this.checkClosed();
        if (!this.getTransacted()) {
            throw new IllegalStateException("Not a transacted session");
        }
        if (this.startTransaction.commit(true, false)) {
            TransactionInfo info = new TransactionInfo();
            info.setId(this.packetIdGenerator.generateId());
            info.setTransactionId(this.currentTransactionId);
            info.setType(105); // type code this session sends for a rollback
            this.currentTransactionId = null;
            this.connection.asyncSendPacket(info);
            if (this.localTransactionEventListener != null) {
                this.localTransactionEventListener.rollbackEvent();
            }
        }
        this.redeliverUnacknowledgedMessages(true);
        this.deliveredMessages.clear();
    }

    /**
     * Closes the session. A transacted session is rolled back first. Calling
     * this on an already closed session does nothing.
     *
     * @throws JMSException if the rollback or the tear-down fails; in that case the
     *                      session is not marked closed
     */
    public void close() throws JMSException {
        if (!this.closed.get()) {
            if (this.getTransacted()) {
                this.rollback();
            }
            this.doClose();
            // Marked closed only afterwards: doClose() relies on checkClosed() still passing.
            this.closed.set(true);
        }
    }

    /**
     * Tears the session down: closes every consumer and producer, acknowledges
     * outstanding messages, unregisters from the connection and closes the executor.
     *
     * @throws JMSException if closing a consumer/producer or acknowledging fails
     */
    protected void doClose() throws JMSException {
        for (Iterator i = this.consumers.iterator(); i.hasNext();) {
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
            consumer.close();
        }
        for (Iterator i = this.producers.iterator(); i.hasNext();) {
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer)i.next();
            producer.close();
        }
        this.consumers.clear();
        this.producers.clear();
        this.doAcknowledge(true);
        this.connection.removeSession(this);
        this.messageExecutor.close();
        this.deliveredMessages.clear();
    }

    /**
     * @throws IllegalStateException if this session has been closed
     */
    protected void checkClosed() throws IllegalStateException {
        if (this.closed.get()) {
            throw new IllegalStateException("The Session is closed");
        }
    }

    /**
     * Re-queues all delivered-but-unacknowledged messages for redelivery.
     *
     * @throws JMSException if the session is closed or transacted
     */
    public void recover() throws JMSException {
        this.checkClosed();
        if (this.getTransacted()) {
            throw new IllegalStateException("This session is transacted");
        }
        this.redeliverUnacknowledgedMessages();
    }

    /**
     * @return the session-level message listener, or {@code null} if none is set
     * @throws JMSException if the session is closed
     */
    public MessageListener getMessageListener() throws JMSException {
        this.checkClosed();
        return this.messageListener;
    }

    /**
     * Sets the session-level message listener. When a non-null listener is
     * installed the executor's own dispatching is switched off
     * ({@code setDoDispatch(false)}); setting {@code null} does not switch it back on.
     *
     * @param listener the listener, may be {@code null}
     * @throws JMSException if the session is closed
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
        this.checkClosed();
        this.messageListener = listener;
        if (listener != null) {
            this.messageExecutor.setDoDispatch(false);
        }
    }

    /**
     * Drains the executor queue without blocking and feeds each message to the
     * session-level listener.
     *
     * <p>A non-expired message with a listener present is passed to
     * {@code onMessage}; it is then reported as consumed, or as not consumed if
     * the listener threw. Any other message (expired, or no listener) is reported
     * as not consumed, carrying its expired flag.
     */
    public void run() {
        MessageListener listener = this.messageListener;
        ActiveMQMessage message;
        while ((message = this.messageExecutor.dequeueNoWait()) != null) {
            if (!message.isExpired() && listener != null) {
                try {
                    listener.onMessage((Message)message);
                    this.messageDelivered(true, message, true, false);
                }
                catch (Throwable t) {
                    // A misbehaving listener must not stop the drain loop.
                    log.info("Caught :" + t, t);
                    this.messageDelivered(true, message, false, false);
                }
                continue;
            }
            this.messageDelivered(true, message, false, message.isExpired());
        }
    }

    /**
     * Creates a producer for the given destination.
     *
     * @throws JMSException if the session is closed or the producer cannot be created
     */
    public MessageProducer createProducer(Destination destination) throws JMSException {
        this.checkClosed();
        return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Creates a consumer for the given destination with an empty selector.
     *
     * @throws JMSException if the session is closed or the consumer cannot be created
     */
    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        this.checkClosed();
        int prefetch = this.prefetchFor(destination);
        return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "", "", this.connection.getNextConsumerNumber(), prefetch, false, false);
    }

    /**
     * Creates a consumer for the given destination and message selector.
     *
     * @throws JMSException if the session is closed or the consumer cannot be created
     */
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        this.checkClosed();
        int prefetch = this.prefetchFor(destination);
        return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "", messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
    }

    /**
     * Creates a consumer for the given destination, selector and no-local flag.
     * The prefetch is chosen by destination type, exactly as in the other
     * {@code createConsumer} overloads (a queue gets the queue prefetch).
     *
     * @throws JMSException if the session is closed or the consumer cannot be created
     */
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException {
        this.checkClosed();
        int prefetch = this.prefetchFor(destination);
        return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "", messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
    }

    /** Picks the topic prefetch for a {@link Topic}, the queue prefetch for anything else. */
    private int prefetchFor(Destination destination) {
        return destination instanceof Topic
                ? this.connection.getPrefetchPolicy().getTopicPrefetch()
                : this.connection.getPrefetchPolicy().getQueuePrefetch();
    }

    /**
     * Creates a queue identity object for the given name.
     *
     * @throws JMSException if the session is closed
     */
    public Queue createQueue(String queueName) throws JMSException {
        this.checkClosed();
        return new ActiveMQQueue(queueName);
    }

    /**
     * Creates a topic identity object for the given name.
     *
     * @throws JMSException if the session is closed
     */
    public Topic createTopic(String topicName) throws JMSException {
        this.checkClosed();
        return new ActiveMQTopic(topicName);
    }

    /**
     * Creates a durable subscriber with the given subscription name and an empty selector.
     *
     * @throws JMSException if the session is closed or the subscriber cannot be created
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        this.checkClosed();
        return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination((Destination)topic), name, "", this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(), false, false);
    }

    /**
     * Creates a durable subscriber with the given subscription name, selector and no-local flag.
     *
     * @throws JMSException if the session is closed or the subscriber cannot be created
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        this.checkClosed();
        return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination((Destination)topic), name, messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(), noLocal, false);
    }

    /**
     * Creates a browser for the given queue with an empty selector.
     *
     * @throws JMSException if the session is closed or the browser cannot be created
     */
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        this.checkClosed();
        return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination((Destination)queue), "", this.connection.getNextConsumerNumber());
    }

    /**
     * Creates a browser for the given queue and message selector.
     *
     * @throws JMSException if the session is closed or the browser cannot be created
     */
    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
        this.checkClosed();
        return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination((Destination)queue), messageSelector, this.connection.getNextConsumerNumber());
    }

    /**
     * Creates a temporary queue whose name is built from the connection's client
     * id plus a session-generated id.
     *
     * @throws JMSException if the session is closed or the client id cannot be obtained
     */
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        this.checkClosed();
        String tempQueueName = "TemporaryQueue-" + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
        tempQueueName = tempQueueName + this.temporaryDestinationGenerator.generateId();
        return new ActiveMQTemporaryQueue(tempQueueName);
    }

    /**
     * Creates a temporary topic whose name is built from the connection's client
     * id plus a session-generated id.
     *
     * @throws JMSException if the session is closed or the client id cannot be obtained
     */
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        this.checkClosed();
        String tempTopicName = "TemporaryTopic-" + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
        tempTopicName = tempTopicName + this.temporaryDestinationGenerator.generateId();
        return new ActiveMQTemporaryTopic(tempTopicName);
    }

    /**
     * Creates a receiver for the given queue with an empty selector.
     *
     * @throws JMSException if the session is closed or the receiver cannot be created
     */
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        this.checkClosed();
        // NOTE(review): every other factory here goes through
        // ActiveMQMessageTransformation.transformDestination; confirm the two are equivalent.
        return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination((Destination)queue), "", this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
    }

    /**
     * Creates a receiver for the given queue and message selector.
     *
     * @throws JMSException if the session is closed or the receiver cannot be created
     */
    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
        this.checkClosed();
        return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination((Destination)queue), messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
    }

    /**
     * Creates a sender for the given queue.
     *
     * @throws JMSException if the session is closed or the sender cannot be created
     */
    public QueueSender createSender(Queue queue) throws JMSException {
        this.checkClosed();
        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination((Destination)queue));
    }

    /**
     * Creates a non-durable subscriber for the given topic (null name and selector).
     *
     * @throws JMSException if the session is closed or the subscriber cannot be created
     */
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        this.checkClosed();
        return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination((Destination)topic), null, null, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false, false);
    }

    /**
     * Creates a non-durable subscriber for the given topic, selector and no-local flag.
     *
     * @throws JMSException if the session is closed or the subscriber cannot be created
     */
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
        this.checkClosed();
        return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination((Destination)topic), null, messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), noLocal, false);
    }

    /**
     * Creates a publisher for the given topic.
     *
     * @throws JMSException if the session is closed or the publisher cannot be created
     */
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        this.checkClosed();
        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination((Destination)topic));
    }

    /**
     * Sends a durable-unsubscribe request for the named subscription and waits for it
     * to be sent synchronously.
     *
     * @param name the durable subscription name
     * @throws JMSException if the session is closed or the request fails
     */
    public void unsubscribe(String name) throws JMSException {
        this.checkClosed();
        DurableUnsubscribe ds = new DurableUnsubscribe();
        ds.setId(this.packetIdGenerator.generateId());
        ds.setClientId(this.connection.getClientID());
        ds.setSubscriberName(name);
        this.connection.syncSendPacket(ds);
    }

    /**
     * @return {@code true} if the message targets at least one consumer of this session
     */
    public boolean isTarget(ActiveMQMessage message) {
        for (Iterator i = this.consumers.iterator(); i.hasNext();) {
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
            if (message.isConsumerTarget(consumer.getConsumerNumber())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Accepts an incoming message: makes this session its acknowledge handler
     * and hands it to the session executor.
     */
    public void dispatch(ActiveMQMessage message) {
        message.setMessageAcknowledge(this);
        this.messageExecutor.execute(message);
    }

    /**
     * Acknowledges all delivered messages (effective only in client-acknowledge mode).
     *
     * @throws JMSException if the session is closed or an acknowledgement cannot be sent
     */
    public void acknowledge() throws JMSException {
        this.doAcknowledge(false);
    }

    /**
     * In client-acknowledge mode, sends one {@link MessageAck} per delivered
     * message and empties the delivered list; in any other mode does nothing
     * beyond the closed check.
     *
     * @param isClosing when {@code true} the acks do not copy the message's consumed flag
     * @throws JMSException if the session is closed or an acknowledgement cannot be sent
     */
    protected void doAcknowledge(boolean isClosing) throws JMSException {
        this.checkClosed();
        if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
            ActiveMQMessage msg = null;
            while ((msg = (ActiveMQMessage)this.deliveredMessages.removeFirst()) != null) {
                MessageAck ack = new MessageAck();
                ack.setConsumerId(msg.getConsumerId());
                ack.setMessageID(msg.getJMSMessageID());
                if (!isClosing) {
                    ack.setMessageRead(msg.isMessageConsumed());
                }
                ack.setId(this.packetIdGenerator.generateId());
                ack.setDestination(msg.getJMSActiveMQDestination());
                ack.setPersistent(msg.getJMSDeliveryMode() == 2); // 2 is the JMS persistent delivery mode
                this.connection.asyncSendPacket(ack, false);
            }
            this.deliveredMessages.clear();
        }
    }

    /**
     * Records the outcome of delivering a message and, unless the session is in
     * client-acknowledge mode, notifies the broker with a {@link MessageAck}.
     * Does nothing when {@code message} is {@code null} or the session is closed.
     *
     * <p>The message is remembered for later acknowledge/replay when the session
     * is client-acknowledge and the message has not expired, or when the session
     * is transacted and the message reports {@code isTransientConsumed()}.
     *
     * @param sendAcknowledge whether an ack may be sent to the broker
     * @param message         the delivered message
     * @param messageConsumed whether the application actually consumed it
     * @param messageExpired  whether the message had expired
     */
    protected void messageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed, boolean messageExpired) {
        if (message != null && !this.closed.get()) {
            if (this.isClientAcknowledge() && !messageExpired || this.isTransacted() && message.isTransientConsumed()) {
                message.setMessageConsumed(messageConsumed);
                this.deliveredMessages.add(message);
            }
            if (sendAcknowledge && !this.isClientAcknowledge()) {
                try {
                    this.doStartTransaction();
                    MessageAck ack = new MessageAck();
                    ack.setConsumerId(message.getConsumerId());
                    ack.setTransactionId(this.currentTransactionId);
                    ack.setMessageID(message.getJMSMessageID());
                    ack.setMessageRead(messageConsumed);
                    ack.setId(this.packetIdGenerator.generateId());
                    ack.setXaTransacted(this.isXaTransacted());
                    ack.setDestination(message.getJMSActiveMQDestination());
                    ack.setPersistent(message.getJMSDeliveryMode() == 2); // 2 is the JMS persistent delivery mode
                    ack.setExpired(messageExpired);
                    this.connection.asyncSendPacket(ack);
                }
                catch (JMSException e) {
                    // Best effort: delivery has already happened, so only log the failure.
                    log.warn("failed to notify Broker that message is delivered", e);
                }
            }
        }
    }

    /**
     * Registers a consumer with this session and announces it to the broker
     * with a synchronous {@link ConsumerInfo} (started = true). If the broker
     * call fails, the consumer is unregistered again and, for a durable
     * subscriber, the stats notification is reverted before rethrowing.
     *
     * @throws JMSException if the broker cannot be informed
     */
    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
        this.connection.sendConnectionInfoToBroker();
        boolean durable = consumer.isDurableSubscriber();
        if (durable) {
            this.stats.onCreateDurableSubscriber();
        }
        consumer.setConsumerId(this.consumerIdGenerator.generateId());
        ConsumerInfo info = this.createConsumerInfo(consumer);
        info.setStarted(true);
        // Added before the send so the consumer is already in the list while the request is in flight.
        this.consumers.add(consumer);
        try {
            this.connection.syncSendPacket(info);
        }
        catch (JMSException jmsEx) {
            this.consumers.remove(consumer);
            if (durable) {
                // Revert the stats notification made above (mirrors removeConsumer).
                this.stats.onRemoveDurableSubscriber();
            }
            throw jmsEx;
        }
    }

    /**
     * Unregisters a consumer and, unless the session is already closed, tells
     * the broker with an asynchronous {@link ConsumerInfo} (started = false).
     *
     * @throws JMSException if the broker notification fails
     */
    protected void removeConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
        this.consumers.remove(consumer);
        if (consumer.isDurableSubscriber()) {
            this.stats.onRemoveDurableSubscriber();
        }
        if (!this.closed.get()) {
            ConsumerInfo info = this.createConsumerInfo(consumer);
            info.setStarted(false);
            this.connection.asyncSendPacket(info, false);
        }
    }

    /**
     * Builds a {@link ConsumerInfo} packet from the consumer's fields, this
     * session's id and the connection's client id. The caller sets the started flag.
     */
    protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException {
        ConsumerInfo info = new ConsumerInfo();
        info.setConsumerId(consumer.consumerId);
        info.setClientId(this.connection.clientID);
        info.setSessionId(this.sessionId);
        info.setConsumerNo(consumer.consumerNumber);
        info.setPrefetchNumber(consumer.prefetchNumber);
        info.setDestination(consumer.destination);
        info.setId(this.packetIdGenerator.generateId());
        info.setNoLocal(consumer.noLocal);
        info.setBrowser(consumer.browser);
        info.setSelector(consumer.messageSelector);
        info.setStartTime(consumer.startTime);
        info.setConsumerName(consumer.consumerName);
        return info;
    }

    /**
     * Announces a producer to the broker with a synchronous {@link ProducerInfo}
     * (started = true) and, once that succeeded, registers it with this session.
     *
     * @throws JMSException if the broker cannot be informed
     */
    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
        this.connection.sendConnectionInfoToBroker();
        ProducerInfo info = this.createProducerInfo(producer);
        info.setStarted(true);
        this.connection.syncSendPacket(info);
        this.producers.add(producer);
    }

    /**
     * Unregisters a producer and, unless the session is already closed, tells
     * the broker with an asynchronous {@link ProducerInfo} (started = false).
     *
     * @throws JMSException if the broker notification fails
     */
    protected void removeProducer(ActiveMQMessageProducer producer) throws JMSException {
        this.producers.remove(producer);
        if (!this.closed.get()) {
            ProducerInfo info = this.createProducerInfo(producer);
            info.setStarted(false);
            this.connection.asyncSendPacket(info, false);
        }
    }

    /**
     * Builds a {@link ProducerInfo} packet from the producer's id, default
     * destination and start time. The caller sets the started flag.
     */
    protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException {
        ProducerInfo info = new ProducerInfo();
        info.setProducerId(producer.getProducerId());
        info.setClientId(this.connection.clientID);
        info.setSessionId(this.sessionId);
        info.setDestination(producer.defaultDestination);
        info.setId(this.packetIdGenerator.generateId());
        info.setStartTime(producer.getStartTime());
        return info;
    }

    /** Starts the session's message executor. */
    protected void start() {
        this.messageExecutor.start();
    }

    /** Stops the session's message executor. */
    protected void stop() {
        this.messageExecutor.stop();
    }

    /** @return the id generated for this session by the connection */
    protected String getSessionId() {
        return this.sessionId;
    }

    /** @param sessionId the new session id */
    protected void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }

    /** @return the creation time of this session, in milliseconds since the epoch */
    protected long getStartTime() {
        return this.startTime;
    }

    /** @param startTime the new start time, in milliseconds */
    protected void setStartTime(long startTime) {
        this.startTime = startTime;
    }

    /**
     * Stamps the JMS headers on {@code message}, converts it to an
     * {@link ActiveMQMessage} and sends it through the connection (async or
     * sync depending on {@code connection.isUseAsyncSend()}). A local
     * transaction is started first if the session is transacted.
     *
     * @param producer       the producer issuing the send
     * @param destination    the destination to stamp on the message
     * @param message        the message to send
     * @param deliveryMode   the JMS delivery mode
     * @param priority       the JMS priority
     * @param timeToLive     time to live in milliseconds; {@code <= 0} means no expiration
     * @param reuseMessageId when {@code true}, an existing non-empty message id is kept
     * @throws JMSException if the session is closed or the send fails
     */
    protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean reuseMessageId) throws JMSException {
        this.checkClosed();
        this.connection.sendConnectionInfoToBroker();
        this.doStartTransaction();
        message.setJMSDestination(destination);
        message.setJMSDeliveryMode(deliveryMode);
        message.setJMSPriority(priority);
        long expiration = 0L;
        // NOTE(review): expiration is only computed when timestamps are enabled, so a
        // positive timeToLive is ignored if the producer disabled timestamps - confirm intended.
        if (!producer.getDisableMessageTimestamp()) {
            long timeStamp = System.currentTimeMillis();
            message.setJMSTimestamp(timeStamp);
            if (timeToLive > 0L) {
                expiration = timeToLive + timeStamp;
            }
        }
        message.setJMSExpiration(expiration);
        String id = message.getJMSMessageID();
        String producerId = producer.getProducerId();
        long sequenceNumber = producer.getNextSequenceNumber();
        // A fresh id is assigned when none is present, or when ids are enabled and reuse was not requested.
        if (id == null || id.length() == 0 || !producer.getDisableMessageID() && !reuseMessageId) {
            message.setJMSMessageID(producerId + sequenceNumber);
        }
        ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message);
        msg.prepareMessageBody();
        msg.setProducerID(producerId);
        msg.setSequenceNumber(sequenceNumber);
        msg.setTransactionId(this.currentTransactionId);
        msg.setXaTransacted(this.isXaTransacted());
        msg.setJMSClientID(this.connection.clientID);
        msg.setJMSRedelivered(false);
        if (log.isDebugEnabled()) {
            log.debug("Sending message: " + msg);
        }
        if (this.connection.isUseAsyncSend()) {
            this.connection.asyncSendPacket(msg);
        } else {
            this.connection.syncSendPacket(msg);
        }
    }

    /**
     * Starts a local transaction if this session is transacted.
     *
     * @throws JMSException if the session is closed or the start packet cannot be sent
     */
    protected void doStartTransaction() throws JMSException {
        if (this.getTransacted()) {
            this.startLocalTransaction();
        }
    }

    /**
     * Begins a local transaction unless one is already in progress: generates a
     * new transaction id, sends the start packet asynchronously and notifies
     * the local transaction listener.
     *
     * @throws JMSException if the start packet cannot be sent
     */
    protected void startLocalTransaction() throws JMSException {
        // commit(false, true): atomically flips "started" to true; true only if no transaction was open.
        if (this.startTransaction.commit(false, true)) {
            this.currentTransactionId = this.transactionIdGenerator.generateId();
            TransactionInfo info = new TransactionInfo();
            info.setId(this.packetIdGenerator.generateId());
            info.setTransactionId(this.currentTransactionId);
            info.setType(101); // type code this session sends for a transaction start
            this.connection.asyncSendPacket(info);
            if (this.localTransactionEventListener != null) {
                this.localTransactionEventListener.beginEvent();
            }
        }
    }

    /** @return the listener notified of local transaction begin/commit/rollback, may be {@code null} */
    public LocalTransactionEventListener getLocalTransactionEventListener() {
        return this.localTransactionEventListener;
    }

    /** @param localTransactionEventListener the listener to notify, may be {@code null} */
    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
        this.localTransactionEventListener = localTransactionEventListener;
    }

    /** @return always {@code false} in this class */
    protected boolean isXaTransacted() {
        return false;
    }

    /**
     * Fixes the dispatch mode for this session. The first call decides; later
     * calls must request the same mode.
     *
     * @param value one of the {@code CONSUMER_DISPATCH_*} constants
     * @throws JMSException an {@link IllegalStateException} if a different mode was
     *                      already chosen; the message names the mode already in effect
     */
    protected void setSessionConsumerDispatchState(int value) throws JMSException {
        if (this.consumerDispatchState != CONSUMER_DISPATCH_UNSET && value != this.consumerDispatchState) {
            String errorStr = "Cannot mix consumer dispatching on a session - already: ";
            // Describe the mode the session is ALREADY in, not the one being requested.
            errorStr = this.consumerDispatchState == CONSUMER_DISPATCH_SYNC ? errorStr + "synchronous" : errorStr + "asynchronous";
            throw new IllegalStateException(errorStr);
        }
        this.consumerDispatchState = value;
    }

    /** Re-queues every delivered-but-unacknowledged message for redelivery. */
    protected void redeliverUnacknowledgedMessages() {
        this.redeliverUnacknowledgedMessages(false);
    }

    /**
     * Moves the delivered messages back to the front of the executor queue,
     * marked as redelivered with an incremented delivery count. The executor is
     * stopped for the duration and (re)started afterwards.
     *
     * @param onlyDeliverTransientConsumed when {@code true}, messages for which
     *        {@code isTransientConsumed()} is {@code false} are dropped instead of replayed
     */
    protected void redeliverUnacknowledgedMessages(boolean onlyDeliverTransientConsumed) {
        this.messageExecutor.stop();
        LinkedList<Object> replay = new LinkedList<Object>();
        Object obj = null;
        while ((obj = this.deliveredMessages.removeFirst()) != null) {
            replay.add(obj);
        }
        this.deliveredMessages.clear();
        if (!replay.isEmpty()) {
            // Walk backwards and push each message to the front; presumably this
            // leaves the replayed messages in their original delivery order.
            ListIterator i = replay.listIterator(replay.size());
            while (i.hasPrevious()) {
                ActiveMQMessage msg = (ActiveMQMessage)i.previous();
                if (onlyDeliverTransientConsumed && !msg.isTransientConsumed()) continue;
                msg.setJMSRedelivered(true);
                msg.incrementDeliveryCount();
                this.messageExecutor.executeFirst(msg);
            }
        }
        replay.clear();
        this.messageExecutor.start();
    }

    /** Clears in-progress messages on the executor and on every consumer of this session. */
    protected void clearMessagesInProgress() {
        this.messageExecutor.clearMessagesInProgress();
        for (Iterator i = this.consumers.iterator(); i.hasNext();) {
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
            consumer.clearMessagesInProgress();
        }
    }

    /** @return {@code true} if the acknowledge mode is {@link Session#SESSION_TRANSACTED}; no closed check */
    protected boolean isTransacted() {
        return this.acknowledgeMode == Session.SESSION_TRANSACTED;
    }

    /** @return {@code true} if the acknowledge mode is {@link Session#CLIENT_ACKNOWLEDGE} */
    protected boolean isClientAcknowledge() {
        return this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE;
    }
}

