001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.File;
020import java.io.InputStream;
021import java.io.Serializable;
022import java.net.URL;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import javax.jms.BytesMessage;
032import javax.jms.Destination;
033import javax.jms.IllegalStateException;
034import javax.jms.InvalidDestinationException;
035import javax.jms.InvalidSelectorException;
036import javax.jms.JMSException;
037import javax.jms.MapMessage;
038import javax.jms.Message;
039import javax.jms.MessageConsumer;
040import javax.jms.MessageListener;
041import javax.jms.MessageProducer;
042import javax.jms.ObjectMessage;
043import javax.jms.Queue;
044import javax.jms.QueueBrowser;
045import javax.jms.QueueReceiver;
046import javax.jms.QueueSender;
047import javax.jms.QueueSession;
048import javax.jms.Session;
049import javax.jms.StreamMessage;
050import javax.jms.TemporaryQueue;
051import javax.jms.TemporaryTopic;
052import javax.jms.TextMessage;
053import javax.jms.Topic;
054import javax.jms.TopicPublisher;
055import javax.jms.TopicSession;
056import javax.jms.TopicSubscriber;
057import javax.jms.TransactionRolledBackException;
058
059import org.apache.activemq.blob.BlobDownloader;
060import org.apache.activemq.blob.BlobTransferPolicy;
061import org.apache.activemq.blob.BlobUploader;
062import org.apache.activemq.command.ActiveMQBlobMessage;
063import org.apache.activemq.command.ActiveMQBytesMessage;
064import org.apache.activemq.command.ActiveMQDestination;
065import org.apache.activemq.command.ActiveMQMapMessage;
066import org.apache.activemq.command.ActiveMQMessage;
067import org.apache.activemq.command.ActiveMQObjectMessage;
068import org.apache.activemq.command.ActiveMQQueue;
069import org.apache.activemq.command.ActiveMQStreamMessage;
070import org.apache.activemq.command.ActiveMQTempDestination;
071import org.apache.activemq.command.ActiveMQTempQueue;
072import org.apache.activemq.command.ActiveMQTempTopic;
073import org.apache.activemq.command.ActiveMQTextMessage;
074import org.apache.activemq.command.ActiveMQTopic;
075import org.apache.activemq.command.Command;
076import org.apache.activemq.command.ConsumerId;
077import org.apache.activemq.command.MessageAck;
078import org.apache.activemq.command.MessageDispatch;
079import org.apache.activemq.command.MessageId;
080import org.apache.activemq.command.ProducerId;
081import org.apache.activemq.command.RemoveInfo;
082import org.apache.activemq.command.Response;
083import org.apache.activemq.command.SessionId;
084import org.apache.activemq.command.SessionInfo;
085import org.apache.activemq.command.TransactionId;
086import org.apache.activemq.management.JMSSessionStatsImpl;
087import org.apache.activemq.management.StatsCapable;
088import org.apache.activemq.management.StatsImpl;
089import org.apache.activemq.thread.Scheduler;
090import org.apache.activemq.transaction.Synchronization;
091import org.apache.activemq.usage.MemoryUsage;
092import org.apache.activemq.util.Callback;
093import org.apache.activemq.util.LongSequenceGenerator;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097/**
098 * <P>
099 * A <CODE>Session</CODE> object is a single-threaded context for producing
100 * and consuming messages. Although it may allocate provider resources outside
101 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102 * <P>
103 * A session serves several purposes:
104 * <UL>
105 * <LI>It is a factory for its message producers and consumers.
106 * <LI>It supplies provider-optimized message factories.
107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108 * <CODE>TemporaryQueues</CODE>.
109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110 * objects for those clients that need to dynamically manipulate
111 * provider-specific destination names.
112 * <LI>It supports a single series of transactions that combine work spanning
113 * its producers and consumers into atomic units.
114 * <LI>It defines a serial order for the messages it consumes and the messages
115 * it produces.
116 * <LI>It retains messages it consumes until they have been acknowledged.
117 * <LI>It serializes execution of message listeners registered with its message
118 * consumers.
119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120 * </UL>
121 * <P>
122 * A session can create and service multiple message producers and consumers.
123 * <P>
124 * One typical use is to have a thread block on a synchronous
125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
 * use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
127 * <P>
128 * If a client desires to have one thread produce messages while others consume
129 * them, the client should use a separate session for its producing thread.
130 * <P>
131 * Once a connection has been started, any session with one or more registered
132 * message listeners is dedicated to the thread of control that delivers
133 * messages to it. It is erroneous for client code to use this session or any of
134 * its constituent objects from another thread of control. The only exception to
135 * this rule is the use of the session or connection <CODE>close</CODE>
136 * method.
137 * <P>
138 * It should be easy for most clients to partition their work naturally into
139 * sessions. This model allows clients to start simply and incrementally add
140 * message processing complexity as their need for concurrency grows.
141 * <P>
142 * The <CODE>close</CODE> method is the only session method that can be called
143 * while some other session method is being executed in another thread.
144 * <P>
145 * A session may be specified as transacted. Each transacted session supports a
146 * single series of transactions. Each transaction groups a set of message sends
147 * and a set of message receives into an atomic unit of work. In effect,
148 * transactions organize a session's input message stream and output message
149 * stream into series of atomic units. When a transaction commits, its atomic
150 * unit of input is acknowledged and its associated atomic unit of output is
151 * sent. If a transaction rollback is done, the transaction's sent messages are
152 * destroyed and the session's input is automatically recovered.
153 * <P>
154 * The content of a transaction's input and output units is simply those
155 * messages that have been produced and consumed within the session's current
156 * transaction.
157 * <P>
158 * A transaction is completed using either its session's <CODE>commit</CODE>
159 * method or its session's <CODE>rollback </CODE> method. The completion of a
160 * session's current transaction automatically begins the next. The result is
161 * that a transacted session always has a current transaction within which its
162 * work is done.
163 * <P>
164 * The Java Transaction Service (JTS) or some other transaction monitor may be
165 * used to combine a session's transaction with transactions on other resources
166 * (databases, other JMS sessions, etc.). Since Java distributed transactions
167 * are controlled via the Java Transaction API (JTA), use of the session's
168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169 * prohibited.
170 * <P>
171 * The JMS API does not require support for JTA; however, it does define how a
172 * provider supplies this support.
173 * <P>
174 * Although it is also possible for a JMS client to handle distributed
175 * transactions directly, it is unlikely that many JMS clients will do this.
176 * Support for JTA in the JMS API is targeted at systems vendors who will be
177 * integrating the JMS API into their application server products.
178 *
179 *
180 * @see javax.jms.Session
181 * @see javax.jms.QueueSession
182 * @see javax.jms.TopicSession
183 * @see javax.jms.XASession
184 */
185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186
187    /**
     * Only acknowledge an individual message - using message.acknowledge() -
     * as opposed to CLIENT_ACKNOWLEDGE, which
     * acknowledges all messages consumed by a session when acknowledge()
     * is called.
192     */
193    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
194    public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195
196    public static interface DeliveryListener {
197        void beforeDelivery(ActiveMQSession session, Message msg);
198
199        void afterDelivery(ActiveMQSession session, Message msg);
200    }
201
202    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
203    private final ThreadPoolExecutor connectionExecutor;
204
205    protected int acknowledgementMode;
206    protected final ActiveMQConnection connection;
207    protected final SessionInfo info;
208    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
209    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
210    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
211    protected final ActiveMQSessionExecutor executor;
212    protected final AtomicBoolean started = new AtomicBoolean(false);
213
214    protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
215    protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
216
217    protected boolean closed;
218    private volatile boolean synchronizationRegistered;
219    protected boolean asyncDispatch;
220    protected boolean sessionAsyncDispatch;
221    protected final boolean debug;
222    protected Object sendMutex = new Object();
223    private final AtomicBoolean clearInProgress = new AtomicBoolean();
224
225    private MessageListener messageListener;
226    private final JMSSessionStatsImpl stats;
227    private TransactionContext transactionContext;
228    private DeliveryListener deliveryListener;
229    private MessageTransformer transformer;
230    private BlobTransferPolicy blobTransferPolicy;
231    private long lastDeliveredSequenceId;
232
233    /**
234     * Construct the Session
235     *
236     * @param connection
237     * @param sessionId
238     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
239     *                Session.SESSION_TRANSACTED
240     * @param asyncDispatch
241     * @param sessionAsyncDispatch
242     * @throws JMSException on internal error
243     */
244    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
245        this.debug = LOG.isDebugEnabled();
246        this.connection = connection;
247        this.acknowledgementMode = acknowledgeMode;
248        this.asyncDispatch = asyncDispatch;
249        this.sessionAsyncDispatch = sessionAsyncDispatch;
250        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
251        setTransactionContext(new TransactionContext(connection));
252        stats = new JMSSessionStatsImpl(producers, consumers);
253        this.connection.asyncSendPacket(info);
254        setTransformer(connection.getTransformer());
255        setBlobTransferPolicy(connection.getBlobTransferPolicy());
256        this.connectionExecutor=connection.getExecutor();
257        this.executor = new ActiveMQSessionExecutor(this);
258        connection.addSession(this);
259        if (connection.isStarted()) {
260            start();
261        }
262
263    }
264
265    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
266        this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
267    }
268
269    /**
270     * Sets the transaction context of the session.
271     *
272     * @param transactionContext - provides the means to control a JMS
273     *                transaction.
274     */
275    public void setTransactionContext(TransactionContext transactionContext) {
276        this.transactionContext = transactionContext;
277    }
278
279    /**
280     * Returns the transaction context of the session.
281     *
282     * @return transactionContext - session's transaction context.
283     */
284    public TransactionContext getTransactionContext() {
285        return transactionContext;
286    }
287
288    /*
289     * (non-Javadoc)
290     *
291     * @see org.apache.activemq.management.StatsCapable#getStats()
292     */
293    @Override
294    public StatsImpl getStats() {
295        return stats;
296    }
297
298    /**
299     * Returns the session's statistics.
300     *
301     * @return stats - session's statistics.
302     */
303    public JMSSessionStatsImpl getSessionStats() {
304        return stats;
305    }
306
307    /**
308     * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
309     * object is used to send a message containing a stream of uninterpreted
310     * bytes.
311     *
312     * @return the an ActiveMQBytesMessage
313     * @throws JMSException if the JMS provider fails to create this message due
314     *                 to some internal error.
315     */
316    @Override
317    public BytesMessage createBytesMessage() throws JMSException {
318        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
319        configureMessage(message);
320        return message;
321    }
322
323    /**
324     * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
325     * object is used to send a self-defining set of name-value pairs, where
326     * names are <CODE>String</CODE> objects and values are primitive values
327     * in the Java programming language.
328     *
329     * @return an ActiveMQMapMessage
330     * @throws JMSException if the JMS provider fails to create this message due
331     *                 to some internal error.
332     */
333    @Override
334    public MapMessage createMapMessage() throws JMSException {
335        ActiveMQMapMessage message = new ActiveMQMapMessage();
336        configureMessage(message);
337        return message;
338    }
339
340    /**
341     * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
342     * interface is the root interface of all JMS messages. A
343     * <CODE>Message</CODE> object holds all the standard message header
344     * information. It can be sent when a message containing only header
345     * information is sufficient.
346     *
347     * @return an ActiveMQMessage
348     * @throws JMSException if the JMS provider fails to create this message due
349     *                 to some internal error.
350     */
351    @Override
352    public Message createMessage() throws JMSException {
353        ActiveMQMessage message = new ActiveMQMessage();
354        configureMessage(message);
355        return message;
356    }
357
358    /**
359     * Creates an <CODE>ObjectMessage</CODE> object. An
360     * <CODE>ObjectMessage</CODE> object is used to send a message that
361     * contains a serializable Java object.
362     *
363     * @return an ActiveMQObjectMessage
364     * @throws JMSException if the JMS provider fails to create this message due
365     *                 to some internal error.
366     */
367    @Override
368    public ObjectMessage createObjectMessage() throws JMSException {
369        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
370        configureMessage(message);
371        return message;
372    }
373
374    /**
375     * Creates an initialized <CODE>ObjectMessage</CODE> object. An
376     * <CODE>ObjectMessage</CODE> object is used to send a message that
377     * contains a serializable Java object.
378     *
379     * @param object the object to use to initialize this message
380     * @return an ActiveMQObjectMessage
381     * @throws JMSException if the JMS provider fails to create this message due
382     *                 to some internal error.
383     */
384    @Override
385    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
386        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
387        configureMessage(message);
388        message.setObject(object);
389        return message;
390    }
391
392    /**
393     * Creates a <CODE>StreamMessage</CODE> object. A
394     * <CODE>StreamMessage</CODE> object is used to send a self-defining
395     * stream of primitive values in the Java programming language.
396     *
397     * @return an ActiveMQStreamMessage
398     * @throws JMSException if the JMS provider fails to create this message due
399     *                 to some internal error.
400     */
401    @Override
402    public StreamMessage createStreamMessage() throws JMSException {
403        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
404        configureMessage(message);
405        return message;
406    }
407
408    /**
409     * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
410     * object is used to send a message containing a <CODE>String</CODE>
411     * object.
412     *
413     * @return an ActiveMQTextMessage
414     * @throws JMSException if the JMS provider fails to create this message due
415     *                 to some internal error.
416     */
417    @Override
418    public TextMessage createTextMessage() throws JMSException {
419        ActiveMQTextMessage message = new ActiveMQTextMessage();
420        configureMessage(message);
421        return message;
422    }
423
424    /**
425     * Creates an initialized <CODE>TextMessage</CODE> object. A
426     * <CODE>TextMessage</CODE> object is used to send a message containing a
427     * <CODE>String</CODE>.
428     *
429     * @param text the string used to initialize this message
430     * @return an ActiveMQTextMessage
431     * @throws JMSException if the JMS provider fails to create this message due
432     *                 to some internal error.
433     */
434    @Override
435    public TextMessage createTextMessage(String text) throws JMSException {
436        ActiveMQTextMessage message = new ActiveMQTextMessage();
437        message.setText(text);
438        configureMessage(message);
439        return message;
440    }
441
442    /**
443     * Creates an initialized <CODE>BlobMessage</CODE> object. A
444     * <CODE>BlobMessage</CODE> object is used to send a message containing a
445     * <CODE>URL</CODE> which points to some network addressible BLOB.
446     *
447     * @param url the network addressable URL used to pass directly to the
448     *                consumer
449     * @return a BlobMessage
450     * @throws JMSException if the JMS provider fails to create this message due
451     *                 to some internal error.
452     */
453    public BlobMessage createBlobMessage(URL url) throws JMSException {
454        return createBlobMessage(url, false);
455    }
456
457    /**
458     * Creates an initialized <CODE>BlobMessage</CODE> object. A
459     * <CODE>BlobMessage</CODE> object is used to send a message containing a
460     * <CODE>URL</CODE> which points to some network addressible BLOB.
461     *
462     * @param url the network addressable URL used to pass directly to the
463     *                consumer
464     * @param deletedByBroker indicates whether or not the resource is deleted
465     *                by the broker when the message is acknowledged
466     * @return a BlobMessage
467     * @throws JMSException if the JMS provider fails to create this message due
468     *                 to some internal error.
469     */
470    public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
471        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
472        configureMessage(message);
473        message.setURL(url);
474        message.setDeletedByBroker(deletedByBroker);
475        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
476        return message;
477    }
478
479    /**
480     * Creates an initialized <CODE>BlobMessage</CODE> object. A
481     * <CODE>BlobMessage</CODE> object is used to send a message containing
482     * the <CODE>File</CODE> content. Before the message is sent the file
483     * conent will be uploaded to the broker or some other remote repository
484     * depending on the {@link #getBlobTransferPolicy()}.
485     *
486     * @param file the file to be uploaded to some remote repo (or the broker)
487     *                depending on the strategy
488     * @return a BlobMessage
489     * @throws JMSException if the JMS provider fails to create this message due
490     *                 to some internal error.
491     */
492    public BlobMessage createBlobMessage(File file) throws JMSException {
493        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
494        configureMessage(message);
495        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
496        message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
497        message.setDeletedByBroker(true);
498        message.setName(file.getName());
499        return message;
500    }
501
502    /**
503     * Creates an initialized <CODE>BlobMessage</CODE> object. A
504     * <CODE>BlobMessage</CODE> object is used to send a message containing
505     * the <CODE>File</CODE> content. Before the message is sent the file
506     * conent will be uploaded to the broker or some other remote repository
507     * depending on the {@link #getBlobTransferPolicy()}.
508     *
509     * @param in the stream to be uploaded to some remote repo (or the broker)
510     *                depending on the strategy
511     * @return a BlobMessage
512     * @throws JMSException if the JMS provider fails to create this message due
513     *                 to some internal error.
514     */
515    public BlobMessage createBlobMessage(InputStream in) throws JMSException {
516        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
517        configureMessage(message);
518        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
519        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
520        message.setDeletedByBroker(true);
521        return message;
522    }
523
524    /**
525     * Indicates whether the session is in transacted mode.
526     *
527     * @return true if the session is in transacted mode
528     * @throws JMSException if there is some internal error.
529     */
530    @Override
531    public boolean getTransacted() throws JMSException {
532        checkClosed();
533        return isTransacted();
534    }
535
536    /**
537     * Returns the acknowledgement mode of the session. The acknowledgement mode
538     * is set at the time that the session is created. If the session is
539     * transacted, the acknowledgement mode is ignored.
540     *
541     * @return If the session is not transacted, returns the current
542     *         acknowledgement mode for the session. If the session is
543     *         transacted, returns SESSION_TRANSACTED.
544     * @throws JMSException
545     * @see javax.jms.Connection#createSession(boolean,int)
546     * @since 1.1 exception JMSException if there is some internal error.
547     */
548    @Override
549    public int getAcknowledgeMode() throws JMSException {
550        checkClosed();
551        return this.acknowledgementMode;
552    }
553
554    /**
555     * Commits all messages done in this transaction and releases any locks
556     * currently held.
557     *
558     * @throws JMSException if the JMS provider fails to commit the transaction
559     *                 due to some internal error.
560     * @throws TransactionRolledBackException if the transaction is rolled back
561     *                 due to some internal error during commit.
562     * @throws javax.jms.IllegalStateException if the method is not called by a
563     *                 transacted session.
564     */
565    @Override
566    public void commit() throws JMSException {
567        checkClosed();
568        if (!getTransacted()) {
569            throw new javax.jms.IllegalStateException("Not a transacted session");
570        }
571        if (LOG.isDebugEnabled()) {
572            LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
573        }
574        transactionContext.commit();
575    }
576
577    /**
578     * Rolls back any messages done in this transaction and releases any locks
579     * currently held.
580     *
581     * @throws JMSException if the JMS provider fails to roll back the
582     *                 transaction due to some internal error.
583     * @throws javax.jms.IllegalStateException if the method is not called by a
584     *                 transacted session.
585     */
586    @Override
587    public void rollback() throws JMSException {
588        checkClosed();
589        if (!getTransacted()) {
590            throw new javax.jms.IllegalStateException("Not a transacted session");
591        }
592        if (LOG.isDebugEnabled()) {
593            LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
594        }
595        transactionContext.rollback();
596    }
597
598    /**
599     * Closes the session.
600     * <P>
601     * Since a provider may allocate some resources on behalf of a session
602     * outside the JVM, clients should close the resources when they are not
603     * needed. Relying on garbage collection to eventually reclaim these
604     * resources may not be timely enough.
605     * <P>
606     * There is no need to close the producers and consumers of a closed
607     * session.
608     * <P>
609     * This call will block until a <CODE>receive</CODE> call or message
610     * listener in progress has completed. A blocked message consumer
611     * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
612     * is closed.
613     * <P>
614     * Closing a transacted session must roll back the transaction in progress.
615     * <P>
616     * This method is the only <CODE>Session</CODE> method that can be called
617     * concurrently.
618     * <P>
619     * Invoking any other <CODE>Session</CODE> method on a closed session must
620     * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
621     * closed session must <I>not </I> throw an exception.
622     *
623     * @throws JMSException if the JMS provider fails to close the session due
624     *                 to some internal error.
625     */
626    @Override
627    public void close() throws JMSException {
628        if (!closed) {
629            if (getTransactionContext().isInXATransaction()) {
630                if (!synchronizationRegistered) {
631                    synchronizationRegistered = true;
632                    getTransactionContext().addSynchronization(new Synchronization() {
633
634                                        @Override
635                                        public void afterCommit() throws Exception {
636                                            doClose();
637                                            synchronizationRegistered = false;
638                                        }
639
640                                        @Override
641                                        public void afterRollback() throws Exception {
642                                            doClose();
643                                            synchronizationRegistered = false;
644                                        }
645                                    });
646                }
647
648            } else {
649                doClose();
650            }
651        }
652    }
653
654    private void doClose() throws JMSException {
655        boolean interrupted = Thread.interrupted();
656        dispose();
657        RemoveInfo removeCommand = info.createRemoveCommand();
658        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
659        connection.asyncSendPacket(removeCommand);
660        if (interrupted) {
661            Thread.currentThread().interrupt();
662        }
663    }
664
665    final AtomicInteger clearRequestsCounter = new AtomicInteger(0);
666    void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
667        clearRequestsCounter.incrementAndGet();
668        executor.clearMessagesInProgress();
669        // we are called from inside the transport reconnection logic which involves us
670        // clearing all the connections' consumers dispatch and delivered lists. So rather
671        // than trying to grab a mutex (which could be already owned by the message listener
672        // calling the send or an ack) we allow it to complete in a separate thread via the
673        // scheduler and notify us via connection.transportInterruptionProcessingComplete()
674        //
675        // We must be careful though not to allow multiple calls to this method from a
676        // connection that is having issue becoming fully established from causing a large
677        // build up of scheduled tasks to clear the same consumers over and over.
678        if (consumers.isEmpty()) {
679            return;
680        }
681
682        if (clearInProgress.compareAndSet(false, true)) {
683            for (final ActiveMQMessageConsumer consumer : consumers) {
684                consumer.inProgressClearRequired();
685                transportInterruptionProcessingComplete.incrementAndGet();
686                try {
687                    connection.getScheduler().executeAfterDelay(new Runnable() {
688                        @Override
689                        public void run() {
690                            consumer.clearMessagesInProgress();
691                        }}, 0l);
692                } catch (JMSException e) {
693                    connection.onClientInternalException(e);
694                }
695            }
696
697            try {
698                connection.getScheduler().executeAfterDelay(new Runnable() {
699                    @Override
700                    public void run() {
701                        clearInProgress.set(false);
702                    }}, 0l);
703            } catch (JMSException e) {
704                connection.onClientInternalException(e);
705            }
706        }
707    }
708
709    void deliverAcks() {
710        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
711            ActiveMQMessageConsumer consumer = iter.next();
712            consumer.deliverAcks();
713        }
714    }
715
716    public synchronized void dispose() throws JMSException {
717        if (!closed) {
718
719            try {
720                executor.stop();
721
722                for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
723                    ActiveMQMessageConsumer consumer = iter.next();
724                    consumer.setFailureError(connection.getFirstFailureError());
725                    consumer.dispose();
726                    lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
727                }
728                consumers.clear();
729
730                for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
731                    ActiveMQMessageProducer producer = iter.next();
732                    producer.dispose();
733                }
734                producers.clear();
735
736                try {
737                    if (getTransactionContext().isInLocalTransaction()) {
738                        rollback();
739                    }
740                } catch (JMSException e) {
741                }
742
743            } finally {
744                connection.removeSession(this);
745                this.transactionContext = null;
746                closed = true;
747            }
748        }
749    }
750
751    /**
752     * Checks that the session is not closed then configures the message
753     */
754    protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
755        checkClosed();
756        message.setConnection(connection);
757    }
758
759    /**
760     * Check if the session is closed. It is used for ensuring that the session
761     * is open before performing various operations.
762     *
763     * @throws IllegalStateException if the Session is closed
764     */
765    protected void checkClosed() throws IllegalStateException {
766        if (closed) {
767            throw new IllegalStateException("The Session is closed");
768        }
769    }
770
771    /**
772     * Checks if the session is closed.
773     *
774     * @return true if the session is closed, false otherwise.
775     */
776    public boolean isClosed() {
777        return closed;
778    }
779
780    /**
781     * Stops message delivery in this session, and restarts message delivery
782     * with the oldest unacknowledged message.
783     * <P>
784     * All consumers deliver messages in a serial order. Acknowledging a
785     * received message automatically acknowledges all messages that have been
786     * delivered to the client.
787     * <P>
788     * Restarting a session causes it to take the following actions:
789     * <UL>
790     * <LI>Stop message delivery
791     * <LI>Mark all messages that might have been delivered but not
792     * acknowledged as "redelivered"
793     * <LI>Restart the delivery sequence including all unacknowledged messages
794     * that had been previously delivered. Redelivered messages do not have to
795     * be delivered in exactly their original delivery order.
796     * </UL>
797     *
798     * @throws JMSException if the JMS provider fails to stop and restart
799     *                 message delivery due to some internal error.
800     * @throws IllegalStateException if the method is called by a transacted
801     *                 session.
802     */
803    @Override
804    public void recover() throws JMSException {
805
806        checkClosed();
807        if (getTransacted()) {
808            throw new IllegalStateException("This session is transacted");
809        }
810
811        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
812            ActiveMQMessageConsumer c = iter.next();
813            c.rollback();
814        }
815
816    }
817
818    /**
819     * Returns the session's distinguished message listener (optional).
820     *
821     * @return the message listener associated with this session
822     * @throws JMSException if the JMS provider fails to get the message
823     *                 listener due to an internal error.
824     * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
825     * @see javax.jms.ServerSessionPool
826     * @see javax.jms.ServerSession
827     */
828    @Override
829    public MessageListener getMessageListener() throws JMSException {
830        checkClosed();
831        return this.messageListener;
832    }
833
834    /**
835     * Sets the session's distinguished message listener (optional).
836     * <P>
837     * When the distinguished message listener is set, no other form of message
838     * receipt in the session can be used; however, all forms of sending
839     * messages are still supported.
840     * <P>
841     * If this session has been closed, then an {@link IllegalStateException} is
842     * thrown, if trying to set a new listener. However setting the listener
843     * to <tt>null</tt> is allowed, to clear the listener, even if this session
844     * has been closed prior.
845     * <P>
846     * This is an expert facility not used by regular JMS clients.
847     *
848     * @param listener the message listener to associate with this session
849     * @throws JMSException if the JMS provider fails to set the message
850     *                 listener due to an internal error.
851     * @see javax.jms.Session#getMessageListener()
852     * @see javax.jms.ServerSessionPool
853     * @see javax.jms.ServerSession
854     */
855    @Override
856    public void setMessageListener(MessageListener listener) throws JMSException {
857        // only check for closed if we set a new listener, as we allow to clear
858        // the listener, such as when an application is shutting down, and is
859        // no longer using a message listener on this session
860        if (listener != null) {
861            checkClosed();
862        }
863        this.messageListener = listener;
864
865        if (listener != null) {
866            executor.setDispatchedBySessionPool(true);
867        }
868    }
869
870    /**
871     * Optional operation, intended to be used only by Application Servers, not
872     * by ordinary JMS clients.
873     *
874     * @see javax.jms.ServerSession
875     */
876    @Override
877    public void run() {
878        MessageDispatch messageDispatch;
879        while ((messageDispatch = executor.dequeueNoWait()) != null) {
880            final MessageDispatch md = messageDispatch;
881            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
882
883            MessageAck earlyAck = null;
884            if (message.isExpired()) {
885                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
886            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
887                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
888                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
889                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
890                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
891            }
892            if (earlyAck != null) {
893                try {
894                    asyncSendPacket(earlyAck);
895                } catch (Throwable t) {
896                    LOG.error("error dispatching ack: {} ", earlyAck, t);
897                    connection.onClientInternalException(t);
898                } finally {
899                    continue;
900                }
901            }
902
903            if (isClientAcknowledge()||isIndividualAcknowledge()) {
904                message.setAcknowledgeCallback(new Callback() {
905                    @Override
906                    public void execute() throws Exception {
907                    }
908                });
909            }
910
911            if (deliveryListener != null) {
912                deliveryListener.beforeDelivery(this, message);
913            }
914
915            md.setDeliverySequenceId(getNextDeliveryId());
916
917            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
918            try {
919                ack.setFirstMessageId(md.getMessage().getMessageId());
920                doStartTransaction();
921                ack.setTransactionId(getTransactionContext().getTransactionId());
922                if (ack.getTransactionId() != null) {
923                    getTransactionContext().addSynchronization(new Synchronization() {
924
925                        final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
926                        @Override
927                        public void beforeEnd() throws Exception {
928                            // validate our consumer so we don't push stale acks that get ignored
929                            if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
930                                LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
931                                throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
932                            }
933                            LOG.trace("beforeEnd ack {}", ack);
934                            sendAck(ack);
935                        }
936
937                        @Override
938                        public void afterRollback() throws Exception {
939                            LOG.trace("rollback {}", ack, new Throwable("here"));
940                            md.getMessage().onMessageRolledBack();
941                            // ensure we don't filter this as a duplicate
942                            connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
943
944                            // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
945                            if (clearRequestsCounter.get() > clearRequestCount) {
946                                LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
947                                return;
948                            }
949
950                            // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
951                            if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
952                                LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
953                                return;
954                            }
955
956                            RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
957                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
958                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
959                                && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
960                                // We need to NACK the messages so that they get
961                                // sent to the
962                                // DLQ.
963                                // Acknowledge the last message.
964                                MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
965                                ack.setFirstMessageId(md.getMessage().getMessageId());
966                                ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
967                                asyncSendPacket(ack);
968
969                            } else {
970
971                                MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
972                                ack.setFirstMessageId(md.getMessage().getMessageId());
973                                asyncSendPacket(ack);
974
975                                // Figure out how long we should wait to resend
976                                // this message.
977                                long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
978                                for (int i = 0; i < redeliveryCounter; i++) {
979                                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
980                                }
981                                connection.getScheduler().executeAfterDelay(new Runnable() {
982
983                                    @Override
984                                    public void run() {
985                                        ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
986                                    }
987                                }, redeliveryDelay);
988                            }
989                        }
990                    });
991                }
992
993                LOG.trace("{} onMessage({})", this, message.getMessageId());
994                messageListener.onMessage(message);
995
996            } catch (Throwable e) {
997                LOG.error("error dispatching message: ", e);
998                // A problem while invoking the MessageListener does not
999                // in general indicate a problem with the connection to the broker, i.e.
1000                // it will usually be sufficient to let the afterDelivery() method either
1001                // commit or roll back in order to deal with the exception.
1002                // However, we notify any registered client internal exception listener
1003                // of the problem.
1004                connection.onClientInternalException(e);
1005            } finally {
1006                if (ack.getTransactionId() == null) {
1007                    try {
1008                        asyncSendPacket(ack);
1009                    } catch (Throwable e) {
1010                        connection.onClientInternalException(e);
1011                    }
1012                }
1013            }
1014
1015            if (deliveryListener != null) {
1016                deliveryListener.afterDelivery(this, message);
1017            }
1018        }
1019    }
1020
1021    /**
1022     * Creates a <CODE>MessageProducer</CODE> to send messages to the
1023     * specified destination.
1024     * <P>
1025     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
1026     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
1027     * inherit from <CODE>Destination</CODE>, they can be used in the
1028     * destination parameter to create a <CODE>MessageProducer</CODE> object.
1029     *
1030     * @param destination the <CODE>Destination</CODE> to send to, or null if
1031     *                this is a producer which does not have a specified
1032     *                destination.
1033     * @return the MessageProducer
1034     * @throws JMSException if the session fails to create a MessageProducer due
1035     *                 to some internal error.
1036     * @throws InvalidDestinationException if an invalid destination is
1037     *                 specified.
1038     * @since 1.1
1039     */
1040    @Override
1041    public MessageProducer createProducer(Destination destination) throws JMSException {
1042        checkClosed();
1043        if (destination instanceof CustomDestination) {
1044            CustomDestination customDestination = (CustomDestination)destination;
1045            return customDestination.createProducer(this);
1046        }
1047        int timeSendOut = connection.getSendTimeout();
1048        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
1049    }
1050
1051    /**
1052     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1053     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1054     * <CODE>Destination</CODE>, they can be used in the destination
1055     * parameter to create a <CODE>MessageConsumer</CODE>.
1056     *
1057     * @param destination the <CODE>Destination</CODE> to access.
1058     * @return the MessageConsumer
1059     * @throws JMSException if the session fails to create a consumer due to
1060     *                 some internal error.
1061     * @throws InvalidDestinationException if an invalid destination is
1062     *                 specified.
1063     * @since 1.1
1064     */
1065    @Override
1066    public MessageConsumer createConsumer(Destination destination) throws JMSException {
1067        return createConsumer(destination, (String) null);
1068    }
1069
1070    /**
1071     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1072     * using a message selector. Since <CODE> Queue</CODE> and
1073     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1074     * can be used in the destination parameter to create a
1075     * <CODE>MessageConsumer</CODE>.
1076     * <P>
1077     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1078     * that have been sent to a destination.
1079     *
1080     * @param destination the <CODE>Destination</CODE> to access
1081     * @param messageSelector only messages with properties matching the message
1082     *                selector expression are delivered. A value of null or an
1083     *                empty string indicates that there is no message selector
1084     *                for the message consumer.
1085     * @return the MessageConsumer
1086     * @throws JMSException if the session fails to create a MessageConsumer due
1087     *                 to some internal error.
1088     * @throws InvalidDestinationException if an invalid destination is
1089     *                 specified.
1090     * @throws InvalidSelectorException if the message selector is invalid.
1091     * @since 1.1
1092     */
1093    @Override
1094    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
1095        return createConsumer(destination, messageSelector, false);
1096    }
1097
1098    /**
1099     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1100     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1101     * <CODE>Destination</CODE>, they can be used in the destination
1102     * parameter to create a <CODE>MessageConsumer</CODE>.
1103     *
1104     * @param destination the <CODE>Destination</CODE> to access.
1105     * @param messageListener the listener to use for async consumption of messages
1106     * @return the MessageConsumer
1107     * @throws JMSException if the session fails to create a consumer due to
1108     *                 some internal error.
1109     * @throws InvalidDestinationException if an invalid destination is
1110     *                 specified.
1111     * @since 1.1
1112     */
1113    public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1114        return createConsumer(destination, null, messageListener);
1115    }
1116
1117    /**
1118     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1119     * using a message selector. Since <CODE> Queue</CODE> and
1120     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1121     * can be used in the destination parameter to create a
1122     * <CODE>MessageConsumer</CODE>.
1123     * <P>
1124     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1125     * that have been sent to a destination.
1126     *
1127     * @param destination the <CODE>Destination</CODE> to access
1128     * @param messageSelector only messages with properties matching the message
1129     *                selector expression are delivered. A value of null or an
1130     *                empty string indicates that there is no message selector
1131     *                for the message consumer.
1132     * @param messageListener the listener to use for async consumption of messages
1133     * @return the MessageConsumer
1134     * @throws JMSException if the session fails to create a MessageConsumer due
1135     *                 to some internal error.
1136     * @throws InvalidDestinationException if an invalid destination is
1137     *                 specified.
1138     * @throws InvalidSelectorException if the message selector is invalid.
1139     * @since 1.1
1140     */
1141    public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1142        return createConsumer(destination, messageSelector, false, messageListener);
1143    }
1144
1145    /**
1146     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1147     * using a message selector. This method can specify whether messages
1148     * published by its own connection should be delivered to it, if the
1149     * destination is a topic.
1150     * <P>
1151     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1152     * <CODE>Destination</CODE>, they can be used in the destination
1153     * parameter to create a <CODE>MessageConsumer</CODE>.
1154     * <P>
1155     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1156     * that have been published to a destination.
1157     * <P>
1158     * In some cases, a connection may both publish and subscribe to a topic.
1159     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1160     * inhibit the delivery of messages published by its own connection. The
1161     * default value for this attribute is False. The <CODE>noLocal</CODE>
1162     * value must be supported by destinations that are topics.
1163     *
1164     * @param destination the <CODE>Destination</CODE> to access
1165     * @param messageSelector only messages with properties matching the message
1166     *                selector expression are delivered. A value of null or an
1167     *                empty string indicates that there is no message selector
1168     *                for the message consumer.
1169     * @param noLocal - if true, and the destination is a topic, inhibits the
1170     *                delivery of messages published by its own connection. The
1171     *                behavior for <CODE>NoLocal</CODE> is not specified if
1172     *                the destination is a queue.
1173     * @return the MessageConsumer
1174     * @throws JMSException if the session fails to create a MessageConsumer due
1175     *                 to some internal error.
1176     * @throws InvalidDestinationException if an invalid destination is
1177     *                 specified.
1178     * @throws InvalidSelectorException if the message selector is invalid.
1179     * @since 1.1
1180     */
1181    @Override
1182    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1183        return createConsumer(destination, messageSelector, noLocal, null);
1184    }
1185
1186    /**
1187     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1188     * using a message selector. This method can specify whether messages
1189     * published by its own connection should be delivered to it, if the
1190     * destination is a topic.
1191     * <P>
1192     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1193     * <CODE>Destination</CODE>, they can be used in the destination
1194     * parameter to create a <CODE>MessageConsumer</CODE>.
1195     * <P>
1196     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1197     * that have been published to a destination.
1198     * <P>
1199     * In some cases, a connection may both publish and subscribe to a topic.
1200     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1201     * inhibit the delivery of messages published by its own connection. The
1202     * default value for this attribute is False. The <CODE>noLocal</CODE>
1203     * value must be supported by destinations that are topics.
1204     *
1205     * @param destination the <CODE>Destination</CODE> to access
1206     * @param messageSelector only messages with properties matching the message
1207     *                selector expression are delivered. A value of null or an
1208     *                empty string indicates that there is no message selector
1209     *                for the message consumer.
1210     * @param noLocal - if true, and the destination is a topic, inhibits the
1211     *                delivery of messages published by its own connection. The
1212     *                behavior for <CODE>NoLocal</CODE> is not specified if
1213     *                the destination is a queue.
1214     * @param messageListener the listener to use for async consumption of messages
1215     * @return the MessageConsumer
1216     * @throws JMSException if the session fails to create a MessageConsumer due
1217     *                 to some internal error.
1218     * @throws InvalidDestinationException if an invalid destination is
1219     *                 specified.
1220     * @throws InvalidSelectorException if the message selector is invalid.
1221     * @since 1.1
1222     */
1223    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1224        checkClosed();
1225
1226        if (destination instanceof CustomDestination) {
1227            CustomDestination customDestination = (CustomDestination)destination;
1228            return customDestination.createConsumer(this, messageSelector, noLocal);
1229        }
1230
1231        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1232        int prefetch = 0;
1233        if (destination instanceof Topic) {
1234            prefetch = prefetchPolicy.getTopicPrefetch();
1235        } else {
1236            prefetch = prefetchPolicy.getQueuePrefetch();
1237        }
1238        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1239        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1240                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1241    }
1242
1243    /**
1244     * Creates a queue identity given a <CODE>Queue</CODE> name.
1245     * <P>
1246     * This facility is provided for the rare cases where clients need to
1247     * dynamically manipulate queue identity. It allows the creation of a queue
1248     * identity with a provider-specific name. Clients that depend on this
1249     * ability are not portable.
1250     * <P>
1251     * Note that this method is not for creating the physical queue. The
1252     * physical creation of queues is an administrative task and is not to be
1253     * initiated by the JMS API. The one exception is the creation of temporary
1254     * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1255     * method.
1256     *
1257     * @param queueName the name of this <CODE>Queue</CODE>
1258     * @return a <CODE>Queue</CODE> with the given name
1259     * @throws JMSException if the session fails to create a queue due to some
1260     *                 internal error.
1261     * @since 1.1
1262     */
1263    @Override
1264    public Queue createQueue(String queueName) throws JMSException {
1265        checkClosed();
1266        if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1267            return new ActiveMQTempQueue(queueName);
1268        }
1269        return new ActiveMQQueue(queueName);
1270    }
1271
1272    /**
1273     * Creates a topic identity given a <CODE>Topic</CODE> name.
1274     * <P>
1275     * This facility is provided for the rare cases where clients need to
1276     * dynamically manipulate topic identity. This allows the creation of a
1277     * topic identity with a provider-specific name. Clients that depend on this
1278     * ability are not portable.
1279     * <P>
1280     * Note that this method is not for creating the physical topic. The
1281     * physical creation of topics is an administrative task and is not to be
1282     * initiated by the JMS API. The one exception is the creation of temporary
1283     * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1284     * method.
1285     *
1286     * @param topicName the name of this <CODE>Topic</CODE>
1287     * @return a <CODE>Topic</CODE> with the given name
1288     * @throws JMSException if the session fails to create a topic due to some
1289     *                 internal error.
1290     * @since 1.1
1291     */
1292    @Override
1293    public Topic createTopic(String topicName) throws JMSException {
1294        checkClosed();
1295        if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1296            return new ActiveMQTempTopic(topicName);
1297        }
1298        return new ActiveMQTopic(topicName);
1299    }
1300
1301    /**
1302     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1303     * the specified queue.
1304     *
1305     * @param queue the <CODE>queue</CODE> to access
1306     * @exception InvalidDestinationException if an invalid destination is
1307     *                    specified
1308     * @since 1.1
1309     */
1310    /**
1311     * Creates a durable subscriber to the specified topic.
1312     * <P>
1313     * If a client needs to receive all the messages published on a topic,
1314     * including the ones published while the subscriber is inactive, it uses a
1315     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1316     * record of this durable subscription and insures that all messages from
1317     * the topic's publishers are retained until they are acknowledged by this
1318     * durable subscriber or they have expired.
1319     * <P>
1320     * Sessions with durable subscribers must always provide the same client
1321     * identifier. In addition, each client must specify a name that uniquely
1322     * identifies (within client identifier) each durable subscription it
1323     * creates. Only one session at a time can have a
1324     * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1325     * <P>
1326     * A client can change an existing durable subscription by creating a
1327     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1328     * and/or message selector. Changing a durable subscriber is equivalent to
1329     * unsubscribing (deleting) the old one and creating a new one.
1330     * <P>
1331     * In some cases, a connection may both publish and subscribe to a topic.
1332     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1333     * inhibit the delivery of messages published by its own connection. The
1334     * default value for this attribute is false.
1335     *
1336     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1337     * @param name the name used to identify this subscription
1338     * @return the TopicSubscriber
1339     * @throws JMSException if the session fails to create a subscriber due to
1340     *                 some internal error.
1341     * @throws InvalidDestinationException if an invalid topic is specified.
1342     * @since 1.1
1343     */
1344    @Override
1345    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1346        checkClosed();
1347        return createDurableSubscriber(topic, name, null, false);
1348    }
1349
1350    /**
1351     * Creates a durable subscriber to the specified topic, using a message
1352     * selector and specifying whether messages published by its own connection
1353     * should be delivered to it.
1354     * <P>
1355     * If a client needs to receive all the messages published on a topic,
1356     * including the ones published while the subscriber is inactive, it uses a
1357     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
     * record of this durable subscription and ensures that all messages from
1359     * the topic's publishers are retained until they are acknowledged by this
1360     * durable subscriber or they have expired.
1361     * <P>
1362     * Sessions with durable subscribers must always provide the same client
1363     * identifier. In addition, each client must specify a name which uniquely
1364     * identifies (within client identifier) each durable subscription it
1365     * creates. Only one session at a time can have a
1366     * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1367     * inactive durable subscriber is one that exists but does not currently
1368     * have a message consumer associated with it.
1369     * <P>
1370     * A client can change an existing durable subscription by creating a
1371     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1372     * and/or message selector. Changing a durable subscriber is equivalent to
1373     * unsubscribing (deleting) the old one and creating a new one.
1374     *
1375     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1376     * @param name the name used to identify this subscription
1377     * @param messageSelector only messages with properties matching the message
1378     *                selector expression are delivered. A value of null or an
1379     *                empty string indicates that there is no message selector
1380     *                for the message consumer.
1381     * @param noLocal if set, inhibits the delivery of messages published by its
1382     *                own connection
     * @return the TopicSubscriber
1384     * @throws JMSException if the session fails to create a subscriber due to
1385     *                 some internal error.
1386     * @throws InvalidDestinationException if an invalid topic is specified.
1387     * @throws InvalidSelectorException if the message selector is invalid.
1388     * @since 1.1
1389     */
1390    @Override
1391    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1392        checkClosed();
1393
1394        if (topic == null) {
1395            throw new InvalidDestinationException("Topic cannot be null");
1396        }
1397
1398        if (topic instanceof CustomDestination) {
1399            CustomDestination customDestination = (CustomDestination)topic;
1400            return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1401        }
1402
1403        connection.checkClientIDWasManuallySpecified();
1404        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1405        int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1406        int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1407        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1408                                           noLocal, false, asyncDispatch);
1409    }
1410
1411    /**
1412     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1413     * the specified queue.
1414     *
1415     * @param queue the <CODE>queue</CODE> to access
1416     * @return the Queue Browser
1417     * @throws JMSException if the session fails to create a browser due to some
1418     *                 internal error.
1419     * @throws InvalidDestinationException if an invalid destination is
1420     *                 specified
1421     * @since 1.1
1422     */
1423    @Override
1424    public QueueBrowser createBrowser(Queue queue) throws JMSException {
1425        checkClosed();
1426        return createBrowser(queue, null);
1427    }
1428
1429    /**
1430     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1431     * the specified queue using a message selector.
1432     *
1433     * @param queue the <CODE>queue</CODE> to access
1434     * @param messageSelector only messages with properties matching the message
1435     *                selector expression are delivered. A value of null or an
1436     *                empty string indicates that there is no message selector
1437     *                for the message consumer.
1438     * @return the Queue Browser
1439     * @throws JMSException if the session fails to create a browser due to some
1440     *                 internal error.
1441     * @throws InvalidDestinationException if an invalid destination is
1442     *                 specified
1443     * @throws InvalidSelectorException if the message selector is invalid.
1444     * @since 1.1
1445     */
1446    @Override
1447    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1448        checkClosed();
1449        return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1450    }
1451
1452    /**
1453     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1454     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1455     *
1456     * @return a temporary queue identity
1457     * @throws JMSException if the session fails to create a temporary queue due
1458     *                 to some internal error.
1459     * @since 1.1
1460     */
1461    @Override
1462    public TemporaryQueue createTemporaryQueue() throws JMSException {
1463        checkClosed();
1464        return (TemporaryQueue)connection.createTempDestination(false);
1465    }
1466
1467    /**
1468     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1469     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1470     *
1471     * @return a temporary topic identity
1472     * @throws JMSException if the session fails to create a temporary topic due
1473     *                 to some internal error.
1474     * @since 1.1
1475     */
1476    @Override
1477    public TemporaryTopic createTemporaryTopic() throws JMSException {
1478        checkClosed();
1479        return (TemporaryTopic)connection.createTempDestination(true);
1480    }
1481
1482    /**
1483     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1484     * the specified queue.
1485     *
1486     * @param queue the <CODE>Queue</CODE> to access
     * @return the QueueReceiver
     * @throws JMSException if the session fails to create a receiver due to
     *                 some internal error.
1491     * @throws InvalidDestinationException if an invalid queue is specified.
1492     */
1493    @Override
1494    public QueueReceiver createReceiver(Queue queue) throws JMSException {
1495        checkClosed();
1496        return createReceiver(queue, null);
1497    }
1498
1499    /**
1500     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1501     * the specified queue using a message selector.
1502     *
1503     * @param queue the <CODE>Queue</CODE> to access
1504     * @param messageSelector only messages with properties matching the message
1505     *                selector expression are delivered. A value of null or an
1506     *                empty string indicates that there is no message selector
1507     *                for the message consumer.
1508     * @return QueueReceiver
1509     * @throws JMSException if the session fails to create a receiver due to
1510     *                 some internal error.
1511     * @throws InvalidDestinationException if an invalid queue is specified.
1512     * @throws InvalidSelectorException if the message selector is invalid.
1513     */
1514    @Override
1515    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1516        checkClosed();
1517
1518        if (queue instanceof CustomDestination) {
1519            CustomDestination customDestination = (CustomDestination)queue;
1520            return customDestination.createReceiver(this, messageSelector);
1521        }
1522
1523        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1524        return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1525                                         prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1526    }
1527
1528    /**
1529     * Creates a <CODE>QueueSender</CODE> object to send messages to the
1530     * specified queue.
1531     *
1532     * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1533     *                unidentified producer
1534     * @return QueueSender
1535     * @throws JMSException if the session fails to create a sender due to some
1536     *                 internal error.
1537     * @throws InvalidDestinationException if an invalid queue is specified.
1538     */
1539    @Override
1540    public QueueSender createSender(Queue queue) throws JMSException {
1541        checkClosed();
1542        if (queue instanceof CustomDestination) {
1543            CustomDestination customDestination = (CustomDestination)queue;
1544            return customDestination.createSender(this);
1545        }
1546        int timeSendOut = connection.getSendTimeout();
1547        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1548    }
1549
1550    /**
1551     * Creates a nondurable subscriber to the specified topic. <p/>
1552     * <P>
1553     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1554     * that have been published to a topic. <p/>
1555     * <P>
1556     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1557     * receive only messages that are published while they are active. <p/>
1558     * <P>
1559     * In some cases, a connection may both publish and subscribe to a topic.
1560     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1561     * inhibit the delivery of messages published by its own connection. The
1562     * default value for this attribute is false.
1563     *
1564     * @param topic the <CODE>Topic</CODE> to subscribe to
1565     * @return TopicSubscriber
1566     * @throws JMSException if the session fails to create a subscriber due to
1567     *                 some internal error.
1568     * @throws InvalidDestinationException if an invalid topic is specified.
1569     */
1570    @Override
1571    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1572        checkClosed();
1573        return createSubscriber(topic, null, false);
1574    }
1575
1576    /**
1577     * Creates a nondurable subscriber to the specified topic, using a message
1578     * selector or specifying whether messages published by its own connection
1579     * should be delivered to it. <p/>
1580     * <P>
1581     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1582     * that have been published to a topic. <p/>
1583     * <P>
1584     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1585     * receive only messages that are published while they are active. <p/>
1586     * <P>
1587     * Messages filtered out by a subscriber's message selector will never be
1588     * delivered to the subscriber. From the subscriber's perspective, they do
1589     * not exist. <p/>
1590     * <P>
1591     * In some cases, a connection may both publish and subscribe to a topic.
1592     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1593     * inhibit the delivery of messages published by its own connection. The
1594     * default value for this attribute is false.
1595     *
1596     * @param topic the <CODE>Topic</CODE> to subscribe to
1597     * @param messageSelector only messages with properties matching the message
1598     *                selector expression are delivered. A value of null or an
1599     *                empty string indicates that there is no message selector
1600     *                for the message consumer.
1601     * @param noLocal if set, inhibits the delivery of messages published by its
1602     *                own connection
1603     * @return TopicSubscriber
1604     * @throws JMSException if the session fails to create a subscriber due to
1605     *                 some internal error.
1606     * @throws InvalidDestinationException if an invalid topic is specified.
1607     * @throws InvalidSelectorException if the message selector is invalid.
1608     */
1609    @Override
1610    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1611        checkClosed();
1612
1613        if (topic instanceof CustomDestination) {
1614            CustomDestination customDestination = (CustomDestination)topic;
1615            return customDestination.createSubscriber(this, messageSelector, noLocal);
1616        }
1617
1618        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1619        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1620            .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1621    }
1622
1623    /**
1624     * Creates a publisher for the specified topic. <p/>
1625     * <P>
1626     * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1627     * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1628     * a topic, it defines a new sequence of messages that have no ordering
1629     * relationship with the messages it has previously sent.
1630     *
1631     * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1632     *                an unidentified producer
1633     * @return TopicPublisher
1634     * @throws JMSException if the session fails to create a publisher due to
1635     *                 some internal error.
1636     * @throws InvalidDestinationException if an invalid topic is specified.
1637     */
1638    @Override
1639    public TopicPublisher createPublisher(Topic topic) throws JMSException {
1640        checkClosed();
1641
1642        if (topic instanceof CustomDestination) {
1643            CustomDestination customDestination = (CustomDestination)topic;
1644            return customDestination.createPublisher(this);
1645        }
1646        int timeSendOut = connection.getSendTimeout();
1647        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1648    }
1649
1650    /**
1651     * Unsubscribes a durable subscription that has been created by a client.
1652     * <P>
1653     * This method deletes the state being maintained on behalf of the
1654     * subscriber by its provider.
1655     * <P>
1656     * It is erroneous for a client to delete a durable subscription while there
1657     * is an active <CODE>MessageConsumer </CODE> or
1658     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1659     * message is part of a pending transaction or has not been acknowledged in
1660     * the session.
1661     *
1662     * @param name the name used to identify this subscription
1663     * @throws JMSException if the session fails to unsubscribe to the durable
1664     *                 subscription due to some internal error.
1665     * @throws InvalidDestinationException if an invalid subscription name is
1666     *                 specified.
1667     * @since 1.1
1668     */
1669    @Override
1670    public void unsubscribe(String name) throws JMSException {
1671        checkClosed();
1672        connection.unsubscribe(name);
1673    }
1674
1675    @Override
1676    public void dispatch(MessageDispatch messageDispatch) {
1677        try {
1678            executor.execute(messageDispatch);
1679        } catch (InterruptedException e) {
1680            Thread.currentThread().interrupt();
1681            connection.onClientInternalException(e);
1682        }
1683    }
1684
1685    /**
1686     * Acknowledges all consumed messages of the session of this consumed
1687     * message.
1688     * <P>
1689     * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1690     * for use when a client has specified that its JMS session's consumed
1691     * messages are to be explicitly acknowledged. By invoking
1692     * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1693     * all messages consumed by the session that the message was delivered to.
1694     * <P>
1695     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1696     * sessions and sessions specified to use implicit acknowledgement modes.
1697     * <P>
1698     * A client may individually acknowledge each message as it is consumed, or
1699     * it may choose to acknowledge messages as an application-defined group
1700     * (which is done by calling acknowledge on the last received message of the
1701     * group, thereby acknowledging all messages consumed by the session.)
1702     * <P>
1703     * Messages that have been received but not acknowledged may be redelivered.
1704     *
1705     * @throws JMSException if the JMS provider fails to acknowledge the
1706     *                 messages due to some internal error.
1707     * @throws javax.jms.IllegalStateException if this method is called on a
1708     *                 closed session.
1709     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1710     */
1711    public void acknowledge() throws JMSException {
1712        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1713            ActiveMQMessageConsumer c = iter.next();
1714            c.acknowledge();
1715        }
1716    }
1717
1718    /**
1719     * Add a message consumer.
1720     *
1721     * @param consumer - message consumer.
1722     * @throws JMSException
1723     */
1724    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1725        this.consumers.add(consumer);
1726        if (consumer.isDurableSubscriber()) {
1727            stats.onCreateDurableSubscriber();
1728        }
1729        this.connection.addDispatcher(consumer.getConsumerId(), this);
1730    }
1731
1732    /**
1733     * Remove the message consumer.
1734     *
1735     * @param consumer - consumer to be removed.
1736     * @throws JMSException
1737     */
1738    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1739        this.connection.removeDispatcher(consumer.getConsumerId());
1740        if (consumer.isDurableSubscriber()) {
1741            stats.onRemoveDurableSubscriber();
1742        }
1743        this.consumers.remove(consumer);
1744        this.connection.removeDispatcher(consumer);
1745    }
1746
1747    /**
1748     * Adds a message producer.
1749     *
1750     * @param producer - message producer to be added.
1751     * @throws JMSException
1752     */
1753    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1754        this.producers.add(producer);
1755        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1756    }
1757
1758    /**
1759     * Removes a message producer.
1760     *
1761     * @param producer - message producer to be removed.
1762     * @throws JMSException
1763     */
1764    protected void removeProducer(ActiveMQMessageProducer producer) {
1765        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1766        this.producers.remove(producer);
1767    }
1768
1769    /**
1770     * Start this Session.
1771     *
1772     * @throws JMSException
1773     */
1774    protected void start() throws JMSException {
1775        started.set(true);
1776        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1777            ActiveMQMessageConsumer c = iter.next();
1778            c.start();
1779        }
1780        executor.start();
1781    }
1782
1783    /**
1784     * Stops this session.
1785     *
1786     * @throws JMSException
1787     */
1788    protected void stop() throws JMSException {
1789
1790        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1791            ActiveMQMessageConsumer c = iter.next();
1792            c.stop();
1793        }
1794
1795        started.set(false);
1796        executor.stop();
1797    }
1798
1799    /**
1800     * Returns the session id.
1801     *
1802     * @return value - session id.
1803     */
1804    protected SessionId getSessionId() {
1805        return info.getSessionId();
1806    }
1807
1808    /**
1809     * @return
1810     */
1811    protected ConsumerId getNextConsumerId() {
1812        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1813    }
1814
1815    /**
1816     * @return
1817     */
1818    protected ProducerId getNextProducerId() {
1819        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1820    }
1821
1822    /**
1823     * Sends the message for dispatch by the broker.
1824     *
1825     *
1826     * @param producer - message producer.
1827     * @param destination - message destination.
1828     * @param message - message to be sent.
1829     * @param deliveryMode - JMS messsage delivery mode.
1830     * @param priority - message priority.
1831     * @param timeToLive - message expiration.
1832     * @param producerWindow
1833     * @param onComplete
1834     * @throws JMSException
1835     */
1836    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1837                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1838
1839        checkClosed();
1840        if (destination.isTemporary() && connection.isDeleted(destination)) {
1841            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1842        }
1843        synchronized (sendMutex) {
1844            // tell the Broker we are about to start a new transaction
1845            doStartTransaction();
1846            TransactionId txid = transactionContext.getTransactionId();
1847            long sequenceNumber = producer.getMessageSequence();
1848
1849            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1850            message.setJMSDeliveryMode(deliveryMode);
1851            long expiration = 0L;
1852            if (!producer.getDisableMessageTimestamp()) {
1853                long timeStamp = System.currentTimeMillis();
1854                message.setJMSTimestamp(timeStamp);
1855                if (timeToLive > 0) {
1856                    expiration = timeToLive + timeStamp;
1857                }
1858            }
1859            message.setJMSExpiration(expiration);
1860            message.setJMSPriority(priority);
1861            message.setJMSRedelivered(false);
1862
1863            // transform to our own message format here
1864            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1865            msg.setDestination(destination);
1866            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1867
1868            // Set the message id.
1869            if (msg != message) {
1870                message.setJMSMessageID(msg.getMessageId().toString());
1871                // Make sure the JMS destination is set on the foreign messages too.
1872                message.setJMSDestination(destination);
1873            }
1874            //clear the brokerPath in case we are re-sending this message
1875            msg.setBrokerPath(null);
1876
1877            msg.setTransactionId(txid);
1878            if (connection.isCopyMessageOnSend()) {
1879                msg = (ActiveMQMessage)msg.copy();
1880            }
1881            msg.setConnection(connection);
1882            msg.onSend();
1883            msg.setProducerId(msg.getMessageId().getProducerId());
1884            if (LOG.isTraceEnabled()) {
1885                LOG.trace(getSessionId() + " sending message: " + msg);
1886            }
1887            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1888                this.connection.asyncSendPacket(msg);
1889                if (producerWindow != null) {
1890                    // Since we defer lots of the marshaling till we hit the
1891                    // wire, this might not
1892                    // provide and accurate size. We may change over to doing
1893                    // more aggressive marshaling,
1894                    // to get more accurate sizes.. this is more important once
1895                    // users start using producer window
1896                    // flow control.
1897                    int size = msg.getSize();
1898                    producerWindow.increaseUsage(size);
1899                }
1900            } else {
1901                if (sendTimeout > 0 && onComplete==null) {
1902                    this.connection.syncSendPacket(msg,sendTimeout);
1903                }else {
1904                    this.connection.syncSendPacket(msg, onComplete);
1905                }
1906            }
1907
1908        }
1909    }
1910
1911    /**
1912     * Send TransactionInfo to indicate transaction has started
1913     *
1914     * @throws JMSException if some internal error occurs
1915     */
1916    protected void doStartTransaction() throws JMSException {
1917        if (getTransacted() && !transactionContext.isInXATransaction()) {
1918            transactionContext.begin();
1919        }
1920    }
1921
1922    /**
1923     * Checks whether the session has unconsumed messages.
1924     *
1925     * @return true - if there are unconsumed messages.
1926     */
1927    public boolean hasUncomsumedMessages() {
1928        return executor.hasUncomsumedMessages();
1929    }
1930
1931    /**
1932     * Checks whether the session uses transactions.
1933     *
1934     * @return true - if the session uses transactions.
1935     */
1936    public boolean isTransacted() {
1937        return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1938    }
1939
1940    /**
1941     * Checks whether the session used client acknowledgment.
1942     *
1943     * @return true - if the session uses client acknowledgment.
1944     */
1945    protected boolean isClientAcknowledge() {
1946        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1947    }
1948
1949    /**
1950     * Checks whether the session used auto acknowledgment.
1951     *
1952     * @return true - if the session uses client acknowledgment.
1953     */
1954    public boolean isAutoAcknowledge() {
1955        return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1956    }
1957
1958    /**
1959     * Checks whether the session used dup ok acknowledgment.
1960     *
1961     * @return true - if the session uses client acknowledgment.
1962     */
1963    public boolean isDupsOkAcknowledge() {
1964        return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1965    }
1966
1967    public boolean isIndividualAcknowledge(){
1968        return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1969    }
1970
1971    /**
1972     * Returns the message delivery listener.
1973     *
1974     * @return deliveryListener - message delivery listener.
1975     */
1976    public DeliveryListener getDeliveryListener() {
1977        return deliveryListener;
1978    }
1979
    /**
     * Sets the delivery listener of this session, replacing any listener set
     * before.
     *
     * @param deliveryListener the delivery listener to store
     */
    public void setDeliveryListener(DeliveryListener deliveryListener) {
        this.deliveryListener = deliveryListener;
    }
1988
1989    /**
1990     * Returns the SessionInfo bean.
1991     *
1992     * @return info - SessionInfo bean.
1993     * @throws JMSException
1994     */
1995    protected SessionInfo getSessionInfo() throws JMSException {
1996        SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1997        return info;
1998    }
1999
2000    /**
2001     * Send the asynchronus command.
2002     *
2003     * @param command - command to be executed.
2004     * @throws JMSException
2005     */
2006    public void asyncSendPacket(Command command) throws JMSException {
2007        connection.asyncSendPacket(command);
2008    }
2009
2010    /**
2011     * Send the synchronus command.
2012     *
2013     * @param command - command to be executed.
2014     * @return Response
2015     * @throws JMSException
2016     */
2017    public Response syncSendPacket(Command command) throws JMSException {
2018        return connection.syncSendPacket(command);
2019    }
2020
2021    public long getNextDeliveryId() {
2022        return deliveryIdGenerator.getNextSequenceId();
2023    }
2024
2025    public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
2026
2027        List<MessageDispatch> c = unconsumedMessages.removeAll();
2028        for (MessageDispatch md : c) {
2029            this.connection.rollbackDuplicate(dispatcher, md.getMessage());
2030        }
2031        Collections.reverse(c);
2032
2033        for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
2034            MessageDispatch md = iter.next();
2035            executor.executeFirst(md);
2036        }
2037
2038    }
2039
2040    public boolean isRunning() {
2041        return started.get();
2042    }
2043
2044    public boolean isAsyncDispatch() {
2045        return asyncDispatch;
2046    }
2047
    /**
     * Sets the asyncDispatch flag of this session. The flag is passed to the
     * consumers and browsers this session creates afterwards.
     *
     * @param asyncDispatch the asyncDispatch value to store
     */
    public void setAsyncDispatch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }
2051
2052    /**
2053     * @return Returns the sessionAsyncDispatch.
2054     */
2055    public boolean isSessionAsyncDispatch() {
2056        return sessionAsyncDispatch;
2057    }
2058
    /**
     * Sets the sessionAsyncDispatch flag of this session.
     *
     * @param sessionAsyncDispatch the sessionAsyncDispatch value to store
     */
    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
        this.sessionAsyncDispatch = sessionAsyncDispatch;
    }
2065
2066    public MessageTransformer getTransformer() {
2067        return transformer;
2068    }
2069
2070    public ActiveMQConnection getConnection() {
2071        return connection;
2072    }
2073
2074    /**
2075     * Sets the transformer used to transform messages before they are sent on
2076     * to the JMS bus or when they are received from the bus but before they are
2077     * delivered to the JMS client
2078     */
2079    public void setTransformer(MessageTransformer transformer) {
2080        this.transformer = transformer;
2081    }
2082
2083    public BlobTransferPolicy getBlobTransferPolicy() {
2084        return blobTransferPolicy;
2085    }
2086
2087    /**
2088     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
2089     * OBjects) are transferred from producers to brokers to consumers
2090     */
2091    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
2092        this.blobTransferPolicy = blobTransferPolicy;
2093    }
2094
2095    public List<MessageDispatch> getUnconsumedMessages() {
2096        return executor.getUnconsumedMessages();
2097    }
2098
2099    @Override
2100    public String toString() {
2101        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
2102    }
2103
2104    public void checkMessageListener() throws JMSException {
2105        if (messageListener != null) {
2106            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2107        }
2108        for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
2109            ActiveMQMessageConsumer consumer = i.next();
2110            if (consumer.hasMessageListener()) {
2111                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2112            }
2113        }
2114    }
2115
2116    protected void setOptimizeAcknowledge(boolean value) {
2117        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2118            ActiveMQMessageConsumer c = iter.next();
2119            c.setOptimizeAcknowledge(value);
2120        }
2121    }
2122
2123    protected void setPrefetchSize(ConsumerId id, int prefetch) {
2124        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2125            ActiveMQMessageConsumer c = iter.next();
2126            if (c.getConsumerId().equals(id)) {
2127                c.setPrefetchSize(prefetch);
2128                break;
2129            }
2130        }
2131    }
2132
2133    protected void close(ConsumerId id) {
2134        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2135            ActiveMQMessageConsumer c = iter.next();
2136            if (c.getConsumerId().equals(id)) {
2137                try {
2138                    c.close();
2139                } catch (JMSException e) {
2140                    LOG.warn("Exception closing consumer", e);
2141                }
2142                LOG.warn("Closed consumer on Command, " + id);
2143                break;
2144            }
2145        }
2146    }
2147
2148    public boolean isInUse(ActiveMQTempDestination destination) {
2149        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2150            ActiveMQMessageConsumer c = iter.next();
2151            if (c.isInUse(destination)) {
2152                return true;
2153            }
2154        }
2155        return false;
2156    }
2157
2158    /**
2159     * highest sequence id of the last message delivered by this session.
2160     * Passed to the broker in the close command, maintained by dispose()
2161     * @return lastDeliveredSequenceId
2162     */
2163    public long getLastDeliveredSequenceId() {
2164        return lastDeliveredSequenceId;
2165    }
2166
2167    protected void sendAck(MessageAck ack) throws JMSException {
2168        sendAck(ack,false);
2169    }
2170
2171    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2172        if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2173            asyncSendPacket(ack);
2174        } else {
2175            syncSendPacket(ack);
2176        }
2177    }
2178
2179    protected Scheduler getScheduler() throws JMSException {
2180        return this.connection.getScheduler();
2181    }
2182
2183    protected ThreadPoolExecutor getConnectionExecutor() {
2184        return this.connectionExecutor;
2185    }
2186}