001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.CopyOnWriteArrayList;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.RejectedExecutionHandler;
032import java.util.concurrent.ThreadFactory;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.concurrent.atomic.AtomicInteger;
037
038import javax.jms.Connection;
039import javax.jms.ConnectionConsumer;
040import javax.jms.ConnectionMetaData;
041import javax.jms.DeliveryMode;
042import javax.jms.Destination;
043import javax.jms.ExceptionListener;
044import javax.jms.IllegalStateException;
045import javax.jms.InvalidDestinationException;
046import javax.jms.JMSException;
047import javax.jms.Queue;
048import javax.jms.QueueConnection;
049import javax.jms.QueueSession;
050import javax.jms.ServerSessionPool;
051import javax.jms.Session;
052import javax.jms.Topic;
053import javax.jms.TopicConnection;
054import javax.jms.TopicSession;
055import javax.jms.XAConnection;
056
057import org.apache.activemq.advisory.DestinationSource;
058import org.apache.activemq.blob.BlobTransferPolicy;
059import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
060import org.apache.activemq.command.ActiveMQDestination;
061import org.apache.activemq.command.ActiveMQMessage;
062import org.apache.activemq.command.ActiveMQTempDestination;
063import org.apache.activemq.command.ActiveMQTempQueue;
064import org.apache.activemq.command.ActiveMQTempTopic;
065import org.apache.activemq.command.BrokerInfo;
066import org.apache.activemq.command.Command;
067import org.apache.activemq.command.CommandTypes;
068import org.apache.activemq.command.ConnectionControl;
069import org.apache.activemq.command.ConnectionError;
070import org.apache.activemq.command.ConnectionId;
071import org.apache.activemq.command.ConnectionInfo;
072import org.apache.activemq.command.ConsumerControl;
073import org.apache.activemq.command.ConsumerId;
074import org.apache.activemq.command.ConsumerInfo;
075import org.apache.activemq.command.ControlCommand;
076import org.apache.activemq.command.DestinationInfo;
077import org.apache.activemq.command.ExceptionResponse;
078import org.apache.activemq.command.Message;
079import org.apache.activemq.command.MessageDispatch;
080import org.apache.activemq.command.MessageId;
081import org.apache.activemq.command.ProducerAck;
082import org.apache.activemq.command.ProducerId;
083import org.apache.activemq.command.RemoveInfo;
084import org.apache.activemq.command.RemoveSubscriptionInfo;
085import org.apache.activemq.command.Response;
086import org.apache.activemq.command.SessionId;
087import org.apache.activemq.command.ShutdownInfo;
088import org.apache.activemq.command.WireFormatInfo;
089import org.apache.activemq.management.JMSConnectionStatsImpl;
090import org.apache.activemq.management.JMSStatsImpl;
091import org.apache.activemq.management.StatsCapable;
092import org.apache.activemq.management.StatsImpl;
093import org.apache.activemq.state.CommandVisitorAdapter;
094import org.apache.activemq.thread.Scheduler;
095import org.apache.activemq.thread.TaskRunnerFactory;
096import org.apache.activemq.transport.FutureResponse;
097import org.apache.activemq.transport.RequestTimedOutIOException;
098import org.apache.activemq.transport.ResponseCallback;
099import org.apache.activemq.transport.Transport;
100import org.apache.activemq.transport.TransportListener;
101import org.apache.activemq.transport.failover.FailoverTransport;
102import org.apache.activemq.util.IdGenerator;
103import org.apache.activemq.util.IntrospectionSupport;
104import org.apache.activemq.util.JMSExceptionSupport;
105import org.apache.activemq.util.LongSequenceGenerator;
106import org.apache.activemq.util.ServiceSupport;
107import org.apache.activemq.util.ThreadPoolUtils;
108import org.slf4j.Logger;
109import org.slf4j.LoggerFactory;
110
111public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
112
113    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
114    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
115    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
116    public static int DEFAULT_THREAD_POOL_SIZE = 1000;
117
118    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
119
120    public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
121
122    protected boolean dispatchAsync=true;
123    protected boolean alwaysSessionAsync = true;
124
125    private TaskRunnerFactory sessionTaskRunner;
126    private final ThreadPoolExecutor executor;
127
128    // Connection state variables
129    private final ConnectionInfo info;
130    private ExceptionListener exceptionListener;
131    private ClientInternalExceptionListener clientInternalExceptionListener;
132    private boolean clientIDSet;
133    private boolean isConnectionInfoSentToBroker;
134    private boolean userSpecifiedClientID;
135
136    // Configuration options variables
137    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
138    private BlobTransferPolicy blobTransferPolicy;
139    private RedeliveryPolicyMap redeliveryPolicyMap;
140    private MessageTransformer transformer;
141
142    private boolean disableTimeStampsByDefault;
143    private boolean optimizedMessageDispatch = true;
144    private boolean copyMessageOnSend = true;
145    private boolean useCompression;
146    private boolean objectMessageSerializationDefered;
147    private boolean useAsyncSend;
148    private boolean optimizeAcknowledge;
149    private long optimizeAcknowledgeTimeOut = 0;
150    private long optimizedAckScheduledAckInterval = 0;
151    private boolean nestedMapAndListEnabled = true;
152    private boolean useRetroactiveConsumer;
153    private boolean exclusiveConsumer;
154    private boolean alwaysSyncSend;
155    private int closeTimeout = 15000;
156    private boolean watchTopicAdvisories = true;
157    private long warnAboutUnstartedConnectionTimeout = 500L;
158    private int sendTimeout =0;
159    private boolean sendAcksAsync=true;
160    private boolean checkForDuplicates = true;
161    private boolean queueOnlyConnection = false;
162    private boolean consumerExpiryCheckEnabled = true;
163
164    private final Transport transport;
165    private final IdGenerator clientIdGenerator;
166    private final JMSStatsImpl factoryStats;
167    private final JMSConnectionStatsImpl stats;
168
169    private final AtomicBoolean started = new AtomicBoolean(false);
170    private final AtomicBoolean closing = new AtomicBoolean(false);
171    private final AtomicBoolean closed = new AtomicBoolean(false);
172    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
173    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
174    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
175    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
176    // Stream are deprecated and will be removed in a later release.
177    private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
178    private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
179
180    // Maps ConsumerIds to ActiveMQConsumer objects
181    private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
182    private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
183    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
184    private final SessionId connectionSessionId;
185    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
186    private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
187    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
188    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
189
190    private AdvisoryConsumer advisoryConsumer;
191    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
192    private BrokerInfo brokerInfo;
193    private IOException firstFailureError;
194    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
195
196    // Assume that protocol is the latest. Change to the actual protocol
197    // version when a WireFormatInfo is received.
198    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
199    private final long timeCreated;
200    private final ConnectionAudit connectionAudit = new ConnectionAudit();
201    private DestinationSource destinationSource;
202    private final Object ensureConnectionInfoSentMutex = new Object();
203    private boolean useDedicatedTaskRunner;
204    protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
205    private long consumerFailoverRedeliveryWaitPeriod;
206    private Scheduler scheduler;
207    private boolean messagePrioritySupported = false;
208    private boolean transactedIndividualAck = false;
209    private boolean nonBlockingRedelivery = false;
210    private boolean rmIdFromConnectionId = false;
211
212    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
213    private RejectedExecutionHandler rejectedTaskHandler = null;
214
215    /**
216     * Construct an <code>ActiveMQConnection</code>
217     *
218     * @param transport
219     * @param factoryStats
220     * @throws Exception
221     */
222    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
223
224        this.transport = transport;
225        this.clientIdGenerator = clientIdGenerator;
226        this.factoryStats = factoryStats;
227
228        // Configure a single threaded executor who's core thread can timeout if
229        // idle
230        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
231            @Override
232            public Thread newThread(Runnable r) {
233                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
234                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
235                //thread.setDaemon(true);
236                return thread;
237            }
238        });
239        // asyncConnectionThread.allowCoreThreadTimeOut(true);
240        String uniqueId = connectionIdGenerator.generateId();
241        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
242        this.info.setManageable(true);
243        this.info.setFaultTolerant(transport.isFaultTolerant());
244        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
245
246        this.transport.setTransportListener(this);
247
248        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
249        this.factoryStats.addConnection(this);
250        this.timeCreated = System.currentTimeMillis();
251        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
252    }
253
254    protected void setUserName(String userName) {
255        this.info.setUserName(userName);
256    }
257
258    protected void setPassword(String password) {
259        this.info.setPassword(password);
260    }
261
262    /**
263     * A static helper method to create a new connection
264     *
265     * @return an ActiveMQConnection
266     * @throws JMSException
267     */
268    public static ActiveMQConnection makeConnection() throws JMSException {
269        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
270        return (ActiveMQConnection)factory.createConnection();
271    }
272
273    /**
274     * A static helper method to create a new connection
275     *
276     * @param uri
277     * @return and ActiveMQConnection
278     * @throws JMSException
279     */
280    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
281        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
282        return (ActiveMQConnection)factory.createConnection();
283    }
284
285    /**
286     * A static helper method to create a new connection
287     *
288     * @param user
289     * @param password
290     * @param uri
291     * @return an ActiveMQConnection
292     * @throws JMSException
293     */
294    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
295        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
296        return (ActiveMQConnection)factory.createConnection();
297    }
298
299    /**
300     * @return a number unique for this connection
301     */
302    public JMSConnectionStatsImpl getConnectionStats() {
303        return stats;
304    }
305
306    /**
307     * Creates a <CODE>Session</CODE> object.
308     *
309     * @param transacted indicates whether the session is transacted
310     * @param acknowledgeMode indicates whether the consumer or the client will
311     *                acknowledge any messages it receives; ignored if the
312     *                session is transacted. Legal values are
313     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
314     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
315     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
316     * @return a newly created session
317     * @throws JMSException if the <CODE>Connection</CODE> object fails to
318     *                 create a session due to some internal error or lack of
319     *                 support for the specific transaction and acknowledgement
320     *                 mode.
321     * @see Session#AUTO_ACKNOWLEDGE
322     * @see Session#CLIENT_ACKNOWLEDGE
323     * @see Session#DUPS_OK_ACKNOWLEDGE
324     * @since 1.1
325     */
326    @Override
327    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
328        checkClosedOrFailed();
329        ensureConnectionInfoSent();
330        if(!transacted) {
331            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
332                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
333            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
334                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
335                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
336            }
337        }
338        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
339            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
340    }
341
342    /**
343     * @return sessionId
344     */
345    protected SessionId getNextSessionId() {
346        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
347    }
348
349    /**
350     * Gets the client identifier for this connection.
351     * <P>
352     * This value is specific to the JMS provider. It is either preconfigured by
353     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
354     * dynamically by the application by calling the <code>setClientID</code>
355     * method.
356     *
357     * @return the unique client identifier
358     * @throws JMSException if the JMS provider fails to return the client ID
359     *                 for this connection due to some internal error.
360     */
361    @Override
362    public String getClientID() throws JMSException {
363        checkClosedOrFailed();
364        return this.info.getClientId();
365    }
366
367    /**
368     * Sets the client identifier for this connection.
369     * <P>
370     * The preferred way to assign a JMS client's client identifier is for it to
371     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
372     * object and transparently assigned to the <CODE>Connection</CODE> object
373     * it creates.
374     * <P>
375     * Alternatively, a client can set a connection's client identifier using a
376     * provider-specific value. The facility to set a connection's client
377     * identifier explicitly is not a mechanism for overriding the identifier
378     * that has been administratively configured. It is provided for the case
379     * where no administratively specified identifier exists. If one does exist,
380     * an attempt to change it by setting it must throw an
381     * <CODE>IllegalStateException</CODE>. If a client sets the client
382     * identifier explicitly, it must do so immediately after it creates the
383     * connection and before any other action on the connection is taken. After
384     * this point, setting the client identifier is a programming error that
385     * should throw an <CODE>IllegalStateException</CODE>.
386     * <P>
387     * The purpose of the client identifier is to associate a connection and its
388     * objects with a state maintained on behalf of the client by a provider.
389     * The only such state identified by the JMS API is that required to support
390     * durable subscriptions.
391     * <P>
392     * If another connection with the same <code>clientID</code> is already
393     * running when this method is called, the JMS provider should detect the
394     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
395     *
396     * @param newClientID the unique client identifier
397     * @throws JMSException if the JMS provider fails to set the client ID for
398     *                 this connection due to some internal error.
399     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
400     *                 invalid or duplicate client ID.
401     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
402     *                 a connection's client ID at the wrong time or when it has
403     *                 been administratively configured.
404     */
405    @Override
406    public void setClientID(String newClientID) throws JMSException {
407        checkClosedOrFailed();
408
409        if (this.clientIDSet) {
410            throw new IllegalStateException("The clientID has already been set");
411        }
412
413        if (this.isConnectionInfoSentToBroker) {
414            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
415        }
416
417        this.info.setClientId(newClientID);
418        this.userSpecifiedClientID = true;
419        ensureConnectionInfoSent();
420    }
421
422    /**
423     * Sets the default client id that the connection will use if explicitly not
424     * set with the setClientId() call.
425     */
426    public void setDefaultClientID(String clientID) throws JMSException {
427        this.info.setClientId(clientID);
428        this.userSpecifiedClientID = true;
429    }
430
431    /**
432     * Gets the metadata for this connection.
433     *
434     * @return the connection metadata
435     * @throws JMSException if the JMS provider fails to get the connection
436     *                 metadata for this connection.
437     * @see javax.jms.ConnectionMetaData
438     */
439    @Override
440    public ConnectionMetaData getMetaData() throws JMSException {
441        checkClosedOrFailed();
442        return ActiveMQConnectionMetaData.INSTANCE;
443    }
444
445    /**
446     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
447     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
448     * associated with it.
449     *
450     * @return the <CODE>ExceptionListener</CODE> for this connection, or
451     *         null, if no <CODE>ExceptionListener</CODE> is associated with
452     *         this connection.
453     * @throws JMSException if the JMS provider fails to get the
454     *                 <CODE>ExceptionListener</CODE> for this connection.
455     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
456     */
457    @Override
458    public ExceptionListener getExceptionListener() throws JMSException {
459        checkClosedOrFailed();
460        return this.exceptionListener;
461    }
462
463    /**
464     * Sets an exception listener for this connection.
465     * <P>
466     * If a JMS provider detects a serious problem with a connection, it informs
467     * the connection's <CODE> ExceptionListener</CODE>, if one has been
468     * registered. It does this by calling the listener's <CODE>onException
469     * </CODE>
470     * method, passing it a <CODE>JMSException</CODE> object describing the
471     * problem.
472     * <P>
473     * An exception listener allows a client to be notified of a problem
474     * asynchronously. Some connections only consume messages, so they would
475     * have no other way to learn their connection has failed.
476     * <P>
477     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
478     * <P>
479     * A JMS provider should attempt to resolve connection problems itself
480     * before it notifies the client of them.
481     *
482     * @param listener the exception listener
483     * @throws JMSException if the JMS provider fails to set the exception
484     *                 listener for this connection.
485     */
486    @Override
487    public void setExceptionListener(ExceptionListener listener) throws JMSException {
488        checkClosedOrFailed();
489        this.exceptionListener = listener;
490    }
491
492    /**
493     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
494     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
495     * associated with it.
496     *
497     * @return the listener or <code>null</code> if no listener is registered with the connection.
498     */
499    public ClientInternalExceptionListener getClientInternalExceptionListener() {
500        return clientInternalExceptionListener;
501    }
502
503    /**
504     * Sets a client internal exception listener for this connection.
505     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
506     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
507     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
508     * describing the problem.
509     *
510     * @param listener the exception listener
511     */
512    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
513        this.clientInternalExceptionListener = listener;
514    }
515
516    /**
517     * Starts (or restarts) a connection's delivery of incoming messages. A call
518     * to <CODE>start</CODE> on a connection that has already been started is
519     * ignored.
520     *
521     * @throws JMSException if the JMS provider fails to start message delivery
522     *                 due to some internal error.
523     * @see javax.jms.Connection#stop()
524     */
525    @Override
526    public void start() throws JMSException {
527        checkClosedOrFailed();
528        ensureConnectionInfoSent();
529        if (started.compareAndSet(false, true)) {
530            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
531                ActiveMQSession session = i.next();
532                session.start();
533            }
534        }
535    }
536
537    /**
538     * Temporarily stops a connection's delivery of incoming messages. Delivery
539     * can be restarted using the connection's <CODE>start</CODE> method. When
540     * the connection is stopped, delivery to all the connection's message
541     * consumers is inhibited: synchronous receives block, and messages are not
542     * delivered to message listeners.
543     * <P>
544     * This call blocks until receives and/or message listeners in progress have
545     * completed.
546     * <P>
547     * Stopping a connection has no effect on its ability to send messages. A
548     * call to <CODE>stop</CODE> on a connection that has already been stopped
549     * is ignored.
550     * <P>
551     * A call to <CODE>stop</CODE> must not return until delivery of messages
552     * has paused. This means that a client can rely on the fact that none of
553     * its message listeners will be called and that all threads of control
554     * waiting for <CODE>receive</CODE> calls to return will not return with a
555     * message until the connection is restarted. The receive timers for a
556     * stopped connection continue to advance, so receives may time out while
557     * the connection is stopped.
558     * <P>
559     * If message listeners are running when <CODE>stop</CODE> is invoked, the
560     * <CODE>stop</CODE> call must wait until all of them have returned before
561     * it may return. While these message listeners are completing, they must
562     * have the full services of the connection available to them.
563     *
564     * @throws JMSException if the JMS provider fails to stop message delivery
565     *                 due to some internal error.
566     * @see javax.jms.Connection#start()
567     */
568    @Override
569    public void stop() throws JMSException {
570        doStop(true);
571    }
572
573    /**
574     * @see #stop()
575     * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed,
576     *                    <tt>false</tt> to skip this check
577     * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
578     */
579    void doStop(boolean checkClosed) throws JMSException {
580        if (checkClosed) {
581            checkClosedOrFailed();
582        }
583        if (started.compareAndSet(true, false)) {
584            synchronized(sessions) {
585                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
586                    ActiveMQSession s = i.next();
587                    s.stop();
588                }
589            }
590        }
591    }
592
593    /**
594     * Closes the connection.
595     * <P>
596     * Since a provider typically allocates significant resources outside the
597     * JVM on behalf of a connection, clients should close these resources when
598     * they are not needed. Relying on garbage collection to eventually reclaim
599     * these resources may not be timely enough.
600     * <P>
601     * There is no need to close the sessions, producers, and consumers of a
602     * closed connection.
603     * <P>
604     * Closing a connection causes all temporary destinations to be deleted.
605     * <P>
606     * When this method is invoked, it should not return until message
607     * processing has been shut down in an orderly fashion. This means that all
608     * message listeners that may have been running have returned, and that all
609     * pending receives have returned. A close terminates all pending message
610     * receives on the connection's sessions' consumers. The receives may return
611     * with a message or with null, depending on whether there was a message
612     * available at the time of the close. If one or more of the connection's
613     * sessions' message listeners is processing a message at the time when
614     * connection <CODE>close</CODE> is invoked, all the facilities of the
615     * connection and its sessions must remain available to those listeners
616     * until they return control to the JMS provider.
617     * <P>
618     * Closing a connection causes any of its sessions' transactions in progress
619     * to be rolled back. In the case where a session's work is coordinated by
620     * an external transaction manager, a session's <CODE>commit</CODE> and
621     * <CODE> rollback</CODE> methods are not used and the result of a closed
622     * session's work is determined later by the transaction manager. Closing a
623     * connection does NOT force an acknowledgment of client-acknowledged
624     * sessions.
625     * <P>
626     * Invoking the <CODE>acknowledge</CODE> method of a received message from
627     * a closed connection's session must throw an
628     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
629     * NOT throw an exception.
630     *
631     * @throws JMSException if the JMS provider fails to close the connection
632     *                 due to some internal error. For example, a failure to
633     *                 release resources or to close a socket connection can
634     *                 cause this exception to be thrown.
635     */
636    @Override
637    public void close() throws JMSException {
638        // Store the interrupted state and clear so that cleanup happens without
639        // leaking connection resources.  Reset in finally to preserve state.
640        boolean interrupted = Thread.interrupted();
641
642        try {
643
644            // If we were running, lets stop first.
645            if (!closed.get() && !transportFailed.get()) {
646                // do not fail if already closed as according to JMS spec we must not
647                // throw exception if already closed
648                doStop(false);
649            }
650
651            synchronized (this) {
652                if (!closed.get()) {
653                    closing.set(true);
654
655                    if (destinationSource != null) {
656                        destinationSource.stop();
657                        destinationSource = null;
658                    }
659                    if (advisoryConsumer != null) {
660                        advisoryConsumer.dispose();
661                        advisoryConsumer = null;
662                    }
663
664                    Scheduler scheduler = this.scheduler;
665                    if (scheduler != null) {
666                        try {
667                            scheduler.stop();
668                        } catch (Exception e) {
669                            JMSException ex =  JMSExceptionSupport.create(e);
670                            throw ex;
671                        }
672                    }
673
674                    long lastDeliveredSequenceId = 0;
675                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
676                        ActiveMQSession s = i.next();
677                        s.dispose();
678                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
679                    }
680                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
681                        ActiveMQConnectionConsumer c = i.next();
682                        c.dispose();
683                    }
684                    // Stream are deprecated and will be removed in a later release.
685                    for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
686                        ActiveMQInputStream c = i.next();
687                        c.dispose();
688                    }
689                    for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
690                        ActiveMQOutputStream c = i.next();
691                        c.dispose();
692                    }
693
694                    this.activeTempDestinations.clear();
695
696                    if (isConnectionInfoSentToBroker) {
697                        // If we announced ourselves to the broker.. Try to let the broker
698                        // know that the connection is being shutdown.
699                        RemoveInfo removeCommand = info.createRemoveCommand();
700                        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
701                        try {
702                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
703                        } catch (JMSException e) {
704                            if (e.getCause() instanceof RequestTimedOutIOException) {
705                                // expected
706                            } else {
707                                throw e;
708                            }
709                        }
710                        doAsyncSendPacket(new ShutdownInfo());
711                    }
712
713                    started.set(false);
714
715                    // TODO if we move the TaskRunnerFactory to the connection
716                    // factory
717                    // then we may need to call
718                    // factory.onConnectionClose(this);
719                    if (sessionTaskRunner != null) {
720                        sessionTaskRunner.shutdown();
721                    }
722                    closed.set(true);
723                    closing.set(false);
724                }
725            }
726        } finally {
727            try {
728                if (executor != null) {
729                    ThreadPoolUtils.shutdown(executor);
730                }
731            } catch (Throwable e) {
732                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
733            }
734
735            ServiceSupport.dispose(this.transport);
736
737            factoryStats.removeConnection(this);
738            if (interrupted) {
739                Thread.currentThread().interrupt();
740            }
741        }
742    }
743
744    /**
745     * Tells the broker to terminate its VM. This can be used to cleanly
746     * terminate a broker running in a standalone java process. Server must have
747     * property enable.vm.shutdown=true defined to allow this to work.
748     */
749    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
750    // implemented.
751    /*
752     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
753     * command = new BrokerAdminCommand();
754     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
755     * asyncSendPacket(command); }
756     */
757
758    /**
759     * Create a durable connection consumer for this connection (optional
760     * operation). This is an expert facility not used by regular JMS clients.
761     *
762     * @param topic topic to access
763     * @param subscriptionName durable subscription name
764     * @param messageSelector only messages with properties matching the message
765     *                selector expression are delivered. A value of null or an
766     *                empty string indicates that there is no message selector
767     *                for the message consumer.
768     * @param sessionPool the server session pool to associate with this durable
769     *                connection consumer
770     * @param maxMessages the maximum number of messages that can be assigned to
771     *                a server session at one time
772     * @return the durable connection consumer
773     * @throws JMSException if the <CODE>Connection</CODE> object fails to
774     *                 create a connection consumer due to some internal error
775     *                 or invalid arguments for <CODE>sessionPool</CODE> and
776     *                 <CODE>messageSelector</CODE>.
777     * @throws javax.jms.InvalidDestinationException if an invalid destination
778     *                 is specified.
779     * @throws javax.jms.InvalidSelectorException if the message selector is
780     *                 invalid.
781     * @see javax.jms.ConnectionConsumer
782     * @since 1.1
783     */
784    @Override
785    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
786        throws JMSException {
787        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
788    }
789
790    /**
791     * Create a durable connection consumer for this connection (optional
792     * operation). This is an expert facility not used by regular JMS clients.
793     *
794     * @param topic topic to access
795     * @param subscriptionName durable subscription name
796     * @param messageSelector only messages with properties matching the message
797     *                selector expression are delivered. A value of null or an
798     *                empty string indicates that there is no message selector
799     *                for the message consumer.
800     * @param sessionPool the server session pool to associate with this durable
801     *                connection consumer
802     * @param maxMessages the maximum number of messages that can be assigned to
803     *                a server session at one time
804     * @param noLocal set true if you want to filter out messages published
805     *                locally
806     * @return the durable connection consumer
807     * @throws JMSException if the <CODE>Connection</CODE> object fails to
808     *                 create a connection consumer due to some internal error
809     *                 or invalid arguments for <CODE>sessionPool</CODE> and
810     *                 <CODE>messageSelector</CODE>.
811     * @throws javax.jms.InvalidDestinationException if an invalid destination
812     *                 is specified.
813     * @throws javax.jms.InvalidSelectorException if the message selector is
814     *                 invalid.
815     * @see javax.jms.ConnectionConsumer
816     * @since 1.1
817     */
818    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
819                                                              boolean noLocal) throws JMSException {
820        checkClosedOrFailed();
821
822        if (queueOnlyConnection) {
823            throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
824        }
825
826        ensureConnectionInfoSent();
827        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
828        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
829        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
830        info.setSubscriptionName(subscriptionName);
831        info.setSelector(messageSelector);
832        info.setPrefetchSize(maxMessages);
833        info.setDispatchAsync(isDispatchAsync());
834
835        // Allows the options on the destination to configure the consumerInfo
836        if (info.getDestination().getOptions() != null) {
837            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
838            IntrospectionSupport.setProperties(this.info, options, "consumer.");
839        }
840
841        return new ActiveMQConnectionConsumer(this, sessionPool, info);
842    }
843
844    // Properties
845    // -------------------------------------------------------------------------
846
847    /**
848     * Returns true if this connection has been started
849     *
850     * @return true if this Connection is started
851     */
852    public boolean isStarted() {
853        return started.get();
854    }
855
856    /**
857     * Returns true if the connection is closed
858     */
859    public boolean isClosed() {
860        return closed.get();
861    }
862
863    /**
864     * Returns true if the connection is in the process of being closed
865     */
866    public boolean isClosing() {
867        return closing.get();
868    }
869
870    /**
871     * Returns true if the underlying transport has failed
872     */
873    public boolean isTransportFailed() {
874        return transportFailed.get();
875    }
876
877    /**
878     * @return Returns the prefetchPolicy.
879     */
880    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
881        return prefetchPolicy;
882    }
883
884    /**
885     * Sets the <a
886     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
887     * policy</a> for consumers created by this connection.
888     */
889    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
890        this.prefetchPolicy = prefetchPolicy;
891    }
892
893    /**
894     */
895    public Transport getTransportChannel() {
896        return transport;
897    }
898
899    /**
900     * @return Returns the clientID of the connection, forcing one to be
901     *         generated if one has not yet been configured.
902     */
903    public String getInitializedClientID() throws JMSException {
904        ensureConnectionInfoSent();
905        return info.getClientId();
906    }
907
908    /**
909     * @return Returns the timeStampsDisableByDefault.
910     */
911    public boolean isDisableTimeStampsByDefault() {
912        return disableTimeStampsByDefault;
913    }
914
915    /**
916     * Sets whether or not timestamps on messages should be disabled or not. If
917     * you disable them it adds a small performance boost.
918     */
919    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
920        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
921    }
922
923    /**
924     * @return Returns the dispatchOptimizedMessage.
925     */
926    public boolean isOptimizedMessageDispatch() {
927        return optimizedMessageDispatch;
928    }
929
930    /**
931     * If this flag is set then an larger prefetch limit is used - only
932     * applicable for durable topic subscribers.
933     */
934    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
935        this.optimizedMessageDispatch = dispatchOptimizedMessage;
936    }
937
938    /**
939     * @return Returns the closeTimeout.
940     */
941    public int getCloseTimeout() {
942        return closeTimeout;
943    }
944
945    /**
946     * Sets the timeout before a close is considered complete. Normally a
947     * close() on a connection waits for confirmation from the broker; this
948     * allows that operation to timeout to save the client hanging if there is
949     * no broker
950     */
951    public void setCloseTimeout(int closeTimeout) {
952        this.closeTimeout = closeTimeout;
953    }
954
955    /**
956     * @return ConnectionInfo
957     */
958    public ConnectionInfo getConnectionInfo() {
959        return this.info;
960    }
961
962    public boolean isUseRetroactiveConsumer() {
963        return useRetroactiveConsumer;
964    }
965
966    /**
967     * Sets whether or not retroactive consumers are enabled. Retroactive
968     * consumers allow non-durable topic subscribers to receive old messages
969     * that were published before the non-durable subscriber started.
970     */
971    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
972        this.useRetroactiveConsumer = useRetroactiveConsumer;
973    }
974
975    public boolean isNestedMapAndListEnabled() {
976        return nestedMapAndListEnabled;
977    }
978
979    /**
980     * Enables/disables whether or not Message properties and MapMessage entries
981     * support <a
982     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
983     * Structures</a> of Map and List objects
984     */
985    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
986        this.nestedMapAndListEnabled = structuredMapsEnabled;
987    }
988
989    public boolean isExclusiveConsumer() {
990        return exclusiveConsumer;
991    }
992
993    /**
994     * Enables or disables whether or not queue consumers should be exclusive or
995     * not for example to preserve ordering when not using <a
996     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
997     *
998     * @param exclusiveConsumer
999     */
1000    public void setExclusiveConsumer(boolean exclusiveConsumer) {
1001        this.exclusiveConsumer = exclusiveConsumer;
1002    }
1003
1004    /**
1005     * Adds a transport listener so that a client can be notified of events in
1006     * the underlying transport
1007     */
1008    public void addTransportListener(TransportListener transportListener) {
1009        transportListeners.add(transportListener);
1010    }
1011
1012    public void removeTransportListener(TransportListener transportListener) {
1013        transportListeners.remove(transportListener);
1014    }
1015
1016    public boolean isUseDedicatedTaskRunner() {
1017        return useDedicatedTaskRunner;
1018    }
1019
1020    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1021        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1022    }
1023
1024    public TaskRunnerFactory getSessionTaskRunner() {
1025        synchronized (this) {
1026            if (sessionTaskRunner == null) {
1027                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1028                sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1029            }
1030        }
1031        return sessionTaskRunner;
1032    }
1033
1034    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1035        this.sessionTaskRunner = sessionTaskRunner;
1036    }
1037
1038    public MessageTransformer getTransformer() {
1039        return transformer;
1040    }
1041
1042    /**
1043     * Sets the transformer used to transform messages before they are sent on
1044     * to the JMS bus or when they are received from the bus but before they are
1045     * delivered to the JMS client
1046     */
1047    public void setTransformer(MessageTransformer transformer) {
1048        this.transformer = transformer;
1049    }
1050
1051    /**
1052     * @return the statsEnabled
1053     */
1054    public boolean isStatsEnabled() {
1055        return this.stats.isEnabled();
1056    }
1057
1058    /**
1059     * @param statsEnabled the statsEnabled to set
1060     */
1061    public void setStatsEnabled(boolean statsEnabled) {
1062        this.stats.setEnabled(statsEnabled);
1063    }
1064
1065    /**
1066     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1067     * being created or destroyed or to enquire about the current destinations available on the broker
1068     *
1069     * @return a lazily created destination source
1070     * @throws JMSException
1071     */
1072    @Override
1073    public DestinationSource getDestinationSource() throws JMSException {
1074        if (destinationSource == null) {
1075            destinationSource = new DestinationSource(this);
1076            destinationSource.start();
1077        }
1078        return destinationSource;
1079    }
1080
1081    // Implementation methods
1082    // -------------------------------------------------------------------------
1083
1084    /**
1085     * Used internally for adding Sessions to the Connection
1086     *
1087     * @param session
1088     * @throws JMSException
1089     * @throws JMSException
1090     */
1091    protected void addSession(ActiveMQSession session) throws JMSException {
1092        this.sessions.add(session);
1093        if (sessions.size() > 1 || session.isTransacted()) {
1094            optimizedMessageDispatch = false;
1095        }
1096    }
1097
1098    /**
1099     * Used interanlly for removing Sessions from a Connection
1100     *
1101     * @param session
1102     */
1103    protected void removeSession(ActiveMQSession session) {
1104        this.sessions.remove(session);
1105        this.removeDispatcher(session);
1106    }
1107
1108    /**
1109     * Add a ConnectionConsumer
1110     *
1111     * @param connectionConsumer
1112     * @throws JMSException
1113     */
1114    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1115        this.connectionConsumers.add(connectionConsumer);
1116    }
1117
1118    /**
1119     * Remove a ConnectionConsumer
1120     *
1121     * @param connectionConsumer
1122     */
1123    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1124        this.connectionConsumers.remove(connectionConsumer);
1125        this.removeDispatcher(connectionConsumer);
1126    }
1127
1128    /**
1129     * Creates a <CODE>TopicSession</CODE> object.
1130     *
1131     * @param transacted indicates whether the session is transacted
1132     * @param acknowledgeMode indicates whether the consumer or the client will
1133     *                acknowledge any messages it receives; ignored if the
1134     *                session is transacted. Legal values are
1135     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1136     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1137     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1138     * @return a newly created topic session
1139     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1140     *                 to create a session due to some internal error or lack of
1141     *                 support for the specific transaction and acknowledgement
1142     *                 mode.
1143     * @see Session#AUTO_ACKNOWLEDGE
1144     * @see Session#CLIENT_ACKNOWLEDGE
1145     * @see Session#DUPS_OK_ACKNOWLEDGE
1146     */
1147    @Override
1148    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1149        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1150    }
1151
1152    /**
1153     * Creates a connection consumer for this connection (optional operation).
1154     * This is an expert facility not used by regular JMS clients.
1155     *
1156     * @param topic the topic to access
1157     * @param messageSelector only messages with properties matching the message
1158     *                selector expression are delivered. A value of null or an
1159     *                empty string indicates that there is no message selector
1160     *                for the message consumer.
1161     * @param sessionPool the server session pool to associate with this
1162     *                connection consumer
1163     * @param maxMessages the maximum number of messages that can be assigned to
1164     *                a server session at one time
1165     * @return the connection consumer
1166     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1167     *                 to create a connection consumer due to some internal
1168     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1169     *                 and <CODE>messageSelector</CODE>.
1170     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1171     *                 specified.
1172     * @throws javax.jms.InvalidSelectorException if the message selector is
1173     *                 invalid.
1174     * @see javax.jms.ConnectionConsumer
1175     */
1176    @Override
1177    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1178        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1179    }
1180
1181    /**
1182     * Creates a connection consumer for this connection (optional operation).
1183     * This is an expert facility not used by regular JMS clients.
1184     *
1185     * @param queue the queue to access
1186     * @param messageSelector only messages with properties matching the message
1187     *                selector expression are delivered. A value of null or an
1188     *                empty string indicates that there is no message selector
1189     *                for the message consumer.
1190     * @param sessionPool the server session pool to associate with this
1191     *                connection consumer
1192     * @param maxMessages the maximum number of messages that can be assigned to
1193     *                a server session at one time
1194     * @return the connection consumer
1195     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1196     *                 to create a connection consumer due to some internal
1197     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1198     *                 and <CODE>messageSelector</CODE>.
1199     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1200     *                 specified.
1201     * @throws javax.jms.InvalidSelectorException if the message selector is
1202     *                 invalid.
1203     * @see javax.jms.ConnectionConsumer
1204     */
1205    @Override
1206    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1207        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1208    }
1209
1210    /**
1211     * Creates a connection consumer for this connection (optional operation).
1212     * This is an expert facility not used by regular JMS clients.
1213     *
1214     * @param destination the destination to access
1215     * @param messageSelector only messages with properties matching the message
1216     *                selector expression are delivered. A value of null or an
1217     *                empty string indicates that there is no message selector
1218     *                for the message consumer.
1219     * @param sessionPool the server session pool to associate with this
1220     *                connection consumer
1221     * @param maxMessages the maximum number of messages that can be assigned to
1222     *                a server session at one time
1223     * @return the connection consumer
1224     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1225     *                 create a connection consumer due to some internal error
1226     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1227     *                 <CODE>messageSelector</CODE>.
1228     * @throws javax.jms.InvalidDestinationException if an invalid destination
1229     *                 is specified.
1230     * @throws javax.jms.InvalidSelectorException if the message selector is
1231     *                 invalid.
1232     * @see javax.jms.ConnectionConsumer
1233     * @since 1.1
1234     */
1235    @Override
1236    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1237        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1238    }
1239
1240    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1241        throws JMSException {
1242
1243        checkClosedOrFailed();
1244        ensureConnectionInfoSent();
1245
1246        ConsumerId consumerId = createConsumerId();
1247        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1248        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1249        consumerInfo.setSelector(messageSelector);
1250        consumerInfo.setPrefetchSize(maxMessages);
1251        consumerInfo.setNoLocal(noLocal);
1252        consumerInfo.setDispatchAsync(isDispatchAsync());
1253
1254        // Allows the options on the destination to configure the consumerInfo
1255        if (consumerInfo.getDestination().getOptions() != null) {
1256            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1257            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1258        }
1259
1260        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1261    }
1262
1263    /**
1264     * @return
1265     */
1266    private ConsumerId createConsumerId() {
1267        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1268    }
1269
1270    /**
1271     * @return
1272     */
1273    private ProducerId createProducerId() {
1274        return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1275    }
1276
1277    /**
1278     * Creates a <CODE>QueueSession</CODE> object.
1279     *
1280     * @param transacted indicates whether the session is transacted
1281     * @param acknowledgeMode indicates whether the consumer or the client will
1282     *                acknowledge any messages it receives; ignored if the
1283     *                session is transacted. Legal values are
1284     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1285     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1286     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1287     * @return a newly created queue session
1288     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1289     *                 to create a session due to some internal error or lack of
1290     *                 support for the specific transaction and acknowledgement
1291     *                 mode.
1292     * @see Session#AUTO_ACKNOWLEDGE
1293     * @see Session#CLIENT_ACKNOWLEDGE
1294     * @see Session#DUPS_OK_ACKNOWLEDGE
1295     */
1296    @Override
1297    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1298        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1299    }
1300
1301    /**
1302     * Ensures that the clientID was manually specified and not auto-generated.
1303     * If the clientID was not specified this method will throw an exception.
1304     * This method is used to ensure that the clientID + durableSubscriber name
1305     * are used correctly.
1306     *
1307     * @throws JMSException
1308     */
1309    public void checkClientIDWasManuallySpecified() throws JMSException {
1310        if (!userSpecifiedClientID) {
1311            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1312        }
1313    }
1314
1315    /**
1316     * send a Packet through the Connection - for internal use only
1317     *
1318     * @param command
1319     * @throws JMSException
1320     */
1321    public void asyncSendPacket(Command command) throws JMSException {
1322        if (isClosed()) {
1323            throw new ConnectionClosedException();
1324        } else {
1325            doAsyncSendPacket(command);
1326        }
1327    }
1328
1329    private void doAsyncSendPacket(Command command) throws JMSException {
1330        try {
1331            this.transport.oneway(command);
1332        } catch (IOException e) {
1333            throw JMSExceptionSupport.create(e);
1334        }
1335    }
1336
1337    /**
1338     * Send a packet through a Connection - for internal use only
1339     *
1340     * @param command
1341     * @return
1342     * @throws JMSException
1343     */
1344    public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1345        if(onComplete==null) {
1346            syncSendPacket(command);
1347        } else {
1348            if (isClosed()) {
1349                throw new ConnectionClosedException();
1350            }
1351            try {
1352                this.transport.asyncRequest(command, new ResponseCallback() {
1353                    @Override
1354                    public void onCompletion(FutureResponse resp) {
1355                        Response response;
1356                        Throwable exception = null;
1357                        try {
1358                            response = resp.getResult();
1359                            if (response.isException()) {
1360                                ExceptionResponse er = (ExceptionResponse)response;
1361                                exception = er.getException();
1362                            }
1363                        } catch (Exception e) {
1364                            exception = e;
1365                        }
1366                        if(exception!=null) {
1367                            if ( exception instanceof JMSException) {
1368                                onComplete.onException((JMSException) exception);
1369                            } else {
1370                                if (isClosed()||closing.get()) {
1371                                    LOG.debug("Received an exception but connection is closing");
1372                                }
1373                                JMSException jmsEx = null;
1374                                try {
1375                                    jmsEx = JMSExceptionSupport.create(exception);
1376                                } catch(Throwable e) {
1377                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1378                                }
1379                                // dispose of transport for security exceptions on connection initiation
1380                                if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1381                                    forceCloseOnSecurityException(exception);
1382                                }
1383                                if (jmsEx !=null) {
1384                                    onComplete.onException(jmsEx);
1385                                }
1386                            }
1387                        } else {
1388                            onComplete.onSuccess();
1389                        }
1390                    }
1391                });
1392            } catch (IOException e) {
1393                throw JMSExceptionSupport.create(e);
1394            }
1395        }
1396    }
1397
1398    private void forceCloseOnSecurityException(Throwable exception) {
1399        LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
1400        onException(new IOException("Force close due to SecurityException on connect", exception));
1401    }
1402
1403    public Response syncSendPacket(Command command) throws JMSException {
1404        if (isClosed()) {
1405            throw new ConnectionClosedException();
1406        } else {
1407
1408            try {
1409                Response response = (Response)this.transport.request(command);
1410                if (response.isException()) {
1411                    ExceptionResponse er = (ExceptionResponse)response;
1412                    if (er.getException() instanceof JMSException) {
1413                        throw (JMSException)er.getException();
1414                    } else {
1415                        if (isClosed()||closing.get()) {
1416                            LOG.debug("Received an exception but connection is closing");
1417                        }
1418                        JMSException jmsEx = null;
1419                        try {
1420                            jmsEx = JMSExceptionSupport.create(er.getException());
1421                        } catch(Throwable e) {
1422                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1423                        }
1424                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1425                            forceCloseOnSecurityException(er.getException());
1426                        }
1427                        if (jmsEx !=null) {
1428                            throw jmsEx;
1429                        }
1430                    }
1431                }
1432                return response;
1433            } catch (IOException e) {
1434                throw JMSExceptionSupport.create(e);
1435            }
1436        }
1437    }
1438
1439    /**
1440     * Send a packet through a Connection - for internal use only
1441     *
1442     * @param command
1443     * @return
1444     * @throws JMSException
1445     */
1446    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1447        if (isClosed() || closing.get()) {
1448            throw new ConnectionClosedException();
1449        } else {
1450            return doSyncSendPacket(command, timeout);
1451        }
1452    }
1453
1454    private Response doSyncSendPacket(Command command, int timeout)
1455            throws JMSException {
1456        try {
1457            Response response = (Response) (timeout > 0
1458                    ? this.transport.request(command, timeout)
1459                    : this.transport.request(command));
1460            if (response != null && response.isException()) {
1461                ExceptionResponse er = (ExceptionResponse)response;
1462                if (er.getException() instanceof JMSException) {
1463                    throw (JMSException)er.getException();
1464                } else {
1465                    throw JMSExceptionSupport.create(er.getException());
1466                }
1467            }
1468            return response;
1469        } catch (IOException e) {
1470            throw JMSExceptionSupport.create(e);
1471        }
1472    }
1473
1474    /**
1475     * @return statistics for this Connection
1476     */
1477    @Override
1478    public StatsImpl getStats() {
1479        return stats;
1480    }
1481
1482    /**
1483     * simply throws an exception if the Connection is already closed or the
1484     * Transport has failed
1485     *
1486     * @throws JMSException
1487     */
1488    protected synchronized void checkClosedOrFailed() throws JMSException {
1489        checkClosed();
1490        if (transportFailed.get()) {
1491            throw new ConnectionFailedException(firstFailureError);
1492        }
1493    }
1494
1495    /**
1496     * simply throws an exception if the Connection is already closed
1497     *
1498     * @throws JMSException
1499     */
1500    protected synchronized void checkClosed() throws JMSException {
1501        if (closed.get()) {
1502            throw new ConnectionClosedException();
1503        }
1504    }
1505
1506    /**
1507     * Send the ConnectionInfo to the Broker
1508     *
1509     * @throws JMSException
1510     */
1511    protected void ensureConnectionInfoSent() throws JMSException {
1512        synchronized(this.ensureConnectionInfoSentMutex) {
1513            // Can we skip sending the ConnectionInfo packet??
1514            if (isConnectionInfoSentToBroker || closed.get()) {
1515                return;
1516            }
1517            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1518            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1519                info.setClientId(clientIdGenerator.generateId());
1520            }
1521            syncSendPacket(info.copy());
1522
1523            this.isConnectionInfoSentToBroker = true;
1524            // Add a temp destination advisory consumer so that
1525            // We know what the valid temporary destinations are on the
1526            // broker without having to do an RPC to the broker.
1527
1528            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1529            if (watchTopicAdvisories) {
1530                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1531            }
1532        }
1533    }
1534
1535    public synchronized boolean isWatchTopicAdvisories() {
1536        return watchTopicAdvisories;
1537    }
1538
1539    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1540        this.watchTopicAdvisories = watchTopicAdvisories;
1541    }
1542
1543    /**
1544     * @return Returns the useAsyncSend.
1545     */
1546    public boolean isUseAsyncSend() {
1547        return useAsyncSend;
1548    }
1549
1550    /**
1551     * Forces the use of <a
1552     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1553     * adds a massive performance boost; but means that the send() method will
1554     * return immediately whether the message has been sent or not which could
1555     * lead to message loss.
1556     */
1557    public void setUseAsyncSend(boolean useAsyncSend) {
1558        this.useAsyncSend = useAsyncSend;
1559    }
1560
1561    /**
1562     * @return true if always sync send messages
1563     */
1564    public boolean isAlwaysSyncSend() {
1565        return this.alwaysSyncSend;
1566    }
1567
1568    /**
1569     * Set true if always require messages to be sync sent
1570     *
1571     * @param alwaysSyncSend
1572     */
1573    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1574        this.alwaysSyncSend = alwaysSyncSend;
1575    }
1576
1577    /**
1578     * @return the messagePrioritySupported
1579     */
1580    public boolean isMessagePrioritySupported() {
1581        return this.messagePrioritySupported;
1582    }
1583
1584    /**
1585     * @param messagePrioritySupported the messagePrioritySupported to set
1586     */
1587    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1588        this.messagePrioritySupported = messagePrioritySupported;
1589    }
1590
1591    /**
1592     * Cleans up this connection so that it's state is as if the connection was
1593     * just created. This allows the Resource Adapter to clean up a connection
1594     * so that it can be reused without having to close and recreate the
1595     * connection.
1596     */
1597    public void cleanup() throws JMSException {
1598
1599        if (advisoryConsumer != null && !isTransportFailed()) {
1600            advisoryConsumer.dispose();
1601            advisoryConsumer = null;
1602        }
1603
1604        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1605            ActiveMQSession s = i.next();
1606            s.dispose();
1607        }
1608        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1609            ActiveMQConnectionConsumer c = i.next();
1610            c.dispose();
1611        }
1612
1613        // Stream are deprecated and will be removed in a later release.
1614        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1615            ActiveMQInputStream c = i.next();
1616            c.dispose();
1617        }
1618        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1619            ActiveMQOutputStream c = i.next();
1620            c.dispose();
1621        }
1622
1623        if (isConnectionInfoSentToBroker) {
1624            if (!transportFailed.get() && !closing.get()) {
1625                syncSendPacket(info.createRemoveCommand());
1626            }
1627            isConnectionInfoSentToBroker = false;
1628        }
1629        if (userSpecifiedClientID) {
1630            info.setClientId(null);
1631            userSpecifiedClientID = false;
1632        }
1633        clientIDSet = false;
1634
1635        started.set(false);
1636    }
1637
1638    /**
1639     * Changes the associated username/password that is associated with this
1640     * connection. If the connection has been used, you must called cleanup()
1641     * before calling this method.
1642     *
1643     * @throws IllegalStateException if the connection is in used.
1644     */
1645    public void changeUserInfo(String userName, String password) throws JMSException {
1646        if (isConnectionInfoSentToBroker) {
1647            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1648        }
1649        this.info.setUserName(userName);
1650        this.info.setPassword(password);
1651    }
1652
1653    /**
1654     * @return Returns the resourceManagerId.
1655     * @throws JMSException
1656     */
1657    public String getResourceManagerId() throws JMSException {
1658        if (isRmIdFromConnectionId()) {
1659            return info.getConnectionId().getValue();
1660        }
1661        waitForBrokerInfo();
1662        if (brokerInfo == null) {
1663            throw new JMSException("Connection failed before Broker info was received.");
1664        }
1665        return brokerInfo.getBrokerId().getValue();
1666    }
1667
1668    /**
1669     * Returns the broker name if one is available or null if one is not
1670     * available yet.
1671     */
1672    public String getBrokerName() {
1673        try {
1674            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1675            if (brokerInfo == null) {
1676                return null;
1677            }
1678            return brokerInfo.getBrokerName();
1679        } catch (InterruptedException e) {
1680            Thread.currentThread().interrupt();
1681            return null;
1682        }
1683    }
1684
1685    /**
1686     * Returns the broker information if it is available or null if it is not
1687     * available yet.
1688     */
1689    public BrokerInfo getBrokerInfo() {
1690        return brokerInfo;
1691    }
1692
1693    /**
1694     * @return Returns the RedeliveryPolicy.
1695     * @throws JMSException
1696     */
1697    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1698        return redeliveryPolicyMap.getDefaultEntry();
1699    }
1700
1701    /**
1702     * Sets the redelivery policy to be used when messages are rolled back
1703     */
1704    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1705        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1706    }
1707
1708    public BlobTransferPolicy getBlobTransferPolicy() {
1709        if (blobTransferPolicy == null) {
1710            blobTransferPolicy = createBlobTransferPolicy();
1711        }
1712        return blobTransferPolicy;
1713    }
1714
1715    /**
1716     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1717     * OBjects) are transferred from producers to brokers to consumers
1718     */
1719    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1720        this.blobTransferPolicy = blobTransferPolicy;
1721    }
1722
1723    /**
1724     * @return Returns the alwaysSessionAsync.
1725     */
1726    public boolean isAlwaysSessionAsync() {
1727        return alwaysSessionAsync;
1728    }
1729
1730    /**
1731     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1732     * the Connection. However, a separate thread is always used if there is more than one session, or the session
1733     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1734     * happens asynchronously.
1735     */
1736    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1737        this.alwaysSessionAsync = alwaysSessionAsync;
1738    }
1739
1740    /**
1741     * @return Returns the optimizeAcknowledge.
1742     */
1743    public boolean isOptimizeAcknowledge() {
1744        return optimizeAcknowledge;
1745    }
1746
1747    /**
1748     * Enables an optimised acknowledgement mode where messages are acknowledged
1749     * in batches rather than individually
1750     *
1751     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1752     */
1753    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1754        this.optimizeAcknowledge = optimizeAcknowledge;
1755    }
1756
1757    /**
1758     * The max time in milliseconds between optimized ack batches
1759     * @param optimizeAcknowledgeTimeOut
1760     */
1761    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1762        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1763    }
1764
1765    public long getOptimizeAcknowledgeTimeOut() {
1766        return optimizeAcknowledgeTimeOut;
1767    }
1768
1769    public long getWarnAboutUnstartedConnectionTimeout() {
1770        return warnAboutUnstartedConnectionTimeout;
1771    }
1772
1773    /**
1774     * Enables the timeout from a connection creation to when a warning is
1775     * generated if the connection is not properly started via {@link #start()}
1776     * and a message is received by a consumer. It is a very common gotcha to
1777     * forget to <a
1778     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1779     * the connection</a> so this option makes the default case to create a
1780     * warning if the user forgets. To disable the warning just set the value to <
1781     * 0 (say -1).
1782     */
1783    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1784        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1785    }
1786
1787    /**
1788     * @return the sendTimeout (in milliseconds)
1789     */
1790    public int getSendTimeout() {
1791        return sendTimeout;
1792    }
1793
1794    /**
1795     * @param sendTimeout the sendTimeout to set (in milliseconds)
1796     */
1797    public void setSendTimeout(int sendTimeout) {
1798        this.sendTimeout = sendTimeout;
1799    }
1800
1801    /**
1802     * @return the sendAcksAsync
1803     */
1804    public boolean isSendAcksAsync() {
1805        return sendAcksAsync;
1806    }
1807
1808    /**
1809     * @param sendAcksAsync the sendAcksAsync to set
1810     */
1811    public void setSendAcksAsync(boolean sendAcksAsync) {
1812        this.sendAcksAsync = sendAcksAsync;
1813    }
1814
1815    /**
1816     * Returns the time this connection was created
1817     */
1818    public long getTimeCreated() {
1819        return timeCreated;
1820    }
1821
1822    private void waitForBrokerInfo() throws JMSException {
1823        try {
1824            brokerInfoReceived.await();
1825        } catch (InterruptedException e) {
1826            Thread.currentThread().interrupt();
1827            throw JMSExceptionSupport.create(e);
1828        }
1829    }
1830
1831    // Package protected so that it can be used in unit tests
1832    public Transport getTransport() {
1833        return transport;
1834    }
1835
1836    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1837        producers.put(producerId, producer);
1838    }
1839
1840    public void removeProducer(ProducerId producerId) {
1841        producers.remove(producerId);
1842    }
1843
1844    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1845        dispatchers.put(consumerId, dispatcher);
1846    }
1847
1848    public void removeDispatcher(ConsumerId consumerId) {
1849        dispatchers.remove(consumerId);
1850    }
1851
1852    public boolean hasDispatcher(ConsumerId consumerId) {
1853        return dispatchers.containsKey(consumerId);
1854    }
1855
1856    /**
1857     * @param o - the command to consume
1858     */
1859    @Override
1860    public void onCommand(final Object o) {
1861        final Command command = (Command)o;
1862        if (!closed.get() && command != null) {
1863            try {
1864                command.visit(new CommandVisitorAdapter() {
1865                    @Override
1866                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1867                        waitForTransportInterruptionProcessingToComplete();
1868                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1869                        if (dispatcher != null) {
1870                            // Copy in case a embedded broker is dispatching via
1871                            // vm://
1872                            // md.getMessage() == null to signal end of queue
1873                            // browse.
1874                            Message msg = md.getMessage();
1875                            if (msg != null) {
1876                                msg = msg.copy();
1877                                msg.setReadOnlyBody(true);
1878                                msg.setReadOnlyProperties(true);
1879                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1880                                msg.setConnection(ActiveMQConnection.this);
1881                                msg.setMemoryUsage(null);
1882                                md.setMessage(msg);
1883                            }
1884                            dispatcher.dispatch(md);
1885                        } else {
1886                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
1887                        }
1888                        return null;
1889                    }
1890
1891                    @Override
1892                    public Response processProducerAck(ProducerAck pa) throws Exception {
1893                        if (pa != null && pa.getProducerId() != null) {
1894                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1895                            if (producer != null) {
1896                                producer.onProducerAck(pa);
1897                            }
1898                        }
1899                        return null;
1900                    }
1901
1902                    @Override
1903                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1904                        brokerInfo = info;
1905                        brokerInfoReceived.countDown();
1906                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1907                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1908                        return null;
1909                    }
1910
1911                    @Override
1912                    public Response processConnectionError(final ConnectionError error) throws Exception {
1913                        executor.execute(new Runnable() {
1914                            @Override
1915                            public void run() {
1916                                onAsyncException(error.getException());
1917                            }
1918                        });
1919                        return null;
1920                    }
1921
1922                    @Override
1923                    public Response processControlCommand(ControlCommand command) throws Exception {
1924                        onControlCommand(command);
1925                        return null;
1926                    }
1927
1928                    @Override
1929                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1930                        onConnectionControl((ConnectionControl)command);
1931                        return null;
1932                    }
1933
1934                    @Override
1935                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1936                        onConsumerControl((ConsumerControl)command);
1937                        return null;
1938                    }
1939
1940                    @Override
1941                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1942                        onWireFormatInfo((WireFormatInfo)command);
1943                        return null;
1944                    }
1945                });
1946            } catch (Exception e) {
1947                onClientInternalException(e);
1948            }
1949        }
1950
1951        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1952            TransportListener listener = iter.next();
1953            listener.onCommand(command);
1954        }
1955    }
1956
1957    protected void onWireFormatInfo(WireFormatInfo info) {
1958        protocolVersion.set(info.getVersion());
1959    }
1960
1961    /**
1962     * Handles async client internal exceptions.
1963     * A client internal exception is usually one that has been thrown
1964     * by a container runtime component during asynchronous processing of a
1965     * message that does not affect the connection itself.
1966     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1967     * its <code>onException</code> method, if one has been registered with this connection.
1968     *
1969     * @param error the exception that the problem
1970     */
1971    public void onClientInternalException(final Throwable error) {
1972        if ( !closed.get() && !closing.get() ) {
1973            if ( this.clientInternalExceptionListener != null ) {
1974                executor.execute(new Runnable() {
1975                    @Override
1976                    public void run() {
1977                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1978                    }
1979                });
1980            } else {
1981                LOG.debug("Async client internal exception occurred with no exception listener registered: "
1982                        + error, error);
1983            }
1984        }
1985    }
1986
1987    /**
1988     * Used for handling async exceptions
1989     *
1990     * @param error
1991     */
1992    public void onAsyncException(Throwable error) {
1993        if (!closed.get() && !closing.get()) {
1994            if (this.exceptionListener != null) {
1995
1996                if (!(error instanceof JMSException)) {
1997                    error = JMSExceptionSupport.create(error);
1998                }
1999                final JMSException e = (JMSException)error;
2000
2001                executor.execute(new Runnable() {
2002                    @Override
2003                    public void run() {
2004                        ActiveMQConnection.this.exceptionListener.onException(e);
2005                    }
2006                });
2007
2008            } else {
2009                LOG.debug("Async exception with no exception listener: " + error, error);
2010            }
2011        }
2012    }
2013
2014    @Override
2015    public void onException(final IOException error) {
2016        onAsyncException(error);
2017        if (!closing.get() && !closed.get()) {
2018            executor.execute(new Runnable() {
2019                @Override
2020                public void run() {
2021                    transportFailed(error);
2022                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
2023                    brokerInfoReceived.countDown();
2024                    try {
2025                        cleanup();
2026                    } catch (JMSException e) {
2027                        LOG.warn("Exception during connection cleanup, " + e, e);
2028                    }
2029                    for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2030                        TransportListener listener = iter.next();
2031                        listener.onException(error);
2032                    }
2033                }
2034            });
2035        }
2036    }
2037
2038    @Override
2039    public void transportInterupted() {
2040        transportInterruptionProcessingComplete.set(1);
2041        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2042            ActiveMQSession s = i.next();
2043            s.clearMessagesInProgress(transportInterruptionProcessingComplete);
2044        }
2045
2046        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
2047            connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
2048        }
2049
2050        if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
2051            if (LOG.isDebugEnabled()) {
2052                LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
2053            }
2054            signalInterruptionProcessingNeeded();
2055        }
2056
2057        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2058            TransportListener listener = iter.next();
2059            listener.transportInterupted();
2060        }
2061    }
2062
2063    @Override
2064    public void transportResumed() {
2065        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2066            TransportListener listener = iter.next();
2067            listener.transportResumed();
2068        }
2069    }
2070
2071    /**
2072     * Create the DestinationInfo object for the temporary destination.
2073     *
2074     * @param topic - if its true topic, else queue.
2075     * @return DestinationInfo
2076     * @throws JMSException
2077     */
2078    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2079
2080        // Check if Destination info is of temporary type.
2081        ActiveMQTempDestination dest;
2082        if (topic) {
2083            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2084        } else {
2085            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2086        }
2087
2088        DestinationInfo info = new DestinationInfo();
2089        info.setConnectionId(this.info.getConnectionId());
2090        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2091        info.setDestination(dest);
2092        syncSendPacket(info);
2093
2094        dest.setConnection(this);
2095        activeTempDestinations.put(dest, dest);
2096        return dest;
2097    }
2098
2099    /**
2100     * @param destination
2101     * @throws JMSException
2102     */
2103    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2104
2105        checkClosedOrFailed();
2106
2107        for (ActiveMQSession session : this.sessions) {
2108            if (session.isInUse(destination)) {
2109                throw new JMSException("A consumer is consuming from the temporary destination");
2110            }
2111        }
2112
2113        activeTempDestinations.remove(destination);
2114
2115        DestinationInfo destInfo = new DestinationInfo();
2116        destInfo.setConnectionId(this.info.getConnectionId());
2117        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2118        destInfo.setDestination(destination);
2119        destInfo.setTimeout(0);
2120        syncSendPacket(destInfo);
2121    }
2122
2123    public boolean isDeleted(ActiveMQDestination dest) {
2124
2125        // If we are not watching the advisories.. then
2126        // we will assume that the temp destination does exist.
2127        if (advisoryConsumer == null) {
2128            return false;
2129        }
2130
2131        return !activeTempDestinations.contains(dest);
2132    }
2133
2134    public boolean isCopyMessageOnSend() {
2135        return copyMessageOnSend;
2136    }
2137
2138    public LongSequenceGenerator getLocalTransactionIdGenerator() {
2139        return localTransactionIdGenerator;
2140    }
2141
2142    public boolean isUseCompression() {
2143        return useCompression;
2144    }
2145
2146    /**
2147     * Enables the use of compression of the message bodies
2148     */
2149    public void setUseCompression(boolean useCompression) {
2150        this.useCompression = useCompression;
2151    }
2152
2153    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2154
2155        checkClosedOrFailed();
2156        ensureConnectionInfoSent();
2157
2158        DestinationInfo info = new DestinationInfo();
2159        info.setConnectionId(this.info.getConnectionId());
2160        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2161        info.setDestination(destination);
2162        info.setTimeout(0);
2163        syncSendPacket(info);
2164    }
2165
2166    public boolean isDispatchAsync() {
2167        return dispatchAsync;
2168    }
2169
2170    /**
2171     * Enables or disables the default setting of whether or not consumers have
2172     * their messages <a
2173     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2174     * synchronously or asynchronously by the broker</a>. For non-durable
2175     * topics for example we typically dispatch synchronously by default to
2176     * minimize context switches which boost performance. However sometimes its
2177     * better to go slower to ensure that a single blocked consumer socket does
2178     * not block delivery to other consumers.
2179     *
2180     * @param asyncDispatch If true then consumers created on this connection
2181     *                will default to having their messages dispatched
2182     *                asynchronously. The default value is true.
2183     */
2184    public void setDispatchAsync(boolean asyncDispatch) {
2185        this.dispatchAsync = asyncDispatch;
2186    }
2187
2188    public boolean isObjectMessageSerializationDefered() {
2189        return objectMessageSerializationDefered;
2190    }
2191
2192    /**
2193     * When an object is set on an ObjectMessage, the JMS spec requires the
2194     * object to be serialized by that set method. Enabling this flag causes the
2195     * object to not get serialized. The object may subsequently get serialized
2196     * if the message needs to be sent over a socket or stored to disk.
2197     */
2198    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2199        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2200    }
2201
2202    @Override
2203    @Deprecated
2204    public InputStream createInputStream(Destination dest) throws JMSException {
2205        return createInputStream(dest, null);
2206    }
2207
2208    @Override
2209    @Deprecated
2210    public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2211        return createInputStream(dest, messageSelector, false);
2212    }
2213
2214    @Override
2215    @Deprecated
2216    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2217        return createInputStream(dest, messageSelector, noLocal, -1);
2218    }
2219
2220    @Override
2221    @Deprecated
2222    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2223        return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2224    }
2225
2226    @Override
2227    @Deprecated
2228    public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2229        return createInputStream(dest, null, false);
2230    }
2231
2232    @Override
2233    @Deprecated
2234    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2235        return createDurableInputStream(dest, name, messageSelector, false);
2236    }
2237
2238    @Override
2239    @Deprecated
2240    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2241        return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2242    }
2243
2244    @Override
2245    @Deprecated
2246    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2247        return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2248    }
2249
2250    @Deprecated
2251    private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2252        checkClosedOrFailed();
2253        ensureConnectionInfoSent();
2254        return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2255    }
2256
2257    /**
2258     * Creates a persistent output stream; individual messages will be written
2259     * to disk/database by the broker
2260     */
2261    @Override
2262    @Deprecated
2263    public OutputStream createOutputStream(Destination dest) throws JMSException {
2264        return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2265    }
2266
2267    /**
2268     * Creates a non persistent output stream; messages will not be written to
2269     * disk
2270     */
2271    @Deprecated
2272    public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2273        return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2274    }
2275
2276    /**
2277     * Creates an output stream allowing full control over the delivery mode,
2278     * the priority and time to live of the messages and the properties added to
2279     * messages on the stream.
2280     *
2281     * @param streamProperties defines a map of key-value pairs where the keys
2282     *                are strings and the values are primitive values (numbers
2283     *                and strings) which are appended to the messages similarly
2284     *                to using the
2285     *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2286     *                method
2287     */
2288    @Override
2289    @Deprecated
2290    public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2291        checkClosedOrFailed();
2292        ensureConnectionInfoSent();
2293        return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2294    }
2295
2296    /**
2297     * Unsubscribes a durable subscription that has been created by a client.
2298     * <P>
2299     * This method deletes the state being maintained on behalf of the
2300     * subscriber by its provider.
2301     * <P>
2302     * It is erroneous for a client to delete a durable subscription while there
2303     * is an active <CODE>MessageConsumer </CODE> or
2304     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2305     * message is part of a pending transaction or has not been acknowledged in
2306     * the session.
2307     *
2308     * @param name the name used to identify this subscription
2309     * @throws JMSException if the session fails to unsubscribe to the durable
2310     *                 subscription due to some internal error.
2311     * @throws InvalidDestinationException if an invalid subscription name is
2312     *                 specified.
2313     * @since 1.1
2314     */
2315    @Override
2316    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2317        checkClosedOrFailed();
2318        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2319        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2320        rsi.setSubscriptionName(name);
2321        rsi.setClientId(getConnectionInfo().getClientId());
2322        syncSendPacket(rsi);
2323    }
2324
2325    /**
2326     * Internal send method optimized: - It does not copy the message - It can
2327     * only handle ActiveMQ messages. - You can specify if the send is async or
2328     * sync - Does not allow you to send /w a transaction.
2329     */
2330    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2331        checkClosedOrFailed();
2332
2333        if (destination.isTemporary() && isDeleted(destination)) {
2334            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2335        }
2336
2337        msg.setJMSDestination(destination);
2338        msg.setJMSDeliveryMode(deliveryMode);
2339        long expiration = 0L;
2340
2341        if (!isDisableTimeStampsByDefault()) {
2342            long timeStamp = System.currentTimeMillis();
2343            msg.setJMSTimestamp(timeStamp);
2344            if (timeToLive > 0) {
2345                expiration = timeToLive + timeStamp;
2346            }
2347        }
2348
2349        msg.setJMSExpiration(expiration);
2350        msg.setJMSPriority(priority);
2351        msg.setJMSRedelivered(false);
2352        msg.setMessageId(messageId);
2353        msg.onSend();
2354        msg.setProducerId(msg.getMessageId().getProducerId());
2355
2356        if (LOG.isDebugEnabled()) {
2357            LOG.debug("Sending message: " + msg);
2358        }
2359
2360        if (async) {
2361            asyncSendPacket(msg);
2362        } else {
2363            syncSendPacket(msg);
2364        }
2365    }
2366
2367    @Deprecated
2368    public void addOutputStream(ActiveMQOutputStream stream) {
2369        outputStreams.add(stream);
2370    }
2371
2372    @Deprecated
2373    public void removeOutputStream(ActiveMQOutputStream stream) {
2374        outputStreams.remove(stream);
2375    }
2376
2377    @Deprecated
2378    public void addInputStream(ActiveMQInputStream stream) {
2379        inputStreams.add(stream);
2380    }
2381
2382    @Deprecated
2383    public void removeInputStream(ActiveMQInputStream stream) {
2384        inputStreams.remove(stream);
2385    }
2386
2387    protected void onControlCommand(ControlCommand command) {
2388        String text = command.getCommand();
2389        if (text != null) {
2390            if ("shutdown".equals(text)) {
2391                LOG.info("JVM told to shutdown");
2392                System.exit(0);
2393            }
2394
2395            // TODO Should we handle the "close" case?
2396            // if (false && "close".equals(text)){
2397            //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2398            //     try {
2399            //         close();
2400            //     } catch (JMSException e) {
2401            //     }
2402            // }
2403        }
2404    }
2405
2406    protected void onConnectionControl(ConnectionControl command) {
2407        if (command.isFaultTolerant()) {
2408            this.optimizeAcknowledge = false;
2409            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2410                ActiveMQSession s = i.next();
2411                s.setOptimizeAcknowledge(false);
2412            }
2413        }
2414    }
2415
2416    protected void onConsumerControl(ConsumerControl command) {
2417        if (command.isClose()) {
2418            for (ActiveMQSession session : this.sessions) {
2419                session.close(command.getConsumerId());
2420            }
2421        } else {
2422            for (ActiveMQSession session : this.sessions) {
2423                session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2424            }
2425            for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2426                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2427                if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2428                    consumerInfo.setPrefetchSize(command.getPrefetch());
2429                }
2430            }
2431        }
2432    }
2433
2434    protected void transportFailed(IOException error) {
2435        transportFailed.set(true);
2436        if (firstFailureError == null) {
2437            firstFailureError = error;
2438        }
2439    }
2440
2441    /**
2442     * Should a JMS message be copied to a new JMS Message object as part of the
2443     * send() method in JMS. This is enabled by default to be compliant with the
2444     * JMS specification. You can disable it if you do not mutate JMS messages
2445     * after they are sent for a performance boost
2446     */
2447    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2448        this.copyMessageOnSend = copyMessageOnSend;
2449    }
2450
2451    @Override
2452    public String toString() {
2453        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2454    }
2455
2456    protected BlobTransferPolicy createBlobTransferPolicy() {
2457        return new BlobTransferPolicy();
2458    }
2459
2460    public int getProtocolVersion() {
2461        return protocolVersion.get();
2462    }
2463
2464    public int getProducerWindowSize() {
2465        return producerWindowSize;
2466    }
2467
2468    public void setProducerWindowSize(int producerWindowSize) {
2469        this.producerWindowSize = producerWindowSize;
2470    }
2471
2472    public void setAuditDepth(int auditDepth) {
2473        connectionAudit.setAuditDepth(auditDepth);
2474    }
2475
2476    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2477        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2478    }
2479
2480    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2481        connectionAudit.removeDispatcher(dispatcher);
2482    }
2483
2484    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2485        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2486    }
2487
2488    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2489        connectionAudit.rollbackDuplicate(dispatcher, message);
2490    }
2491
2492    public IOException getFirstFailureError() {
2493        return firstFailureError;
2494    }
2495
2496    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2497        if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
2498            LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
2499            signalInterruptionProcessingComplete();
2500        }
2501    }
2502
2503    protected void transportInterruptionProcessingComplete() {
2504        if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
2505            signalInterruptionProcessingComplete();
2506        }
2507    }
2508
2509    private void signalInterruptionProcessingComplete() {
2510            if (LOG.isDebugEnabled()) {
2511                LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
2512                        + " for:" + this.getConnectionInfo().getConnectionId());
2513            }
2514
2515            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2516            if (failoverTransport != null) {
2517                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2518                if (LOG.isDebugEnabled()) {
2519                    LOG.debug("notified failover transport (" + failoverTransport
2520                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2521                }
2522            }
2523            transportInterruptionProcessingComplete.set(0);
2524    }
2525
2526    private void signalInterruptionProcessingNeeded() {
2527        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2528        if (failoverTransport != null) {
2529            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2530            if (LOG.isDebugEnabled()) {
2531                LOG.debug("notified failover transport (" + failoverTransport
2532                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2533            }
2534        }
2535    }
2536
2537    /*
2538     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2539     * will wait to receive re dispatched messages.
2540     * default value is 0 so there is no wait by default.
2541     */
2542    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2543        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2544    }
2545
2546    public long getConsumerFailoverRedeliveryWaitPeriod() {
2547        return consumerFailoverRedeliveryWaitPeriod;
2548    }
2549
2550    protected Scheduler getScheduler() throws JMSException {
2551        Scheduler result = scheduler;
2552        if (result == null) {
2553            if (isClosing() || isClosed()) {
2554                // without lock contention report the closing state
2555                throw new ConnectionClosedException();
2556            }
2557            synchronized (this) {
2558                result = scheduler;
2559                if (result == null) {
2560                    checkClosed();
2561                    try {
2562                        result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2563                        result.start();
2564                        scheduler = result;
2565                    } catch(Exception e) {
2566                        throw JMSExceptionSupport.create(e);
2567                    }
2568                }
2569            }
2570        }
2571        return result;
2572    }
2573
2574    protected ThreadPoolExecutor getExecutor() {
2575        return this.executor;
2576    }
2577
2578    protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
2579        return sessions;
2580    }
2581
2582    /**
2583     * @return the checkForDuplicates
2584     */
2585    public boolean isCheckForDuplicates() {
2586        return this.checkForDuplicates;
2587    }
2588
2589    /**
2590     * @param checkForDuplicates the checkForDuplicates to set
2591     */
2592    public void setCheckForDuplicates(boolean checkForDuplicates) {
2593        this.checkForDuplicates = checkForDuplicates;
2594    }
2595
2596    public boolean isTransactedIndividualAck() {
2597        return transactedIndividualAck;
2598    }
2599
2600    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2601        this.transactedIndividualAck = transactedIndividualAck;
2602    }
2603
2604    public boolean isNonBlockingRedelivery() {
2605        return nonBlockingRedelivery;
2606    }
2607
2608    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2609        this.nonBlockingRedelivery = nonBlockingRedelivery;
2610    }
2611
2612    public boolean isRmIdFromConnectionId() {
2613        return rmIdFromConnectionId;
2614    }
2615
2616    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
2617        this.rmIdFromConnectionId = rmIdFromConnectionId;
2618    }
2619
2620    /**
2621     * Removes any TempDestinations that this connection has cached, ignoring
2622     * any exceptions generated because the destination is in use as they should
2623     * not be removed.
2624     * Used from a pooled connection, b/c it will not be explicitly closed.
2625     */
2626    public void cleanUpTempDestinations() {
2627
2628        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2629            return;
2630        }
2631
2632        Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2633            = this.activeTempDestinations.entrySet().iterator();
2634        while(entries.hasNext()) {
2635            ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2636            try {
2637                // Only delete this temp destination if it was created from this connection. The connection used
2638                // for the advisory consumer may also have a reference to this temp destination.
2639                ActiveMQTempDestination dest = entry.getValue();
2640                String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2641                if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2642                    this.deleteTempDestination(entry.getValue());
2643                }
2644            } catch (Exception ex) {
2645                // the temp dest is in use so it can not be deleted.
2646                // it is ok to leave it to connection tear down phase
2647            }
2648        }
2649    }
2650
2651    /**
2652     * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2653     * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2654     */
2655    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2656        this.redeliveryPolicyMap = redeliveryPolicyMap;
2657    }
2658
2659    /**
2660     * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2661     * Consumers when dealing with transaction messages that have been rolled back.
2662     *
2663     * @return the redeliveryPolicyMap
2664     */
2665    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2666        return redeliveryPolicyMap;
2667    }
2668
2669    public int getMaxThreadPoolSize() {
2670        return maxThreadPoolSize;
2671    }
2672
2673    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
2674        this.maxThreadPoolSize = maxThreadPoolSize;
2675    }
2676
2677    /**
2678     * Enable enforcement of QueueConnection semantics.
2679     *
2680     * @return this object, useful for chaining
2681     */
2682    ActiveMQConnection enforceQueueOnlyConnection() {
2683        this.queueOnlyConnection = true;
2684        return this;
2685    }
2686
2687    public RejectedExecutionHandler getRejectedTaskHandler() {
2688        return rejectedTaskHandler;
2689    }
2690
2691    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
2692        this.rejectedTaskHandler = rejectedTaskHandler;
2693    }
2694
2695    /**
2696     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2697     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
2698     * will not do any background Message acknowledgment.
2699     *
2700     * @return the scheduledOptimizedAckInterval
2701     */
2702    public long getOptimizedAckScheduledAckInterval() {
2703        return optimizedAckScheduledAckInterval;
2704    }
2705
2706    /**
2707     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2708     * have been configured with optimizeAcknowledge enabled.
2709     *
2710     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
2711     */
2712    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2713        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2714    }
2715
2716    /**
2717     * @return true if MessageConsumer instance will check for expired messages before dispatch.
2718     */
2719    public boolean isConsumerExpiryCheckEnabled() {
2720        return consumerExpiryCheckEnabled;
2721    }
2722
2723    /**
2724     * Controls whether message expiration checking is done in each MessageConsumer
2725     * prior to dispatching a message.  Disabling this check can lead to consumption
2726     * of expired messages.
2727     *
2728     * @param consumerExpiryCheckEnabled
2729     *        controls whether expiration checking is done prior to dispatch.
2730     */
2731    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
2732        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
2733    }
2734}