001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.File;
020import java.io.InputStream;
021import java.io.Serializable;
022import java.net.URL;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import javax.jms.BytesMessage;
032import javax.jms.Destination;
033import javax.jms.IllegalStateException;
034import javax.jms.InvalidDestinationException;
035import javax.jms.InvalidSelectorException;
036import javax.jms.JMSException;
037import javax.jms.MapMessage;
038import javax.jms.Message;
039import javax.jms.MessageConsumer;
040import javax.jms.MessageListener;
041import javax.jms.MessageProducer;
042import javax.jms.ObjectMessage;
043import javax.jms.Queue;
044import javax.jms.QueueBrowser;
045import javax.jms.QueueReceiver;
046import javax.jms.QueueSender;
047import javax.jms.QueueSession;
048import javax.jms.Session;
049import javax.jms.StreamMessage;
050import javax.jms.TemporaryQueue;
051import javax.jms.TemporaryTopic;
052import javax.jms.TextMessage;
053import javax.jms.Topic;
054import javax.jms.TopicPublisher;
055import javax.jms.TopicSession;
056import javax.jms.TopicSubscriber;
057import javax.jms.TransactionRolledBackException;
058
059import org.apache.activemq.blob.BlobDownloader;
060import org.apache.activemq.blob.BlobTransferPolicy;
061import org.apache.activemq.blob.BlobUploader;
062import org.apache.activemq.command.ActiveMQBlobMessage;
063import org.apache.activemq.command.ActiveMQBytesMessage;
064import org.apache.activemq.command.ActiveMQDestination;
065import org.apache.activemq.command.ActiveMQMapMessage;
066import org.apache.activemq.command.ActiveMQMessage;
067import org.apache.activemq.command.ActiveMQObjectMessage;
068import org.apache.activemq.command.ActiveMQQueue;
069import org.apache.activemq.command.ActiveMQStreamMessage;
070import org.apache.activemq.command.ActiveMQTempDestination;
071import org.apache.activemq.command.ActiveMQTempQueue;
072import org.apache.activemq.command.ActiveMQTempTopic;
073import org.apache.activemq.command.ActiveMQTextMessage;
074import org.apache.activemq.command.ActiveMQTopic;
075import org.apache.activemq.command.Command;
076import org.apache.activemq.command.ConsumerId;
077import org.apache.activemq.command.MessageAck;
078import org.apache.activemq.command.MessageDispatch;
079import org.apache.activemq.command.MessageId;
080import org.apache.activemq.command.ProducerId;
081import org.apache.activemq.command.RemoveInfo;
082import org.apache.activemq.command.Response;
083import org.apache.activemq.command.SessionId;
084import org.apache.activemq.command.SessionInfo;
085import org.apache.activemq.command.TransactionId;
086import org.apache.activemq.management.JMSSessionStatsImpl;
087import org.apache.activemq.management.StatsCapable;
088import org.apache.activemq.management.StatsImpl;
089import org.apache.activemq.thread.Scheduler;
090import org.apache.activemq.transaction.Synchronization;
091import org.apache.activemq.usage.MemoryUsage;
092import org.apache.activemq.util.Callback;
093import org.apache.activemq.util.LongSequenceGenerator;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097/**
098 * <P>
099 * A <CODE>Session</CODE> object is a single-threaded context for producing
100 * and consuming messages. Although it may allocate provider resources outside
101 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102 * <P>
103 * A session serves several purposes:
104 * <UL>
105 * <LI>It is a factory for its message producers and consumers.
106 * <LI>It supplies provider-optimized message factories.
107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108 * <CODE>TemporaryQueues</CODE>.
109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110 * objects for those clients that need to dynamically manipulate
111 * provider-specific destination names.
112 * <LI>It supports a single series of transactions that combine work spanning
113 * its producers and consumers into atomic units.
114 * <LI>It defines a serial order for the messages it consumes and the messages
115 * it produces.
116 * <LI>It retains messages it consumes until they have been acknowledged.
117 * <LI>It serializes execution of message listeners registered with its message
118 * consumers.
119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120 * </UL>
121 * <P>
122 * A session can create and service multiple message producers and consumers.
123 * <P>
124 * One typical use is to have a thread block on a synchronous
125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
 * use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
127 * <P>
128 * If a client desires to have one thread produce messages while others consume
129 * them, the client should use a separate session for its producing thread.
130 * <P>
131 * Once a connection has been started, any session with one or more registered
132 * message listeners is dedicated to the thread of control that delivers
133 * messages to it. It is erroneous for client code to use this session or any of
134 * its constituent objects from another thread of control. The only exception to
135 * this rule is the use of the session or connection <CODE>close</CODE>
136 * method.
137 * <P>
138 * It should be easy for most clients to partition their work naturally into
139 * sessions. This model allows clients to start simply and incrementally add
140 * message processing complexity as their need for concurrency grows.
141 * <P>
142 * The <CODE>close</CODE> method is the only session method that can be called
143 * while some other session method is being executed in another thread.
144 * <P>
145 * A session may be specified as transacted. Each transacted session supports a
146 * single series of transactions. Each transaction groups a set of message sends
147 * and a set of message receives into an atomic unit of work. In effect,
148 * transactions organize a session's input message stream and output message
149 * stream into series of atomic units. When a transaction commits, its atomic
150 * unit of input is acknowledged and its associated atomic unit of output is
151 * sent. If a transaction rollback is done, the transaction's sent messages are
152 * destroyed and the session's input is automatically recovered.
153 * <P>
154 * The content of a transaction's input and output units is simply those
155 * messages that have been produced and consumed within the session's current
156 * transaction.
157 * <P>
158 * A transaction is completed using either its session's <CODE>commit</CODE>
159 * method or its session's <CODE>rollback </CODE> method. The completion of a
160 * session's current transaction automatically begins the next. The result is
161 * that a transacted session always has a current transaction within which its
162 * work is done.
163 * <P>
164 * The Java Transaction Service (JTS) or some other transaction monitor may be
165 * used to combine a session's transaction with transactions on other resources
166 * (databases, other JMS sessions, etc.). Since Java distributed transactions
167 * are controlled via the Java Transaction API (JTA), use of the session's
168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169 * prohibited.
170 * <P>
171 * The JMS API does not require support for JTA; however, it does define how a
172 * provider supplies this support.
173 * <P>
174 * Although it is also possible for a JMS client to handle distributed
175 * transactions directly, it is unlikely that many JMS clients will do this.
176 * Support for JTA in the JMS API is targeted at systems vendors who will be
177 * integrating the JMS API into their application server products.
178 *
179 *
180 * @see javax.jms.Session
181 * @see javax.jms.QueueSession
182 * @see javax.jms.TopicSession
183 * @see javax.jms.XASession
184 */
185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186
    /**
     * Only acknowledge an individual message - using message.acknowledge()
     * as opposed to CLIENT_ACKNOWLEDGE which
     * acknowledges all messages consumed by a session at the time
     * acknowledge() is called
     */
    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
    // Alias of INDIVIDUAL_ACKNOWLEDGE. NOTE(review): presumably the highest
    // acknowledge-mode constant accepted by callers - confirm against usages.
    public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195
196    public static interface DeliveryListener {
197        void beforeDelivery(ActiveMQSession session, Message msg);
198
199        void afterDelivery(ActiveMQSession session, Message msg);
200    }
201
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
    // Executor obtained from the owning connection in the constructor.
    private final ThreadPoolExecutor connectionExecutor;

    // Acknowledge mode handed to the constructor; returned by getAcknowledgeMode().
    protected int acknowledgementMode;
    // Owning connection; the session registers itself with it on construction
    // and deregisters in dispose().
    protected final ActiveMQConnection connection;
    // Session descriptor sent to the connection on creation; also used to build
    // the remove command when the session is closed.
    protected final SessionInfo info;
    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
    // Stopped in dispose(); asked to clear in-progress messages on transport interruption.
    protected final ActiveMQSessionExecutor executor;
    protected final AtomicBoolean started = new AtomicBoolean(false);

    // Consumers and producers owned by this session; both lists are emptied by dispose().
    protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
    protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();

    // Set to true at the end of dispose(); checked by close() and dispose().
    protected boolean closed;
    // True while close() has a deferred-close Synchronization registered with
    // the transaction context; reset after that synchronization has run doClose().
    private volatile boolean synchronizationRegistered;
    protected boolean asyncDispatch;
    protected boolean sessionAsyncDispatch;
    // Snapshot of LOG.isDebugEnabled() taken when the session is constructed.
    protected final boolean debug;
    protected Object sendMutex = new Object();
    // Guards clearMessagesInProgress() so that only one round of consumer
    // clear tasks is scheduled at a time.
    private final AtomicBoolean clearInProgress = new AtomicBoolean();

    private MessageListener messageListener;
    // Built from the producers and consumers lists in the constructor.
    private final JMSSessionStatsImpl stats;
    // Set to null by dispose().
    private TransactionContext transactionContext;
    private DeliveryListener deliveryListener;
    private MessageTransformer transformer;
    private BlobTransferPolicy blobTransferPolicy;
    // Highest last-delivered sequence id seen across consumers during
    // dispose(); reported to the broker in the remove command.
    private long lastDeliveredSequenceId;
232
    /**
     * Construct the Session, announce it to the broker and register it with
     * the owning connection. If the connection is already started, the session
     * is started as well.
     *
     * @param connection the connection that owns this session; it also supplies
     *                the default transformer, blob transfer policy and executor
     * @param sessionId identifier whose value is combined with the connection
     *                info to build this session's {@link SessionInfo}
     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
     *                Session.SESSION_TRANSACTED
     * @param asyncDispatch stored as-is in the {@code asyncDispatch} field
     * @param sessionAsyncDispatch stored as-is in the
     *                {@code sessionAsyncDispatch} field
     * @throws JMSException on internal error
     */
    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
        this.debug = LOG.isDebugEnabled();
        this.connection = connection;
        this.acknowledgementMode = acknowledgeMode;
        this.asyncDispatch = asyncDispatch;
        this.sessionAsyncDispatch = sessionAsyncDispatch;
        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
        setTransactionContext(new TransactionContext(connection));
        stats = new JMSSessionStatsImpl(producers, consumers);
        // Announce the new session to the broker without waiting for a reply.
        this.connection.asyncSendPacket(info);
        setTransformer(connection.getTransformer());
        setBlobTransferPolicy(connection.getBlobTransferPolicy());
        this.connectionExecutor=connection.getExecutor();
        this.executor = new ActiveMQSessionExecutor(this);
        // From here on the connection holds a reference to this session.
        connection.addSession(this);
        if (connection.isStarted()) {
            start();
        }

    }
264
    /**
     * Convenience constructor that delegates to the five-argument constructor
     * with {@code sessionAsyncDispatch} set to {@code true}.
     *
     * @throws JMSException on internal error
     */
    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
        this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
    }
268
269    /**
270     * Sets the transaction context of the session.
271     *
272     * @param transactionContext - provides the means to control a JMS
273     *                transaction.
274     */
275    public void setTransactionContext(TransactionContext transactionContext) {
276        this.transactionContext = transactionContext;
277    }
278
279    /**
280     * Returns the transaction context of the session.
281     *
282     * @return transactionContext - session's transaction context.
283     */
284    public TransactionContext getTransactionContext() {
285        return transactionContext;
286    }
287
288    /*
289     * (non-Javadoc)
290     *
291     * @see org.apache.activemq.management.StatsCapable#getStats()
292     */
293    @Override
294    public StatsImpl getStats() {
295        return stats;
296    }
297
298    /**
299     * Returns the session's statistics.
300     *
301     * @return stats - session's statistics.
302     */
303    public JMSSessionStatsImpl getSessionStats() {
304        return stats;
305    }
306
307    /**
308     * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
309     * object is used to send a message containing a stream of uninterpreted
310     * bytes.
311     *
     * @return an ActiveMQBytesMessage
313     * @throws JMSException if the JMS provider fails to create this message due
314     *                 to some internal error.
315     */
316    @Override
317    public BytesMessage createBytesMessage() throws JMSException {
318        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
319        configureMessage(message);
320        return message;
321    }
322
323    /**
324     * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
325     * object is used to send a self-defining set of name-value pairs, where
326     * names are <CODE>String</CODE> objects and values are primitive values
327     * in the Java programming language.
328     *
329     * @return an ActiveMQMapMessage
330     * @throws JMSException if the JMS provider fails to create this message due
331     *                 to some internal error.
332     */
333    @Override
334    public MapMessage createMapMessage() throws JMSException {
335        ActiveMQMapMessage message = new ActiveMQMapMessage();
336        configureMessage(message);
337        return message;
338    }
339
340    /**
341     * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
342     * interface is the root interface of all JMS messages. A
343     * <CODE>Message</CODE> object holds all the standard message header
344     * information. It can be sent when a message containing only header
345     * information is sufficient.
346     *
347     * @return an ActiveMQMessage
348     * @throws JMSException if the JMS provider fails to create this message due
349     *                 to some internal error.
350     */
351    @Override
352    public Message createMessage() throws JMSException {
353        ActiveMQMessage message = new ActiveMQMessage();
354        configureMessage(message);
355        return message;
356    }
357
358    /**
359     * Creates an <CODE>ObjectMessage</CODE> object. An
360     * <CODE>ObjectMessage</CODE> object is used to send a message that
361     * contains a serializable Java object.
362     *
363     * @return an ActiveMQObjectMessage
364     * @throws JMSException if the JMS provider fails to create this message due
365     *                 to some internal error.
366     */
367    @Override
368    public ObjectMessage createObjectMessage() throws JMSException {
369        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
370        configureMessage(message);
371        return message;
372    }
373
374    /**
375     * Creates an initialized <CODE>ObjectMessage</CODE> object. An
376     * <CODE>ObjectMessage</CODE> object is used to send a message that
377     * contains a serializable Java object.
378     *
379     * @param object the object to use to initialize this message
380     * @return an ActiveMQObjectMessage
381     * @throws JMSException if the JMS provider fails to create this message due
382     *                 to some internal error.
383     */
384    @Override
385    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
386        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
387        configureMessage(message);
388        message.setObject(object);
389        return message;
390    }
391
392    /**
393     * Creates a <CODE>StreamMessage</CODE> object. A
394     * <CODE>StreamMessage</CODE> object is used to send a self-defining
395     * stream of primitive values in the Java programming language.
396     *
397     * @return an ActiveMQStreamMessage
398     * @throws JMSException if the JMS provider fails to create this message due
399     *                 to some internal error.
400     */
401    @Override
402    public StreamMessage createStreamMessage() throws JMSException {
403        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
404        configureMessage(message);
405        return message;
406    }
407
408    /**
409     * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
410     * object is used to send a message containing a <CODE>String</CODE>
411     * object.
412     *
413     * @return an ActiveMQTextMessage
414     * @throws JMSException if the JMS provider fails to create this message due
415     *                 to some internal error.
416     */
417    @Override
418    public TextMessage createTextMessage() throws JMSException {
419        ActiveMQTextMessage message = new ActiveMQTextMessage();
420        configureMessage(message);
421        return message;
422    }
423
424    /**
425     * Creates an initialized <CODE>TextMessage</CODE> object. A
426     * <CODE>TextMessage</CODE> object is used to send a message containing a
427     * <CODE>String</CODE>.
428     *
429     * @param text the string used to initialize this message
430     * @return an ActiveMQTextMessage
431     * @throws JMSException if the JMS provider fails to create this message due
432     *                 to some internal error.
433     */
434    @Override
435    public TextMessage createTextMessage(String text) throws JMSException {
436        ActiveMQTextMessage message = new ActiveMQTextMessage();
437        message.setText(text);
438        configureMessage(message);
439        return message;
440    }
441
442    /**
443     * Creates an initialized <CODE>BlobMessage</CODE> object. A
444     * <CODE>BlobMessage</CODE> object is used to send a message containing a
     * <CODE>URL</CODE> which points to some network addressable BLOB.
446     *
447     * @param url the network addressable URL used to pass directly to the
448     *                consumer
449     * @return a BlobMessage
450     * @throws JMSException if the JMS provider fails to create this message due
451     *                 to some internal error.
452     */
453    public BlobMessage createBlobMessage(URL url) throws JMSException {
454        return createBlobMessage(url, false);
455    }
456
457    /**
458     * Creates an initialized <CODE>BlobMessage</CODE> object. A
459     * <CODE>BlobMessage</CODE> object is used to send a message containing a
     * <CODE>URL</CODE> which points to some network addressable BLOB.
461     *
462     * @param url the network addressable URL used to pass directly to the
463     *                consumer
464     * @param deletedByBroker indicates whether or not the resource is deleted
465     *                by the broker when the message is acknowledged
466     * @return a BlobMessage
467     * @throws JMSException if the JMS provider fails to create this message due
468     *                 to some internal error.
469     */
470    public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
471        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
472        configureMessage(message);
473        message.setURL(url);
474        message.setDeletedByBroker(deletedByBroker);
475        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
476        return message;
477    }
478
479    /**
480     * Creates an initialized <CODE>BlobMessage</CODE> object. A
481     * <CODE>BlobMessage</CODE> object is used to send a message containing
482     * the <CODE>File</CODE> content. Before the message is sent the file
     * content will be uploaded to the broker or some other remote repository
484     * depending on the {@link #getBlobTransferPolicy()}.
485     *
486     * @param file the file to be uploaded to some remote repo (or the broker)
487     *                depending on the strategy
488     * @return a BlobMessage
489     * @throws JMSException if the JMS provider fails to create this message due
490     *                 to some internal error.
491     */
492    public BlobMessage createBlobMessage(File file) throws JMSException {
493        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
494        configureMessage(message);
495        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
496        message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
497        message.setDeletedByBroker(true);
498        message.setName(file.getName());
499        return message;
500    }
501
502    /**
503     * Creates an initialized <CODE>BlobMessage</CODE> object. A
504     * <CODE>BlobMessage</CODE> object is used to send a message containing
505     * the <CODE>File</CODE> content. Before the message is sent the file
     * content will be uploaded to the broker or some other remote repository
507     * depending on the {@link #getBlobTransferPolicy()}. <br/>
508     * <p>
509     * The caller of this method is responsible for closing the
510     * input stream that is used, however the stream can not be closed
511     * until <b>after</b> the message has been sent.  To have this class
512     * manage the stream and close it automatically, use the method
513     * {@link ActiveMQSession#createBlobMessage(File)}
514     *
515     * @param in the stream to be uploaded to some remote repo (or the broker)
516     *                depending on the strategy
517     * @return a BlobMessage
518     * @throws JMSException if the JMS provider fails to create this message due
519     *                 to some internal error.
520     */
521    public BlobMessage createBlobMessage(InputStream in) throws JMSException {
522        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
523        configureMessage(message);
524        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
525        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
526        message.setDeletedByBroker(true);
527        return message;
528    }
529
530    /**
531     * Indicates whether the session is in transacted mode.
532     *
533     * @return true if the session is in transacted mode
534     * @throws JMSException if there is some internal error.
535     */
536    @Override
537    public boolean getTransacted() throws JMSException {
538        checkClosed();
539        return isTransacted();
540    }
541
542    /**
543     * Returns the acknowledgement mode of the session. The acknowledgement mode
544     * is set at the time that the session is created. If the session is
545     * transacted, the acknowledgement mode is ignored.
546     *
547     * @return If the session is not transacted, returns the current
548     *         acknowledgement mode for the session. If the session is
549     *         transacted, returns SESSION_TRANSACTED.
     * @throws JMSException if there is some internal error.
     * @see javax.jms.Connection#createSession(boolean,int)
     * @since 1.1
553     */
554    @Override
555    public int getAcknowledgeMode() throws JMSException {
556        checkClosed();
557        return this.acknowledgementMode;
558    }
559
560    /**
561     * Commits all messages done in this transaction and releases any locks
562     * currently held.
563     *
564     * @throws JMSException if the JMS provider fails to commit the transaction
565     *                 due to some internal error.
566     * @throws TransactionRolledBackException if the transaction is rolled back
567     *                 due to some internal error during commit.
568     * @throws javax.jms.IllegalStateException if the method is not called by a
569     *                 transacted session.
570     */
571    @Override
572    public void commit() throws JMSException {
573        checkClosed();
574        if (!getTransacted()) {
575            throw new javax.jms.IllegalStateException("Not a transacted session");
576        }
577        if (LOG.isDebugEnabled()) {
578            LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
579        }
580        transactionContext.commit();
581    }
582
583    /**
584     * Rolls back any messages done in this transaction and releases any locks
585     * currently held.
586     *
587     * @throws JMSException if the JMS provider fails to roll back the
588     *                 transaction due to some internal error.
589     * @throws javax.jms.IllegalStateException if the method is not called by a
590     *                 transacted session.
591     */
592    @Override
593    public void rollback() throws JMSException {
594        checkClosed();
595        if (!getTransacted()) {
596            throw new javax.jms.IllegalStateException("Not a transacted session");
597        }
598        if (LOG.isDebugEnabled()) {
599            LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
600        }
601        transactionContext.rollback();
602    }
603
604    /**
605     * Closes the session.
606     * <P>
607     * Since a provider may allocate some resources on behalf of a session
608     * outside the JVM, clients should close the resources when they are not
609     * needed. Relying on garbage collection to eventually reclaim these
610     * resources may not be timely enough.
611     * <P>
612     * There is no need to close the producers and consumers of a closed
613     * session.
614     * <P>
615     * This call will block until a <CODE>receive</CODE> call or message
616     * listener in progress has completed. A blocked message consumer
617     * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
618     * is closed.
619     * <P>
620     * Closing a transacted session must roll back the transaction in progress.
621     * <P>
622     * This method is the only <CODE>Session</CODE> method that can be called
623     * concurrently.
624     * <P>
625     * Invoking any other <CODE>Session</CODE> method on a closed session must
626     * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
627     * closed session must <I>not </I> throw an exception.
628     *
629     * @throws JMSException if the JMS provider fails to close the session due
630     *                 to some internal error.
631     */
632    @Override
633    public void close() throws JMSException {
634        if (!closed) {
635            if (getTransactionContext().isInXATransaction()) {
636                if (!synchronizationRegistered) {
637                    synchronizationRegistered = true;
638                    getTransactionContext().addSynchronization(new Synchronization() {
639
640                                        @Override
641                                        public void afterCommit() throws Exception {
642                                            doClose();
643                                            synchronizationRegistered = false;
644                                        }
645
646                                        @Override
647                                        public void afterRollback() throws Exception {
648                                            doClose();
649                                            synchronizationRegistered = false;
650                                        }
651                                    });
652                }
653
654            } else {
655                doClose();
656            }
657        }
658    }
659
660    private void doClose() throws JMSException {
661        boolean interrupted = Thread.interrupted();
662        dispose();
663        RemoveInfo removeCommand = info.createRemoveCommand();
664        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
665        connection.asyncSendPacket(removeCommand);
666        if (interrupted) {
667            Thread.currentThread().interrupt();
668        }
669    }
670
671    final AtomicInteger clearRequestsCounter = new AtomicInteger(0);
672    void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
673        clearRequestsCounter.incrementAndGet();
674        executor.clearMessagesInProgress();
675        // we are called from inside the transport reconnection logic which involves us
676        // clearing all the connections' consumers dispatch and delivered lists. So rather
677        // than trying to grab a mutex (which could be already owned by the message listener
678        // calling the send or an ack) we allow it to complete in a separate thread via the
679        // scheduler and notify us via connection.transportInterruptionProcessingComplete()
680        //
681        // We must be careful though not to allow multiple calls to this method from a
682        // connection that is having issue becoming fully established from causing a large
683        // build up of scheduled tasks to clear the same consumers over and over.
684        if (consumers.isEmpty()) {
685            return;
686        }
687
688        if (clearInProgress.compareAndSet(false, true)) {
689            for (final ActiveMQMessageConsumer consumer : consumers) {
690                consumer.inProgressClearRequired();
691                transportInterruptionProcessingComplete.incrementAndGet();
692                try {
693                    connection.getScheduler().executeAfterDelay(new Runnable() {
694                        @Override
695                        public void run() {
696                            consumer.clearMessagesInProgress();
697                        }}, 0l);
698                } catch (JMSException e) {
699                    connection.onClientInternalException(e);
700                }
701            }
702
703            try {
704                connection.getScheduler().executeAfterDelay(new Runnable() {
705                    @Override
706                    public void run() {
707                        clearInProgress.set(false);
708                    }}, 0l);
709            } catch (JMSException e) {
710                connection.onClientInternalException(e);
711            }
712        }
713    }
714
715    void deliverAcks() {
716        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
717            ActiveMQMessageConsumer consumer = iter.next();
718            consumer.deliverAcks();
719        }
720    }
721
722    public synchronized void dispose() throws JMSException {
723        if (!closed) {
724
725            try {
726                executor.stop();
727
728                for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
729                    ActiveMQMessageConsumer consumer = iter.next();
730                    consumer.setFailureError(connection.getFirstFailureError());
731                    consumer.dispose();
732                    lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
733                }
734                consumers.clear();
735
736                for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
737                    ActiveMQMessageProducer producer = iter.next();
738                    producer.dispose();
739                }
740                producers.clear();
741
742                try {
743                    if (getTransactionContext().isInLocalTransaction()) {
744                        rollback();
745                    }
746                } catch (JMSException e) {
747                }
748
749            } finally {
750                connection.removeSession(this);
751                this.transactionContext = null;
752                closed = true;
753            }
754        }
755    }
756
757    /**
758     * Checks that the session is not closed then configures the message
759     */
760    protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
761        checkClosed();
762        message.setConnection(connection);
763    }
764
765    /**
766     * Check if the session is closed. It is used for ensuring that the session
767     * is open before performing various operations.
768     *
769     * @throws IllegalStateException if the Session is closed
770     */
771    protected void checkClosed() throws IllegalStateException {
772        if (closed) {
773            throw new IllegalStateException("The Session is closed");
774        }
775    }
776
777    /**
778     * Checks if the session is closed.
779     *
780     * @return true if the session is closed, false otherwise.
781     */
782    public boolean isClosed() {
783        return closed;
784    }
785
786    /**
787     * Stops message delivery in this session, and restarts message delivery
788     * with the oldest unacknowledged message.
789     * <P>
790     * All consumers deliver messages in a serial order. Acknowledging a
791     * received message automatically acknowledges all messages that have been
792     * delivered to the client.
793     * <P>
794     * Restarting a session causes it to take the following actions:
795     * <UL>
796     * <LI>Stop message delivery
797     * <LI>Mark all messages that might have been delivered but not
798     * acknowledged as "redelivered"
799     * <LI>Restart the delivery sequence including all unacknowledged messages
800     * that had been previously delivered. Redelivered messages do not have to
801     * be delivered in exactly their original delivery order.
802     * </UL>
803     *
804     * @throws JMSException if the JMS provider fails to stop and restart
805     *                 message delivery due to some internal error.
806     * @throws IllegalStateException if the method is called by a transacted
807     *                 session.
808     */
809    @Override
810    public void recover() throws JMSException {
811
812        checkClosed();
813        if (getTransacted()) {
814            throw new IllegalStateException("This session is transacted");
815        }
816
817        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
818            ActiveMQMessageConsumer c = iter.next();
819            c.rollback();
820        }
821
822    }
823
824    /**
825     * Returns the session's distinguished message listener (optional).
826     *
827     * @return the message listener associated with this session
828     * @throws JMSException if the JMS provider fails to get the message
829     *                 listener due to an internal error.
830     * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
831     * @see javax.jms.ServerSessionPool
832     * @see javax.jms.ServerSession
833     */
834    @Override
835    public MessageListener getMessageListener() throws JMSException {
836        checkClosed();
837        return this.messageListener;
838    }
839
840    /**
841     * Sets the session's distinguished message listener (optional).
842     * <P>
843     * When the distinguished message listener is set, no other form of message
844     * receipt in the session can be used; however, all forms of sending
845     * messages are still supported.
846     * <P>
847     * If this session has been closed, then an {@link IllegalStateException} is
848     * thrown, if trying to set a new listener. However setting the listener
849     * to <tt>null</tt> is allowed, to clear the listener, even if this session
850     * has been closed prior.
851     * <P>
852     * This is an expert facility not used by regular JMS clients.
853     *
854     * @param listener the message listener to associate with this session
855     * @throws JMSException if the JMS provider fails to set the message
856     *                 listener due to an internal error.
857     * @see javax.jms.Session#getMessageListener()
858     * @see javax.jms.ServerSessionPool
859     * @see javax.jms.ServerSession
860     */
861    @Override
862    public void setMessageListener(MessageListener listener) throws JMSException {
863        // only check for closed if we set a new listener, as we allow to clear
864        // the listener, such as when an application is shutting down, and is
865        // no longer using a message listener on this session
866        if (listener != null) {
867            checkClosed();
868        }
869        this.messageListener = listener;
870
871        if (listener != null) {
872            executor.setDispatchedBySessionPool(true);
873        }
874    }
875
876    /**
877     * Optional operation, intended to be used only by Application Servers, not
878     * by ordinary JMS clients.
879     *
880     * @see javax.jms.ServerSession
881     */
882    @Override
883    public void run() {
884        MessageDispatch messageDispatch;
885        while ((messageDispatch = executor.dequeueNoWait()) != null) {
886            final MessageDispatch md = messageDispatch;
887            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
888
889            MessageAck earlyAck = null;
890            if (message.isExpired()) {
891                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
892            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
893                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
894                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
895                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
896                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
897            }
898            if (earlyAck != null) {
899                try {
900                    asyncSendPacket(earlyAck);
901                } catch (Throwable t) {
902                    LOG.error("error dispatching ack: {} ", earlyAck, t);
903                    connection.onClientInternalException(t);
904                } finally {
905                    continue;
906                }
907            }
908
909            if (isClientAcknowledge()||isIndividualAcknowledge()) {
910                message.setAcknowledgeCallback(new Callback() {
911                    @Override
912                    public void execute() throws Exception {
913                    }
914                });
915            }
916
917            if (deliveryListener != null) {
918                deliveryListener.beforeDelivery(this, message);
919            }
920
921            md.setDeliverySequenceId(getNextDeliveryId());
922
923            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
924            try {
925                ack.setFirstMessageId(md.getMessage().getMessageId());
926                doStartTransaction();
927                ack.setTransactionId(getTransactionContext().getTransactionId());
928                if (ack.getTransactionId() != null) {
929                    getTransactionContext().addSynchronization(new Synchronization() {
930
931                        final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
932                        @Override
933                        public void beforeEnd() throws Exception {
934                            // validate our consumer so we don't push stale acks that get ignored
935                            if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
936                                LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
937                                throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
938                            }
939                            LOG.trace("beforeEnd ack {}", ack);
940                            sendAck(ack);
941                        }
942
943                        @Override
944                        public void afterRollback() throws Exception {
945                            LOG.trace("rollback {}", ack, new Throwable("here"));
946                            // ensure we don't filter this as a duplicate
947                            connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
948
949                            // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
950                            if (clearRequestsCounter.get() > clearRequestCount) {
951                                LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
952                                return;
953                            }
954
955                            // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
956                            if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
957                                LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
958                                return;
959                            }
960
961                            RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
962                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
963                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
964                                && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
965                                // We need to NACK the messages so that they get
966                                // sent to the
967                                // DLQ.
968                                // Acknowledge the last message.
969                                MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
970                                ack.setFirstMessageId(md.getMessage().getMessageId());
971                                ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
972                                asyncSendPacket(ack);
973
974                            } else {
975
976                                MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
977                                ack.setFirstMessageId(md.getMessage().getMessageId());
978                                asyncSendPacket(ack);
979
980                                // Figure out how long we should wait to resend
981                                // this message.
982                                long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
983                                for (int i = 0; i < redeliveryCounter; i++) {
984                                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
985                                }
986                                connection.getScheduler().executeAfterDelay(new Runnable() {
987
988                                    @Override
989                                    public void run() {
990                                        ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
991                                    }
992                                }, redeliveryDelay);
993                            }
994                            md.getMessage().onMessageRolledBack();
995                        }
996                    });
997                }
998
999                LOG.trace("{} onMessage({})", this, message.getMessageId());
1000                messageListener.onMessage(message);
1001
1002            } catch (Throwable e) {
1003                LOG.error("error dispatching message: ", e);
1004                // A problem while invoking the MessageListener does not
1005                // in general indicate a problem with the connection to the broker, i.e.
1006                // it will usually be sufficient to let the afterDelivery() method either
1007                // commit or roll back in order to deal with the exception.
1008                // However, we notify any registered client internal exception listener
1009                // of the problem.
1010                connection.onClientInternalException(e);
1011            } finally {
1012                if (ack.getTransactionId() == null) {
1013                    try {
1014                        asyncSendPacket(ack);
1015                    } catch (Throwable e) {
1016                        connection.onClientInternalException(e);
1017                    }
1018                }
1019            }
1020
1021            if (deliveryListener != null) {
1022                deliveryListener.afterDelivery(this, message);
1023            }
1024        }
1025    }
1026
1027    /**
1028     * Creates a <CODE>MessageProducer</CODE> to send messages to the
1029     * specified destination.
1030     * <P>
1031     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
1032     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
1033     * inherit from <CODE>Destination</CODE>, they can be used in the
1034     * destination parameter to create a <CODE>MessageProducer</CODE> object.
1035     *
1036     * @param destination the <CODE>Destination</CODE> to send to, or null if
1037     *                this is a producer which does not have a specified
1038     *                destination.
1039     * @return the MessageProducer
1040     * @throws JMSException if the session fails to create a MessageProducer due
1041     *                 to some internal error.
1042     * @throws InvalidDestinationException if an invalid destination is
1043     *                 specified.
1044     * @since 1.1
1045     */
1046    @Override
1047    public MessageProducer createProducer(Destination destination) throws JMSException {
1048        checkClosed();
1049        if (destination instanceof CustomDestination) {
1050            CustomDestination customDestination = (CustomDestination)destination;
1051            return customDestination.createProducer(this);
1052        }
1053        int timeSendOut = connection.getSendTimeout();
1054        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
1055    }
1056
1057    /**
1058     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1059     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1060     * <CODE>Destination</CODE>, they can be used in the destination
1061     * parameter to create a <CODE>MessageConsumer</CODE>.
1062     *
1063     * @param destination the <CODE>Destination</CODE> to access.
1064     * @return the MessageConsumer
1065     * @throws JMSException if the session fails to create a consumer due to
1066     *                 some internal error.
1067     * @throws InvalidDestinationException if an invalid destination is
1068     *                 specified.
1069     * @since 1.1
1070     */
1071    @Override
1072    public MessageConsumer createConsumer(Destination destination) throws JMSException {
1073        return createConsumer(destination, (String) null);
1074    }
1075
1076    /**
1077     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1078     * using a message selector. Since <CODE> Queue</CODE> and
1079     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1080     * can be used in the destination parameter to create a
1081     * <CODE>MessageConsumer</CODE>.
1082     * <P>
1083     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1084     * that have been sent to a destination.
1085     *
1086     * @param destination the <CODE>Destination</CODE> to access
1087     * @param messageSelector only messages with properties matching the message
1088     *                selector expression are delivered. A value of null or an
1089     *                empty string indicates that there is no message selector
1090     *                for the message consumer.
1091     * @return the MessageConsumer
1092     * @throws JMSException if the session fails to create a MessageConsumer due
1093     *                 to some internal error.
1094     * @throws InvalidDestinationException if an invalid destination is
1095     *                 specified.
1096     * @throws InvalidSelectorException if the message selector is invalid.
1097     * @since 1.1
1098     */
1099    @Override
1100    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
1101        return createConsumer(destination, messageSelector, false);
1102    }
1103
1104    /**
1105     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1106     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1107     * <CODE>Destination</CODE>, they can be used in the destination
1108     * parameter to create a <CODE>MessageConsumer</CODE>.
1109     *
1110     * @param destination the <CODE>Destination</CODE> to access.
1111     * @param messageListener the listener to use for async consumption of messages
1112     * @return the MessageConsumer
1113     * @throws JMSException if the session fails to create a consumer due to
1114     *                 some internal error.
1115     * @throws InvalidDestinationException if an invalid destination is
1116     *                 specified.
1117     * @since 1.1
1118     */
1119    public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1120        return createConsumer(destination, null, messageListener);
1121    }
1122
1123    /**
1124     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1125     * using a message selector. Since <CODE> Queue</CODE> and
1126     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1127     * can be used in the destination parameter to create a
1128     * <CODE>MessageConsumer</CODE>.
1129     * <P>
1130     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1131     * that have been sent to a destination.
1132     *
1133     * @param destination the <CODE>Destination</CODE> to access
1134     * @param messageSelector only messages with properties matching the message
1135     *                selector expression are delivered. A value of null or an
1136     *                empty string indicates that there is no message selector
1137     *                for the message consumer.
1138     * @param messageListener the listener to use for async consumption of messages
1139     * @return the MessageConsumer
1140     * @throws JMSException if the session fails to create a MessageConsumer due
1141     *                 to some internal error.
1142     * @throws InvalidDestinationException if an invalid destination is
1143     *                 specified.
1144     * @throws InvalidSelectorException if the message selector is invalid.
1145     * @since 1.1
1146     */
1147    public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1148        return createConsumer(destination, messageSelector, false, messageListener);
1149    }
1150
1151    /**
1152     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1153     * using a message selector. This method can specify whether messages
1154     * published by its own connection should be delivered to it, if the
1155     * destination is a topic.
1156     * <P>
1157     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1158     * <CODE>Destination</CODE>, they can be used in the destination
1159     * parameter to create a <CODE>MessageConsumer</CODE>.
1160     * <P>
1161     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1162     * that have been published to a destination.
1163     * <P>
1164     * In some cases, a connection may both publish and subscribe to a topic.
1165     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1166     * inhibit the delivery of messages published by its own connection. The
1167     * default value for this attribute is False. The <CODE>noLocal</CODE>
1168     * value must be supported by destinations that are topics.
1169     *
1170     * @param destination the <CODE>Destination</CODE> to access
1171     * @param messageSelector only messages with properties matching the message
1172     *                selector expression are delivered. A value of null or an
1173     *                empty string indicates that there is no message selector
1174     *                for the message consumer.
1175     * @param noLocal - if true, and the destination is a topic, inhibits the
1176     *                delivery of messages published by its own connection. The
1177     *                behavior for <CODE>NoLocal</CODE> is not specified if
1178     *                the destination is a queue.
1179     * @return the MessageConsumer
1180     * @throws JMSException if the session fails to create a MessageConsumer due
1181     *                 to some internal error.
1182     * @throws InvalidDestinationException if an invalid destination is
1183     *                 specified.
1184     * @throws InvalidSelectorException if the message selector is invalid.
1185     * @since 1.1
1186     */
1187    @Override
1188    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1189        return createConsumer(destination, messageSelector, noLocal, null);
1190    }
1191
1192    /**
1193     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1194     * using a message selector. This method can specify whether messages
1195     * published by its own connection should be delivered to it, if the
1196     * destination is a topic.
1197     * <P>
1198     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1199     * <CODE>Destination</CODE>, they can be used in the destination
1200     * parameter to create a <CODE>MessageConsumer</CODE>.
1201     * <P>
1202     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1203     * that have been published to a destination.
1204     * <P>
1205     * In some cases, a connection may both publish and subscribe to a topic.
1206     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1207     * inhibit the delivery of messages published by its own connection. The
1208     * default value for this attribute is False. The <CODE>noLocal</CODE>
1209     * value must be supported by destinations that are topics.
1210     *
1211     * @param destination the <CODE>Destination</CODE> to access
1212     * @param messageSelector only messages with properties matching the message
1213     *                selector expression are delivered. A value of null or an
1214     *                empty string indicates that there is no message selector
1215     *                for the message consumer.
1216     * @param noLocal - if true, and the destination is a topic, inhibits the
1217     *                delivery of messages published by its own connection. The
1218     *                behavior for <CODE>NoLocal</CODE> is not specified if
1219     *                the destination is a queue.
1220     * @param messageListener the listener to use for async consumption of messages
1221     * @return the MessageConsumer
1222     * @throws JMSException if the session fails to create a MessageConsumer due
1223     *                 to some internal error.
1224     * @throws InvalidDestinationException if an invalid destination is
1225     *                 specified.
1226     * @throws InvalidSelectorException if the message selector is invalid.
1227     * @since 1.1
1228     */
1229    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1230        checkClosed();
1231
1232        if (destination instanceof CustomDestination) {
1233            CustomDestination customDestination = (CustomDestination)destination;
1234            return customDestination.createConsumer(this, messageSelector, noLocal);
1235        }
1236
1237        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1238        int prefetch = 0;
1239        if (destination instanceof Topic) {
1240            prefetch = prefetchPolicy.getTopicPrefetch();
1241        } else {
1242            prefetch = prefetchPolicy.getQueuePrefetch();
1243        }
1244        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1245        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1246                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1247    }
1248
1249    /**
1250     * Creates a queue identity given a <CODE>Queue</CODE> name.
1251     * <P>
1252     * This facility is provided for the rare cases where clients need to
1253     * dynamically manipulate queue identity. It allows the creation of a queue
1254     * identity with a provider-specific name. Clients that depend on this
1255     * ability are not portable.
1256     * <P>
1257     * Note that this method is not for creating the physical queue. The
1258     * physical creation of queues is an administrative task and is not to be
1259     * initiated by the JMS API. The one exception is the creation of temporary
1260     * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1261     * method.
1262     *
1263     * @param queueName the name of this <CODE>Queue</CODE>
1264     * @return a <CODE>Queue</CODE> with the given name
1265     * @throws JMSException if the session fails to create a queue due to some
1266     *                 internal error.
1267     * @since 1.1
1268     */
1269    @Override
1270    public Queue createQueue(String queueName) throws JMSException {
1271        checkClosed();
1272        if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1273            return new ActiveMQTempQueue(queueName);
1274        }
1275        return new ActiveMQQueue(queueName);
1276    }
1277
1278    /**
1279     * Creates a topic identity given a <CODE>Topic</CODE> name.
1280     * <P>
1281     * This facility is provided for the rare cases where clients need to
1282     * dynamically manipulate topic identity. This allows the creation of a
1283     * topic identity with a provider-specific name. Clients that depend on this
1284     * ability are not portable.
1285     * <P>
1286     * Note that this method is not for creating the physical topic. The
1287     * physical creation of topics is an administrative task and is not to be
1288     * initiated by the JMS API. The one exception is the creation of temporary
1289     * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1290     * method.
1291     *
1292     * @param topicName the name of this <CODE>Topic</CODE>
1293     * @return a <CODE>Topic</CODE> with the given name
1294     * @throws JMSException if the session fails to create a topic due to some
1295     *                 internal error.
1296     * @since 1.1
1297     */
1298    @Override
1299    public Topic createTopic(String topicName) throws JMSException {
1300        checkClosed();
1301        if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1302            return new ActiveMQTempTopic(topicName);
1303        }
1304        return new ActiveMQTopic(topicName);
1305    }
1306
1307    /**
1308     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1309     * the specified queue.
1310     *
1311     * @param queue the <CODE>queue</CODE> to access
1312     * @exception InvalidDestinationException if an invalid destination is
1313     *                    specified
1314     * @since 1.1
1315     */
1316    /**
1317     * Creates a durable subscriber to the specified topic.
1318     * <P>
1319     * If a client needs to receive all the messages published on a topic,
1320     * including the ones published while the subscriber is inactive, it uses a
1321     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1322     * record of this durable subscription and insures that all messages from
1323     * the topic's publishers are retained until they are acknowledged by this
1324     * durable subscriber or they have expired.
1325     * <P>
1326     * Sessions with durable subscribers must always provide the same client
1327     * identifier. In addition, each client must specify a name that uniquely
1328     * identifies (within client identifier) each durable subscription it
1329     * creates. Only one session at a time can have a
1330     * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1331     * <P>
1332     * A client can change an existing durable subscription by creating a
1333     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1334     * and/or message selector. Changing a durable subscriber is equivalent to
1335     * unsubscribing (deleting) the old one and creating a new one.
1336     * <P>
1337     * In some cases, a connection may both publish and subscribe to a topic.
1338     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1339     * inhibit the delivery of messages published by its own connection. The
1340     * default value for this attribute is false.
1341     *
1342     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1343     * @param name the name used to identify this subscription
1344     * @return the TopicSubscriber
1345     * @throws JMSException if the session fails to create a subscriber due to
1346     *                 some internal error.
1347     * @throws InvalidDestinationException if an invalid topic is specified.
1348     * @since 1.1
1349     */
1350    @Override
1351    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1352        checkClosed();
1353        return createDurableSubscriber(topic, name, null, false);
1354    }
1355
1356    /**
1357     * Creates a durable subscriber to the specified topic, using a message
1358     * selector and specifying whether messages published by its own connection
1359     * should be delivered to it.
1360     * <P>
1361     * If a client needs to receive all the messages published on a topic,
1362     * including the ones published while the subscriber is inactive, it uses a
1363     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
     * record of this durable subscription and ensures that all messages from
1365     * the topic's publishers are retained until they are acknowledged by this
1366     * durable subscriber or they have expired.
1367     * <P>
1368     * Sessions with durable subscribers must always provide the same client
1369     * identifier. In addition, each client must specify a name which uniquely
1370     * identifies (within client identifier) each durable subscription it
1371     * creates. Only one session at a time can have a
1372     * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1373     * inactive durable subscriber is one that exists but does not currently
1374     * have a message consumer associated with it.
1375     * <P>
1376     * A client can change an existing durable subscription by creating a
1377     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1378     * and/or message selector. Changing a durable subscriber is equivalent to
1379     * unsubscribing (deleting) the old one and creating a new one.
1380     *
1381     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1382     * @param name the name used to identify this subscription
1383     * @param messageSelector only messages with properties matching the message
1384     *                selector expression are delivered. A value of null or an
1385     *                empty string indicates that there is no message selector
1386     *                for the message consumer.
1387     * @param noLocal if set, inhibits the delivery of messages published by its
1388     *                own connection
     * @return the TopicSubscriber
1390     * @throws JMSException if the session fails to create a subscriber due to
1391     *                 some internal error.
1392     * @throws InvalidDestinationException if an invalid topic is specified.
1393     * @throws InvalidSelectorException if the message selector is invalid.
1394     * @since 1.1
1395     */
1396    @Override
1397    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1398        checkClosed();
1399
1400        if (topic == null) {
1401            throw new InvalidDestinationException("Topic cannot be null");
1402        }
1403
1404        if (topic instanceof CustomDestination) {
1405            CustomDestination customDestination = (CustomDestination)topic;
1406            return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1407        }
1408
1409        connection.checkClientIDWasManuallySpecified();
1410        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1411        int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1412        int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1413        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1414                                           noLocal, false, asyncDispatch);
1415    }
1416
1417    /**
1418     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1419     * the specified queue.
1420     *
1421     * @param queue the <CODE>queue</CODE> to access
1422     * @return the Queue Browser
1423     * @throws JMSException if the session fails to create a browser due to some
1424     *                 internal error.
1425     * @throws InvalidDestinationException if an invalid destination is
1426     *                 specified
1427     * @since 1.1
1428     */
1429    @Override
1430    public QueueBrowser createBrowser(Queue queue) throws JMSException {
1431        checkClosed();
1432        return createBrowser(queue, null);
1433    }
1434
1435    /**
1436     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1437     * the specified queue using a message selector.
1438     *
1439     * @param queue the <CODE>queue</CODE> to access
1440     * @param messageSelector only messages with properties matching the message
1441     *                selector expression are delivered. A value of null or an
1442     *                empty string indicates that there is no message selector
1443     *                for the message consumer.
1444     * @return the Queue Browser
1445     * @throws JMSException if the session fails to create a browser due to some
1446     *                 internal error.
1447     * @throws InvalidDestinationException if an invalid destination is
1448     *                 specified
1449     * @throws InvalidSelectorException if the message selector is invalid.
1450     * @since 1.1
1451     */
1452    @Override
1453    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1454        checkClosed();
1455        return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1456    }
1457
1458    /**
1459     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1460     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1461     *
1462     * @return a temporary queue identity
1463     * @throws JMSException if the session fails to create a temporary queue due
1464     *                 to some internal error.
1465     * @since 1.1
1466     */
1467    @Override
1468    public TemporaryQueue createTemporaryQueue() throws JMSException {
1469        checkClosed();
1470        return (TemporaryQueue)connection.createTempDestination(false);
1471    }
1472
1473    /**
1474     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1475     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1476     *
1477     * @return a temporary topic identity
1478     * @throws JMSException if the session fails to create a temporary topic due
1479     *                 to some internal error.
1480     * @since 1.1
1481     */
1482    @Override
1483    public TemporaryTopic createTemporaryTopic() throws JMSException {
1484        checkClosed();
1485        return (TemporaryTopic)connection.createTempDestination(true);
1486    }
1487
1488    /**
1489     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1490     * the specified queue.
1491     *
1492     * @param queue the <CODE>Queue</CODE> to access
1493     * @return
1494     * @throws JMSException if the session fails to create a receiver due to
1495     *                 some internal error.
1496     * @throws JMSException
1497     * @throws InvalidDestinationException if an invalid queue is specified.
1498     */
1499    @Override
1500    public QueueReceiver createReceiver(Queue queue) throws JMSException {
1501        checkClosed();
1502        return createReceiver(queue, null);
1503    }
1504
1505    /**
1506     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1507     * the specified queue using a message selector.
1508     *
1509     * @param queue the <CODE>Queue</CODE> to access
1510     * @param messageSelector only messages with properties matching the message
1511     *                selector expression are delivered. A value of null or an
1512     *                empty string indicates that there is no message selector
1513     *                for the message consumer.
1514     * @return QueueReceiver
1515     * @throws JMSException if the session fails to create a receiver due to
1516     *                 some internal error.
1517     * @throws InvalidDestinationException if an invalid queue is specified.
1518     * @throws InvalidSelectorException if the message selector is invalid.
1519     */
1520    @Override
1521    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1522        checkClosed();
1523
1524        if (queue instanceof CustomDestination) {
1525            CustomDestination customDestination = (CustomDestination)queue;
1526            return customDestination.createReceiver(this, messageSelector);
1527        }
1528
1529        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1530        return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1531                                         prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1532    }
1533
1534    /**
1535     * Creates a <CODE>QueueSender</CODE> object to send messages to the
1536     * specified queue.
1537     *
1538     * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1539     *                unidentified producer
1540     * @return QueueSender
1541     * @throws JMSException if the session fails to create a sender due to some
1542     *                 internal error.
1543     * @throws InvalidDestinationException if an invalid queue is specified.
1544     */
1545    @Override
1546    public QueueSender createSender(Queue queue) throws JMSException {
1547        checkClosed();
1548        if (queue instanceof CustomDestination) {
1549            CustomDestination customDestination = (CustomDestination)queue;
1550            return customDestination.createSender(this);
1551        }
1552        int timeSendOut = connection.getSendTimeout();
1553        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1554    }
1555
1556    /**
1557     * Creates a nondurable subscriber to the specified topic. <p/>
1558     * <P>
1559     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1560     * that have been published to a topic. <p/>
1561     * <P>
1562     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1563     * receive only messages that are published while they are active. <p/>
1564     * <P>
1565     * In some cases, a connection may both publish and subscribe to a topic.
1566     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1567     * inhibit the delivery of messages published by its own connection. The
1568     * default value for this attribute is false.
1569     *
1570     * @param topic the <CODE>Topic</CODE> to subscribe to
1571     * @return TopicSubscriber
1572     * @throws JMSException if the session fails to create a subscriber due to
1573     *                 some internal error.
1574     * @throws InvalidDestinationException if an invalid topic is specified.
1575     */
1576    @Override
1577    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1578        checkClosed();
1579        return createSubscriber(topic, null, false);
1580    }
1581
1582    /**
1583     * Creates a nondurable subscriber to the specified topic, using a message
1584     * selector or specifying whether messages published by its own connection
1585     * should be delivered to it. <p/>
1586     * <P>
1587     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1588     * that have been published to a topic. <p/>
1589     * <P>
1590     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1591     * receive only messages that are published while they are active. <p/>
1592     * <P>
1593     * Messages filtered out by a subscriber's message selector will never be
1594     * delivered to the subscriber. From the subscriber's perspective, they do
1595     * not exist. <p/>
1596     * <P>
1597     * In some cases, a connection may both publish and subscribe to a topic.
1598     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1599     * inhibit the delivery of messages published by its own connection. The
1600     * default value for this attribute is false.
1601     *
1602     * @param topic the <CODE>Topic</CODE> to subscribe to
1603     * @param messageSelector only messages with properties matching the message
1604     *                selector expression are delivered. A value of null or an
1605     *                empty string indicates that there is no message selector
1606     *                for the message consumer.
1607     * @param noLocal if set, inhibits the delivery of messages published by its
1608     *                own connection
1609     * @return TopicSubscriber
1610     * @throws JMSException if the session fails to create a subscriber due to
1611     *                 some internal error.
1612     * @throws InvalidDestinationException if an invalid topic is specified.
1613     * @throws InvalidSelectorException if the message selector is invalid.
1614     */
1615    @Override
1616    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1617        checkClosed();
1618
1619        if (topic instanceof CustomDestination) {
1620            CustomDestination customDestination = (CustomDestination)topic;
1621            return customDestination.createSubscriber(this, messageSelector, noLocal);
1622        }
1623
1624        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1625        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1626            .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1627    }
1628
1629    /**
1630     * Creates a publisher for the specified topic. <p/>
1631     * <P>
1632     * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1633     * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1634     * a topic, it defines a new sequence of messages that have no ordering
1635     * relationship with the messages it has previously sent.
1636     *
1637     * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1638     *                an unidentified producer
1639     * @return TopicPublisher
1640     * @throws JMSException if the session fails to create a publisher due to
1641     *                 some internal error.
1642     * @throws InvalidDestinationException if an invalid topic is specified.
1643     */
1644    @Override
1645    public TopicPublisher createPublisher(Topic topic) throws JMSException {
1646        checkClosed();
1647
1648        if (topic instanceof CustomDestination) {
1649            CustomDestination customDestination = (CustomDestination)topic;
1650            return customDestination.createPublisher(this);
1651        }
1652        int timeSendOut = connection.getSendTimeout();
1653        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1654    }
1655
1656    /**
1657     * Unsubscribes a durable subscription that has been created by a client.
1658     * <P>
1659     * This method deletes the state being maintained on behalf of the
1660     * subscriber by its provider.
1661     * <P>
1662     * It is erroneous for a client to delete a durable subscription while there
1663     * is an active <CODE>MessageConsumer </CODE> or
1664     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1665     * message is part of a pending transaction or has not been acknowledged in
1666     * the session.
1667     *
1668     * @param name the name used to identify this subscription
1669     * @throws JMSException if the session fails to unsubscribe to the durable
1670     *                 subscription due to some internal error.
1671     * @throws InvalidDestinationException if an invalid subscription name is
1672     *                 specified.
1673     * @since 1.1
1674     */
1675    @Override
1676    public void unsubscribe(String name) throws JMSException {
1677        checkClosed();
1678        connection.unsubscribe(name);
1679    }
1680
1681    @Override
1682    public void dispatch(MessageDispatch messageDispatch) {
1683        try {
1684            executor.execute(messageDispatch);
1685        } catch (InterruptedException e) {
1686            Thread.currentThread().interrupt();
1687            connection.onClientInternalException(e);
1688        }
1689    }
1690
1691    /**
1692     * Acknowledges all consumed messages of the session of this consumed
1693     * message.
1694     * <P>
1695     * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1696     * for use when a client has specified that its JMS session's consumed
1697     * messages are to be explicitly acknowledged. By invoking
1698     * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1699     * all messages consumed by the session that the message was delivered to.
1700     * <P>
1701     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1702     * sessions and sessions specified to use implicit acknowledgement modes.
1703     * <P>
1704     * A client may individually acknowledge each message as it is consumed, or
1705     * it may choose to acknowledge messages as an application-defined group
1706     * (which is done by calling acknowledge on the last received message of the
1707     * group, thereby acknowledging all messages consumed by the session.)
1708     * <P>
1709     * Messages that have been received but not acknowledged may be redelivered.
1710     *
1711     * @throws JMSException if the JMS provider fails to acknowledge the
1712     *                 messages due to some internal error.
1713     * @throws javax.jms.IllegalStateException if this method is called on a
1714     *                 closed session.
1715     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1716     */
1717    public void acknowledge() throws JMSException {
1718        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1719            ActiveMQMessageConsumer c = iter.next();
1720            c.acknowledge();
1721        }
1722    }
1723
1724    /**
1725     * Add a message consumer.
1726     *
1727     * @param consumer - message consumer.
1728     * @throws JMSException
1729     */
1730    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1731        this.consumers.add(consumer);
1732        if (consumer.isDurableSubscriber()) {
1733            stats.onCreateDurableSubscriber();
1734        }
1735        this.connection.addDispatcher(consumer.getConsumerId(), this);
1736    }
1737
1738    /**
1739     * Remove the message consumer.
1740     *
1741     * @param consumer - consumer to be removed.
1742     * @throws JMSException
1743     */
1744    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1745        this.connection.removeDispatcher(consumer.getConsumerId());
1746        if (consumer.isDurableSubscriber()) {
1747            stats.onRemoveDurableSubscriber();
1748        }
1749        this.consumers.remove(consumer);
1750        this.connection.removeDispatcher(consumer);
1751    }
1752
1753    /**
1754     * Adds a message producer.
1755     *
1756     * @param producer - message producer to be added.
1757     * @throws JMSException
1758     */
1759    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1760        this.producers.add(producer);
1761        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1762    }
1763
1764    /**
1765     * Removes a message producer.
1766     *
1767     * @param producer - message producer to be removed.
1768     * @throws JMSException
1769     */
1770    protected void removeProducer(ActiveMQMessageProducer producer) {
1771        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1772        this.producers.remove(producer);
1773    }
1774
1775    /**
1776     * Start this Session.
1777     *
1778     * @throws JMSException
1779     */
1780    protected void start() throws JMSException {
1781        started.set(true);
1782        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1783            ActiveMQMessageConsumer c = iter.next();
1784            c.start();
1785        }
1786        executor.start();
1787    }
1788
1789    /**
1790     * Stops this session.
1791     *
1792     * @throws JMSException
1793     */
1794    protected void stop() throws JMSException {
1795
1796        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1797            ActiveMQMessageConsumer c = iter.next();
1798            c.stop();
1799        }
1800
1801        started.set(false);
1802        executor.stop();
1803    }
1804
1805    /**
1806     * Returns the session id.
1807     *
1808     * @return value - session id.
1809     */
1810    protected SessionId getSessionId() {
1811        return info.getSessionId();
1812    }
1813
1814    /**
1815     * @return
1816     */
1817    protected ConsumerId getNextConsumerId() {
1818        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1819    }
1820
1821    /**
1822     * @return
1823     */
1824    protected ProducerId getNextProducerId() {
1825        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1826    }
1827
1828    /**
1829     * Sends the message for dispatch by the broker.
1830     *
1831     *
1832     * @param producer - message producer.
1833     * @param destination - message destination.
1834     * @param message - message to be sent.
1835     * @param deliveryMode - JMS messsage delivery mode.
1836     * @param priority - message priority.
1837     * @param timeToLive - message expiration.
1838     * @param producerWindow
1839     * @param onComplete
1840     * @throws JMSException
1841     */
1842    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1843                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1844
1845        checkClosed();
1846        if (destination.isTemporary() && connection.isDeleted(destination)) {
1847            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1848        }
1849        synchronized (sendMutex) {
1850            // tell the Broker we are about to start a new transaction
1851            doStartTransaction();
1852            TransactionId txid = transactionContext.getTransactionId();
1853            long sequenceNumber = producer.getMessageSequence();
1854
1855            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1856            message.setJMSDeliveryMode(deliveryMode);
1857            long expiration = 0L;
1858            if (!producer.getDisableMessageTimestamp()) {
1859                long timeStamp = System.currentTimeMillis();
1860                message.setJMSTimestamp(timeStamp);
1861                if (timeToLive > 0) {
1862                    expiration = timeToLive + timeStamp;
1863                }
1864            }
1865            message.setJMSExpiration(expiration);
1866            message.setJMSPriority(priority);
1867            message.setJMSRedelivered(false);
1868
1869            // transform to our own message format here
1870            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1871            msg.setDestination(destination);
1872            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1873
1874            // Set the message id.
1875            if (msg != message) {
1876                message.setJMSMessageID(msg.getMessageId().toString());
1877                // Make sure the JMS destination is set on the foreign messages too.
1878                message.setJMSDestination(destination);
1879            }
1880            //clear the brokerPath in case we are re-sending this message
1881            msg.setBrokerPath(null);
1882
1883            msg.setTransactionId(txid);
1884            if (connection.isCopyMessageOnSend()) {
1885                msg = (ActiveMQMessage)msg.copy();
1886            }
1887            msg.setConnection(connection);
1888            msg.onSend();
1889            msg.setProducerId(msg.getMessageId().getProducerId());
1890            if (LOG.isTraceEnabled()) {
1891                LOG.trace(getSessionId() + " sending message: " + msg);
1892            }
1893            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1894                this.connection.asyncSendPacket(msg);
1895                if (producerWindow != null) {
1896                    // Since we defer lots of the marshaling till we hit the
1897                    // wire, this might not
1898                    // provide and accurate size. We may change over to doing
1899                    // more aggressive marshaling,
1900                    // to get more accurate sizes.. this is more important once
1901                    // users start using producer window
1902                    // flow control.
1903                    int size = msg.getSize();
1904                    producerWindow.increaseUsage(size);
1905                }
1906            } else {
1907                if (sendTimeout > 0 && onComplete==null) {
1908                    this.connection.syncSendPacket(msg,sendTimeout);
1909                }else {
1910                    this.connection.syncSendPacket(msg, onComplete);
1911                }
1912            }
1913
1914        }
1915    }
1916
1917    /**
1918     * Send TransactionInfo to indicate transaction has started
1919     *
1920     * @throws JMSException if some internal error occurs
1921     */
1922    protected void doStartTransaction() throws JMSException {
1923        if (getTransacted() && !transactionContext.isInXATransaction()) {
1924            transactionContext.begin();
1925        }
1926    }
1927
1928    /**
1929     * Checks whether the session has unconsumed messages.
1930     *
1931     * @return true - if there are unconsumed messages.
1932     */
1933    public boolean hasUncomsumedMessages() {
1934        return executor.hasUncomsumedMessages();
1935    }
1936
1937    /**
1938     * Checks whether the session uses transactions.
1939     *
1940     * @return true - if the session uses transactions.
1941     */
1942    public boolean isTransacted() {
1943        return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1944    }
1945
1946    /**
1947     * Checks whether the session used client acknowledgment.
1948     *
1949     * @return true - if the session uses client acknowledgment.
1950     */
1951    protected boolean isClientAcknowledge() {
1952        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1953    }
1954
1955    /**
1956     * Checks whether the session used auto acknowledgment.
1957     *
1958     * @return true - if the session uses client acknowledgment.
1959     */
1960    public boolean isAutoAcknowledge() {
1961        return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1962    }
1963
1964    /**
1965     * Checks whether the session used dup ok acknowledgment.
1966     *
1967     * @return true - if the session uses client acknowledgment.
1968     */
1969    public boolean isDupsOkAcknowledge() {
1970        return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1971    }
1972
1973    public boolean isIndividualAcknowledge(){
1974        return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1975    }
1976
1977    /**
1978     * Returns the message delivery listener.
1979     *
1980     * @return deliveryListener - message delivery listener.
1981     */
1982    public DeliveryListener getDeliveryListener() {
1983        return deliveryListener;
1984    }
1985
1986    /**
1987     * Sets the message delivery listener.
1988     *
1989     * @param deliveryListener - message delivery listener.
1990     */
1991    public void setDeliveryListener(DeliveryListener deliveryListener) {
1992        this.deliveryListener = deliveryListener;
1993    }
1994
1995    /**
1996     * Returns the SessionInfo bean.
1997     *
1998     * @return info - SessionInfo bean.
1999     * @throws JMSException
2000     */
2001    protected SessionInfo getSessionInfo() throws JMSException {
2002        SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
2003        return info;
2004    }
2005
2006    /**
2007     * Send the asynchronus command.
2008     *
2009     * @param command - command to be executed.
2010     * @throws JMSException
2011     */
2012    public void asyncSendPacket(Command command) throws JMSException {
2013        connection.asyncSendPacket(command);
2014    }
2015
2016    /**
2017     * Send the synchronus command.
2018     *
2019     * @param command - command to be executed.
2020     * @return Response
2021     * @throws JMSException
2022     */
2023    public Response syncSendPacket(Command command) throws JMSException {
2024        return connection.syncSendPacket(command);
2025    }
2026
2027    public long getNextDeliveryId() {
2028        return deliveryIdGenerator.getNextSequenceId();
2029    }
2030
2031    public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
2032
2033        List<MessageDispatch> c = unconsumedMessages.removeAll();
2034        for (MessageDispatch md : c) {
2035            this.connection.rollbackDuplicate(dispatcher, md.getMessage());
2036        }
2037        Collections.reverse(c);
2038
2039        for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
2040            MessageDispatch md = iter.next();
2041            executor.executeFirst(md);
2042        }
2043
2044    }
2045
2046    public boolean isRunning() {
2047        return started.get();
2048    }
2049
2050    public boolean isAsyncDispatch() {
2051        return asyncDispatch;
2052    }
2053
    /**
     * @param asyncDispatch The asyncDispatch to set.
     */
    public void setAsyncDispatch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }
2057
2058    /**
2059     * @return Returns the sessionAsyncDispatch.
2060     */
2061    public boolean isSessionAsyncDispatch() {
2062        return sessionAsyncDispatch;
2063    }
2064
2065    /**
2066     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
2067     */
2068    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
2069        this.sessionAsyncDispatch = sessionAsyncDispatch;
2070    }
2071
    /**
     * @return the {@link MessageTransformer} currently held by this session;
     *         NOTE(review): may be {@code null} if none was set - confirm
     */
    public MessageTransformer getTransformer() {
        return transformer;
    }
2075
    /**
     * @return the {@link ActiveMQConnection} this session holds in its
     *         {@code connection} field
     */
    public ActiveMQConnection getConnection() {
        return connection;
    }
2079
2080    /**
2081     * Sets the transformer used to transform messages before they are sent on
2082     * to the JMS bus or when they are received from the bus but before they are
2083     * delivered to the JMS client
2084     */
2085    public void setTransformer(MessageTransformer transformer) {
2086        this.transformer = transformer;
2087    }
2088
    /**
     * @return the {@link BlobTransferPolicy} currently held by this session
     */
    public BlobTransferPolicy getBlobTransferPolicy() {
        return blobTransferPolicy;
    }
2092
2093    /**
2094     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
2095     * OBjects) are transferred from producers to brokers to consumers
2096     */
2097    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
2098        this.blobTransferPolicy = blobTransferPolicy;
2099    }
2100
    /**
     * Returns the unconsumed messages as reported by this session's executor.
     *
     * @return the list returned by {@code executor.getUnconsumedMessages()};
     *         NOTE(review): whether this is a copy or a live view depends on
     *         the executor - confirm before mutating it
     */
    public List<MessageDispatch> getUnconsumedMessages() {
        return executor.getUnconsumedMessages();
    }
2104
2105    @Override
2106    public String toString() {
2107        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
2108    }
2109
2110    public void checkMessageListener() throws JMSException {
2111        if (messageListener != null) {
2112            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2113        }
2114        for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
2115            ActiveMQMessageConsumer consumer = i.next();
2116            if (consumer.hasMessageListener()) {
2117                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2118            }
2119        }
2120    }
2121
2122    protected void setOptimizeAcknowledge(boolean value) {
2123        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2124            ActiveMQMessageConsumer c = iter.next();
2125            c.setOptimizeAcknowledge(value);
2126        }
2127    }
2128
2129    protected void setPrefetchSize(ConsumerId id, int prefetch) {
2130        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2131            ActiveMQMessageConsumer c = iter.next();
2132            if (c.getConsumerId().equals(id)) {
2133                c.setPrefetchSize(prefetch);
2134                break;
2135            }
2136        }
2137    }
2138
2139    protected void close(ConsumerId id) {
2140        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2141            ActiveMQMessageConsumer c = iter.next();
2142            if (c.getConsumerId().equals(id)) {
2143                try {
2144                    c.close();
2145                } catch (JMSException e) {
2146                    LOG.warn("Exception closing consumer", e);
2147                }
2148                LOG.warn("Closed consumer on Command, " + id);
2149                break;
2150            }
2151        }
2152    }
2153
2154    public boolean isInUse(ActiveMQTempDestination destination) {
2155        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2156            ActiveMQMessageConsumer c = iter.next();
2157            if (c.isInUse(destination)) {
2158                return true;
2159            }
2160        }
2161        return false;
2162    }
2163
2164    /**
2165     * highest sequence id of the last message delivered by this session.
2166     * Passed to the broker in the close command, maintained by dispose()
2167     * @return lastDeliveredSequenceId
2168     */
2169    public long getLastDeliveredSequenceId() {
2170        return lastDeliveredSequenceId;
2171    }
2172
    /**
     * Sends the acknowledgement without forcing lazy delivery; equivalent to
     * {@code sendAck(ack, false)}.
     *
     * @param ack the acknowledgement to send
     * @throws JMSException if the two-argument {@code sendAck} throws it
     */
    protected void sendAck(MessageAck ack) throws JMSException {
        sendAck(ack,false);
    }
2176
2177    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2178        if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2179            asyncSendPacket(ack);
2180        } else {
2181            syncSendPacket(ack);
2182        }
2183    }
2184
    /**
     * Returns the scheduler obtained from the owning connection.
     *
     * @return the result of {@code connection.getScheduler()}
     * @throws JMSException if the connection's {@code getScheduler()} throws it
     */
    protected Scheduler getScheduler() throws JMSException {
        return this.connection.getScheduler();
    }
2188
    /**
     * @return the {@link ThreadPoolExecutor} held in this session's
     *         {@code connectionExecutor} field
     */
    protected ThreadPoolExecutor getConnectionExecutor() {
        return this.connectionExecutor;
    }
2192}