View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq;
20  
21  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
22  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.codehaus.activemq.capacity.CapacityMonitorEvent;
27  import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
28  import org.codehaus.activemq.management.JMSConnectionStatsImpl;
29  import org.codehaus.activemq.management.JMSStatsImpl;
30  import org.codehaus.activemq.management.StatsCapable;
31  import org.codehaus.activemq.message.ActiveMQMessage;
32  import org.codehaus.activemq.message.CapacityInfo;
33  import org.codehaus.activemq.message.ConnectionInfo;
34  import org.codehaus.activemq.message.ConsumerInfo;
35  import org.codehaus.activemq.message.Packet;
36  import org.codehaus.activemq.message.PacketListener;
37  import org.codehaus.activemq.message.ProducerInfo;
38  import org.codehaus.activemq.message.Receipt;
39  import org.codehaus.activemq.message.SessionInfo;
40  import org.codehaus.activemq.message.util.MemoryBoundedQueue;
41  import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
42  import org.codehaus.activemq.transport.TransportChannel;
43  import org.codehaus.activemq.transport.TransportStatusEvent;
44  import org.codehaus.activemq.transport.TransportStatusEventListener;
45  import org.codehaus.activemq.util.IdGenerator;
46  import org.codehaus.activemq.util.JMSExceptionHelper;
47  
48  import javax.jms.*;
49  import javax.jms.IllegalStateException;
50  import javax.management.j2ee.statistics.Stats;
51  import java.util.Iterator;
52  
53  /***
54   * A <CODE>Connection</CODE> object is a client's active connection to its JMS provider. It typically allocates
55   * provider resources outside the Java virtual machine (JVM).
56   * <P>
57   * Connections support concurrent use.
58   * <P>
59   * A connection serves several purposes:
60   * <UL>
61   * <LI>It encapsulates an open connection with a JMS provider. It typically represents an open TCP/IP socket between a
62   * client and the service provider software.
63   * <LI>Its creation is where client authentication takes place.
64   * <LI>It can specify a unique client identifier.
65   * <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
66   * <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
67   * </UL>
68   * <P>
69   * Because the creation of a connection involves setting up authentication and communication, a connection is a
70   * relatively heavyweight object. Most clients will do all their messaging with a single connection. Other more advanced
71   * applications may use several connections. The JMS API does not architect a reason for using multiple connections;
72   * however, there may be operational reasons for doing so.
73   * <P>
74   * A JMS client typically creates a connection, one or more sessions, and a number of message producers and consumers.
75   * When a connection is created, it is in stopped mode. That means that no messages are being delivered.
76   * <P>
77   * It is typical to leave the connection in stopped mode until setup is complete (that is, until all message consumers
78   * have been created). At that point, the client calls the connection's <CODE>start</CODE> method, and messages begin
79   * arriving at the connection's consumers. This setup convention minimizes any client confusion that may result from
80   * asynchronous message delivery while the client is still in the process of setting itself up.
81   * <P>
82   * A connection can be started immediately, and the setup can be done afterwards. Clients that do this must be prepared
83   * to handle asynchronous message delivery while they are still in the process of setting up.
84   * <P>
85   * A message producer can send messages while a connection is stopped. <p/>This class is also a <CODE>TopicConnection
86   * </CODE>. A <CODE>TopicConnection</CODE> object is an active connection to a publish/subscribe JMS provider. A
87   * client uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE> objects for
88   * producing and consuming messages.
89   * <P>
90   * A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>, from which specialized
91   * topic-related objects can be created. A more general, and recommended approach is to use the <CODE>Connection
92   * </CODE> object.
93   * <P>
94   * <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A A <CODE>QueueConnection</CODE> object is an active
95   * connection to a point-to-point JMS provider. A client uses a <CODE>QueueConnection</CODE> object to create one or
96   * more <CODE>QueueSession</CODE> objects for producing and consuming messages.
97   * <P>
98   * A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>, from which specialized
99   * queue-related objects can be created. A more general, and recommended, approach is to use the <CODE>Connection
100  * </CODE> object.
101  * <P>
102  * A <CODE>QueueConnection</CODE> cannot be used to create objects specific to the publish/subscribe domain. The
103  * <CODE>createDurableConnectionConsumer</CODE> method inherits from <CODE>Connection</CODE>, but must throw an
104  * <CODE>IllegalStateException</CODE> if used from <CODE>QueueConnection</CODE>. // *
105  *
106  * @version $Revision: 1.72 $
107  * @see javax.jms.Connection
108  * @see javax.jms.ConnectionFactory
109  * @see javax.jms.QueueConnection
110  * @see javax.jms.TopicConnection
111  * @see javax.jms.TopicConnectionFactory
112  * @see javax.jms.QueueConnection
113  * @see javax.jms.QueueConnectionFactory
114  */
115 public class ActiveMQConnection
116         implements
117         Connection,
118         PacketListener,
119         ExceptionListener,
120         TopicConnection,
121         QueueConnection,
122         StatsCapable,
123         CapacityMonitorEventListener,
124         TransportStatusEventListener {
125 
126     /***
127      * Default UserName for the Connection
128      */
129     public static final String DEFAULT_USER = "defaultUser";
130     /***
131      * Default URL for the ActiveMQ Broker
132      */
133     public static final String DEFAULT_URL = "tcp://localhost:61616";
134     /***
135      * Default Password for the Connection
136      */
137     public static final String DEFAULT_PASSWORD = "defaultPassword";
138 
139     private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
140     private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;
141 
142     // properties
143     private ActiveMQConnectionFactory factory;
144     private String userName;
145     private String password;
146     protected String clientID;
147     private int sendCloseTimeout = 2000;
148     private TransportChannel transportChannel;
149     private ExceptionListener exceptionListener;
150     private ActiveMQPrefetchPolicy prefetchPolicy;
151     private JMSStatsImpl factoryStats;
152     private MemoryBoundedQueueManager boundedQueueManager;
153     protected IdGenerator consumerIdGenerator;
154     private IdGenerator clientIdGenerator;
155     protected IdGenerator packetIdGenerator;
156     private IdGenerator sessionIdGenerator;
157     private JMSConnectionStatsImpl stats;
158 
159     // internal state
160     private CopyOnWriteArrayList sessions;
161     private CopyOnWriteArrayList messageDispatchers;
162     private CopyOnWriteArrayList connectionConsumers;
163     private SynchronizedInt consumerNumberGenerator;
164     private ActiveMQConnectionMetaData connectionMetaData;
165     private SynchronizedBoolean closed;
166     private SynchronizedBoolean started;
167     private boolean clientIDSet;
168     private boolean isConnectionInfoSentToBroker;
169     private boolean isTransportOK;
170     private boolean startedTransport;
171     private long startTime;
172     private long flowControlSleepTime = 0;
173 
174     private boolean userSpecifiedClientID;
175     /***
176      * Should we use an async send for persistent non transacted messages ?
177      */
178     protected boolean useAsyncSend = true;
179     private int sendConnectionInfoTimeout = 30000;
180 
181     /***
182      * A static helper method to create a new connection
183      *
184      * @return an ActiveMQConnection
185      * @throws JMSException
186      */
187     public static ActiveMQConnection makeConnection() throws JMSException {
188         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
189         return (ActiveMQConnection) factory.createConnection();
190     }
191 
192     /***
193      * A static helper method to create a new connection
194      *
195      * @param uri
196      * @return and ActiveMQConnection
197      * @throws JMSException
198      */
199     public static ActiveMQConnection makeConnection(String uri) throws JMSException {
200         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
201         return (ActiveMQConnection) factory.createConnection();
202     }
203 
204     /***
205      * A static helper method to create a new connection
206      *
207      * @param user
208      * @param password
209      * @param uri
210      * @return an ActiveMQConnection
211      * @throws JMSException
212      */
213     public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException {
214         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, uri);
215         return (ActiveMQConnection) factory.createConnection();
216     }
217 
218     /***
219      * Constructs a connection from an existing TransportChannel and user/password.
220      *
221      * @param factory
222      * @param theUserName      the users name
223      * @param thePassword      the password
224      * @param transportChannel the transport channel to communicate with the server
225      * @throws JMSException
226      */
227     public ActiveMQConnection(ActiveMQConnectionFactory factory, String theUserName, String thePassword,
228                               TransportChannel transportChannel) throws JMSException {
229         this(factory, theUserName, thePassword);
230         this.transportChannel = transportChannel;
231         this.transportChannel.setPacketListener(this);
232         this.transportChannel.setExceptionListener(this);
233         this.transportChannel.addTransportStatusEventListener(this);
234         this.isTransportOK = true;
235     }
236 
237     protected ActiveMQConnection(ActiveMQConnectionFactory factory, String theUserName, String thePassword) {
238         this.factory = factory;
239         this.userName = theUserName;
240         this.password = thePassword;
241         this.clientIdGenerator = new IdGenerator();
242         this.packetIdGenerator = new IdGenerator();
243         this.consumerIdGenerator = new IdGenerator();
244         this.sessionIdGenerator = new IdGenerator();
245         this.consumerNumberGenerator = new SynchronizedInt(0);
246         this.sessions = new CopyOnWriteArrayList();
247         this.messageDispatchers = new CopyOnWriteArrayList();
248         this.connectionConsumers = new CopyOnWriteArrayList();
249         this.connectionMetaData = new ActiveMQConnectionMetaData();
250         this.closed = new SynchronizedBoolean(false);
251         this.started = new SynchronizedBoolean(false);
252         this.startTime = System.currentTimeMillis();
253         this.prefetchPolicy = new ActiveMQPrefetchPolicy();
254         this.boundedQueueManager = new MemoryBoundedQueueManager(clientID, DEFAULT_CONNECTION_MEMORY_LIMIT);
255         this.boundedQueueManager.addCapacityEventListener(this);
256         boolean transactional = this instanceof XAConnection;
257         factoryStats = factory.getFactoryStats();
258         factoryStats.addConnection(this);
259         stats = new JMSConnectionStatsImpl(sessions, transactional);
260         factory.onConnectionCreate(this);
261     }
262 
263     /***
264      * @return statistics for this Connection
265      */
266     public Stats getStats() {
267         return stats;
268     }
269 
270     /***
271      * @return a number unique for this connection
272      */
273     public JMSConnectionStatsImpl getConnectionStats() {
274         return stats;
275     }
276 
277     /***
278      * Creates a <CODE>Session</CODE> object.
279      *
280      * @param transacted      indicates whether the session is transacted
281      * @param acknowledgeMode indicates whether the consumer or the client will acknowledge any messages it receives;
282      *                        ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
283      *                        <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
284      * @return a newly created session
285      * @throws JMSException if the <CODE>Connection</CODE> object fails to create a session due to some internal error
286      *                      or lack of support for the specific transaction and acknowledgement mode.
287      * @see Session#AUTO_ACKNOWLEDGE
288      * @see Session#CLIENT_ACKNOWLEDGE
289      * @see Session#DUPS_OK_ACKNOWLEDGE
290      * @since 1.1
291      */
292     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
293         checkClosed();
294         ensureClientIDInitialised();
295         return new ActiveMQSession(this, (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode));
296     }
297 
298     /***
299      * Gets the client identifier for this connection.
300      * <P>
301      * This value is specific to the JMS provider. It is either preconfigured by an administrator in a <CODE>
302      * ConnectionFactory</CODE> object or assigned dynamically by the application by calling the
303      * <code>setClientID</code> method.
304      *
305      * @return the unique client identifier
306      * @throws JMSException if the JMS provider fails to return the client ID for this connection due to some internal
307      *                      error.
308      */
309     public String getClientID() throws JMSException {
310         checkClosed();
311         return this.clientID;
312     }
313 
314 
315     /***
316      * Sets the client identifier for this connection.
317      * <P>
318      * The preferred way to assign a JMS client's client identifier is for it to be configured in a client-specific
319      * <CODE>ConnectionFactory</CODE> object and transparently assigned to the <CODE>Connection</CODE> object it
320      * creates.
321      * <P>
322      * Alternatively, a client can set a connection's client identifier using a provider-specific value. The facility to
323      * set a connection's client identifier explicitly is not a mechanism for overriding the identifier that has been
324      * administratively configured. It is provided for the case where no administratively specified identifier exists.
325      * If one does exist, an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>. If
326      * a client sets the client identifier explicitly, it must do so immediately after it creates the connection and
327      * before any other action on the connection is taken. After this point, setting the client identifier is a
328      * programming error that should throw an <CODE>IllegalStateException</CODE>.
329      * <P>
330      * The purpose of the client identifier is to associate a connection and its objects with a state maintained on
331      * behalf of the client by a provider. The only such state identified by the JMS API is that required to support
332      * durable subscriptions.
333      * <P>
334      * If another connection with the same <code>clientID</code> is already running when this method is called, the
335      * JMS provider should detect the duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
336      *
337      * @param newClientID the unique client identifier
338      * @throws JMSException if the JMS provider fails to set the client ID for this connection due to some internal
339      *                      error.
340      * @throws javax.jms.InvalidClientIDException
341      *                      if the JMS client specifies an invalid or duplicate client ID.
342      * @throws javax.jms.IllegalStateException
343      *                      if the JMS client attempts to set a connection's client ID at the wrong
344      *                      time or when it has been administratively configured.
345      */
346     public void setClientID(String newClientID) throws JMSException {
347         if (this.clientIDSet) {
348             throw new IllegalStateException("The clientID has already been set");
349         }
350         if (this.isConnectionInfoSentToBroker) {
351             throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
352         }
353         checkClosed();
354         this.clientID = newClientID;
355         this.userSpecifiedClientID = true;
356     }
357 
358     /***
359      * Gets the metadata for this connection.
360      *
361      * @return the connection metadata
362      * @throws JMSException if the JMS provider fails to get the connection metadata for this connection.
363      * @see javax.jms.ConnectionMetaData
364      */
365     public ConnectionMetaData getMetaData() throws JMSException {
366         checkClosed();
367         return this.connectionMetaData;
368     }
369 
370     /***
371      * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not every <CODE>Connection</CODE> has an
372      * <CODE>ExceptionListener</CODE> associated with it.
373      *
374      * @return the <CODE>ExceptionListener</CODE> for this connection, or null. if no <CODE>ExceptionListener</CODE>
375      *         is associated with this connection.
376      * @throws JMSException if the JMS provider fails to get the <CODE>ExceptionListener</CODE> for this connection.
377      * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
378      */
379     public ExceptionListener getExceptionListener() throws JMSException {
380         checkClosed();
381         return this.exceptionListener;
382     }
383 
384     /***
385      * Sets an exception listener for this connection.
386      * <P>
387      * If a JMS provider detects a serious problem with a connection, it informs the connection's <CODE>
388      * ExceptionListener</CODE>, if one has been registered. It does this by calling the listener's <CODE>onException
389      * </CODE> method, passing it a <CODE>JMSException</CODE> object describing the problem.
390      * <P>
391      * An exception listener allows a client to be notified of a problem asynchronously. Some connections only consume
392      * messages, so they would have no other way to learn their connection has failed.
393      * <P>
394      * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
395      * <P>
396      * A JMS provider should attempt to resolve connection problems itself before it notifies the client of them.
397      *
398      * @param listener the exception listener
399      * @throws JMSException if the JMS provider fails to set the exception listener for this connection.
400      */
401     public void setExceptionListener(ExceptionListener listener) throws JMSException {
402         checkClosed();
403         this.exceptionListener = listener;
404         this.transportChannel.setExceptionListener(listener);
405     }
406 
407     /***
408      * Starts (or restarts) a connection's delivery of incoming messages. A call to <CODE>start</CODE> on a connection
409      * that has already been started is ignored.
410      *
411      * @throws JMSException if the JMS provider fails to start message delivery due to some internal error.
412      * @see javax.jms.Connection#stop()
413      */
414     public void start() throws JMSException {
415         checkClosed();
416         if (started.commit(false, true)) {
417             sendConnectionInfoToBroker();
418             for (Iterator i = sessions.iterator(); i.hasNext();) {
419                 ActiveMQSession s = (ActiveMQSession) i.next();
420                 s.start();
421             }
422         }
423     }
424 
425     /***
426      * @return true if this Connection is started
427      */
428     protected boolean isStarted() {
429         return started.get();
430     }
431 
432     /***
433      * Temporarily stops a connection's delivery of incoming messages. Delivery can be restarted using the connection's
434      * <CODE>start</CODE> method. When the connection is stopped, delivery to all the connection's message consumers
435      * is inhibited: synchronous receives block, and messages are not delivered to message listeners.
436      * <P>
437      * This call blocks until receives and/or message listeners in progress have completed.
438      * <P>
439      * Stopping a connection has no effect on its ability to send messages. A call to <CODE>stop</CODE> on a
440      * connection that has already been stopped is ignored.
441      * <P>
442      * A call to <CODE>stop</CODE> must not return until delivery of messages has paused. This means that a client can
443      * rely on the fact that none of its message listeners will be called and that all threads of control waiting for
444      * <CODE>receive</CODE> calls to return will not return with a message until the connection is restarted. The
445      * receive timers for a stopped connection continue to advance, so receives may time out while the connection is
446      * stopped.
447      * <P>
448      * If message listeners are running when <CODE>stop</CODE> is invoked, the <CODE>stop</CODE> call must wait
449      * until all of them have returned before it may return. While these message listeners are completing, they must
450      * have the full services of the connection available to them.
451      *
452      * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
453      * @see javax.jms.Connection#start()
454      */
455     public void stop() throws JMSException {
456         checkClosed();
457         if (started.commit(true, false)) {
458             for (Iterator i = sessions.iterator(); i.hasNext();) {
459                 ActiveMQSession s = (ActiveMQSession) i.next();
460                 s.stop();
461             }
462             sendConnectionInfoToBroker(2000, closed.get());
463             transportChannel.stop();
464         }
465     }
466 
467     /***
468      * Closes the connection.
469      * <P>
470      * Since a provider typically allocates significant resources outside the JVM on behalf of a connection, clients
471      * should close these resources when they are not needed. Relying on garbage collection to eventually reclaim these
472      * resources may not be timely enough.
473      * <P>
474      * There is no need to close the sessions, producers, and consumers of a closed connection.
475      * <P>
476      * Closing a connection causes all temporary destinations to be deleted.
477      * <P>
478      * When this method is invoked, it should not return until message processing has been shut down in an orderly
479      * fashion. This means that all message listeners that may have been running have returned, and that all pending
480      * receives have returned. A close terminates all pending message receives on the connection's sessions' consumers.
481      * The receives may return with a message or with null, depending on whether there was a message available at the
482      * time of the close. If one or more of the connection's sessions' message listeners is processing a message at the
483      * time when connection <CODE>close</CODE> is invoked, all the facilities of the connection and its sessions must
484      * remain available to those listeners until they return control to the JMS provider.
485      * <P>
486      * Closing a connection causes any of its sessions' transactions in progress to be rolled back. In the case where a
487      * session's work is coordinated by an external transaction manager, a session's <CODE>commit</CODE> and <CODE>
488      * rollback</CODE> methods are not used and the result of a closed session's work is determined later by the
489      * transaction manager. Closing a connection does NOT force an acknowledgment of client-acknowledged sessions.
490      * <P>
491      * Invoking the <CODE>acknowledge</CODE> method of a received message from a closed connection's session must
492      * throw an <CODE>IllegalStateException</CODE>. Closing a closed connection must NOT throw an exception.
493      *
494      * @throws JMSException if the JMS provider fails to close the connection due to some internal error. For example, a
495      *                      failure to release resources or to close a socket connection can cause this exception to be thrown.
496      */
497     public synchronized void close() throws JMSException {
498         this.transportChannel.setPendingStop(true);
499         if (!closed.get()) {
500             boundedQueueManager.removeCapacityEventListener(this);
501             try {
502                 for (Iterator i = this.sessions.iterator(); i.hasNext();) {
503                     ActiveMQSession s = (ActiveMQSession) i.next();
504                     s.close();
505                 }
506                 for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
507                     ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
508                     c.close();
509                 }
510                 try {
511                     sendConnectionInfoToBroker(sendCloseTimeout, true);
512                 }
513                 catch (TimeoutExpiredException e) {
514                     log.warn("Failed to send close to broker, timeout expired of: " + sendCloseTimeout + " millis");
515                 }
516                 this.connectionConsumers.clear();
517                 this.messageDispatchers.clear();
518                 this.transportChannel.stop();
519             }
520             finally {
521                 this.sessions.clear();
522                 started.set(false);
523                 factory.onConnectionClose(this);
524             }
525             closed.set(true);
526         }
527     }
528 
529 
530     /***
531      * simply throws an exception if the Connection is already closed
532      *
533      * @throws JMSException
534      */
535     protected synchronized void checkClosed() throws JMSException {
536         if (!startedTransport) {
537             startedTransport = true;
538             this.transportChannel.start();
539         }
540         if (this.closed.get()) {
541             throw new IllegalStateException("The Connection is closed");
542         }
543     }
544 
545     /***
546      * Creates a connection consumer for this connection (optional operation). This is an expert facility not used by
547      * regular JMS clients.
548      *
549      * @param destination     the destination to access
550      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
551      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
552      * @param sessionPool     the server session pool to associate with this connection consumer
553      * @param maxMessages     the maximum number of messages that can be assigned to a server session at one time
554      * @return the connection consumer
555      * @throws JMSException if the <CODE>Connection</CODE> object fails to create a connection consumer due to some
556      *                      internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
557      * @throws javax.jms.InvalidDestinationException
558      *                      if an invalid destination is specified.
559      * @throws javax.jms.InvalidSelectorException
560      *                      if the message selector is invalid.
561      * @see javax.jms.ConnectionConsumer
562      * @since 1.1
563      */
564     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
565                                                        ServerSessionPool sessionPool, int maxMessages) throws JMSException {
566         checkClosed();
567         ConsumerInfo info = new ConsumerInfo();
568         info.setId(this.packetIdGenerator.generateId());
569         info.setConsumerId(consumerIdGenerator.generateId());
570         info.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
571         info.setSelector(messageSelector);
572         return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
573     }
574 
575     /***
576      * Create a durable connection consumer for this connection (optional operation). This is an expert facility not
577      * used by regular JMS clients.
578      *
579      * @param topic            topic to access
580      * @param subscriptionName durable subscription name
581      * @param messageSelector  only messages with properties matching the message selector expression are delivered. A
582      *                         value of null or an empty string indicates that there is no message selector for the message consumer.
583      * @param sessionPool      the server session pool to associate with this durable connection consumer
584      * @param maxMessages      the maximum number of messages that can be assigned to a server session at one time
585      * @return the durable connection consumer
586      * @throws JMSException if the <CODE>Connection</CODE> object fails to create a connection consumer due to some
587      *                      internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
588      * @throws javax.jms.InvalidDestinationException
589      *                      if an invalid destination is specified.
590      * @throws javax.jms.InvalidSelectorException
591      *                      if the message selector is invalid.
592      * @see javax.jms.ConnectionConsumer
593      * @since 1.1
594      */
595     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
596                                                               String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
597         checkClosed();
598         ConsumerInfo info = new ConsumerInfo();
599         info.setId(this.packetIdGenerator.generateId());
600         info.setConsumerId(this.consumerIdGenerator.generateId());
601         info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
602         info.setSelector(messageSelector);
603         info.setConsumerName(subscriptionName);
604         return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
605     }
606 
607     /***
608      * Implementation of the PacketListener interface - consume a packet
609      *
610      * @param packet - the Packet to consume
611      * @see org.codehaus.activemq.message.PacketListener#consume(org.codehaus.activemq.message.Packet)
612      */
613     public void consume(Packet packet) {
614         if (!closed.get() && packet != null) {
615             if (packet.isJMSMessage()) {
616                 ActiveMQMessage message = (ActiveMQMessage) packet;
617                 message.setReadOnly(true);
618                 message.setProducerID(clientID);
619 
620                 try {
621                     int count = 0;
622                     for (Iterator i = this.messageDispatchers.iterator(); i.hasNext();) {
623                         ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i.next();
624                         if (dispatcher.isTarget(message)) {
625                             if (count > 0) {
626                                 //separate message for each Session etc.
627                                 message = message.deepCopy();
628                             }
629                             dispatcher.dispatch(message);
630                             count++;
631                         }
632                     }
633                 }
634                 catch (JMSException jmsEx) {
635                     handleAsyncException(jmsEx);
636                 }
637             }
638             else if (packet.getPacketType() == Packet.CAPACITY_INFO) {
639                 CapacityInfo info = (CapacityInfo) packet;
640                 flowControlSleepTime = info.getFlowControlTimeout();
641                 //System.out.println("SET FLOW TIMEOUT = " + flowControlSleepTime + " FOR " + info);
642             }
643         }
644     }
645 
646     /***
647      * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
648      */
649     public void onException(JMSException jmsEx) {
650         //Got an exception propagated up from the transport channel
651         handleAsyncException(jmsEx);
652         isTransportOK = false;
653         try {
654             close();
655         }
656         catch (JMSException ex) {
657             log.warn("Got an exception closing the connection", ex);
658         }
659     }
660 
661     /***
662      * Creates a <CODE>TopicSession</CODE> object.
663      *
664      * @param transacted      indicates whether the session is transacted
665      * @param acknowledgeMode indicates whether the consumer or the client will acknowledge any messages it receives;
666      *                        ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
667      *                        <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
668      * @return a newly created topic session
669      * @throws JMSException if the <CODE>TopicConnection</CODE> object fails to create a session due to some internal
670      *                      error or lack of support for the specific transaction and acknowledgement mode.
671      * @see Session#AUTO_ACKNOWLEDGE
672      * @see Session#CLIENT_ACKNOWLEDGE
673      * @see Session#DUPS_OK_ACKNOWLEDGE
674      */
675     public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
676         checkClosed();
677         return new ActiveMQSession(this, (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode));
678     }
679 
680     /***
681      * Creates a connection consumer for this connection (optional operation). This is an expert facility not used by
682      * regular JMS clients.
683      *
684      * @param topic           the topic to access
685      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
686      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
687      * @param sessionPool     the server session pool to associate with this connection consumer
688      * @param maxMessages     the maximum number of messages that can be assigned to a server session at one time
689      * @return the connection consumer
690      * @throws JMSException                if the <CODE>TopicConnection</CODE> object fails to create a connection consumer due to
691      *                                     some internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
692      * @throws InvalidDestinationException if an invalid topic is specified.
693      * @throws InvalidSelectorException    if the message selector is invalid.
694      * @see javax.jms.ConnectionConsumer
695      */
696     public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
697                                                        ServerSessionPool sessionPool, int maxMessages) throws JMSException {
698         checkClosed();
699         ConsumerInfo info = new ConsumerInfo();
700         info.setId(this.packetIdGenerator.generateId());
701         info.setConsumerId(this.consumerIdGenerator.generateId());
702         info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
703         info.setSelector(messageSelector);
704         return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
705     }
706 
707     /***
708      * Creates a <CODE>QueueSession</CODE> object.
709      *
710      * @param transacted      indicates whether the session is transacted
711      * @param acknowledgeMode indicates whether the consumer or the client will acknowledge any messages it receives;
712      *                        ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
713      *                        <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
714      * @return a newly created queue session
715      * @throws JMSException if the <CODE>QueueConnection</CODE> object fails to create a session due to some internal
716      *                      error or lack of support for the specific transaction and acknowledgement mode.
717      * @see Session#AUTO_ACKNOWLEDGE
718      * @see Session#CLIENT_ACKNOWLEDGE
719      * @see Session#DUPS_OK_ACKNOWLEDGE
720      */
721     public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
722         checkClosed();
723         return new ActiveMQSession(this, (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode));
724     }
725 
726     /***
727      * Creates a connection consumer for this connection (optional operation). This is an expert facility not used by
728      * regular JMS clients.
729      *
730      * @param queue           the queue to access
731      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
732      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
733      * @param sessionPool     the server session pool to associate with this connection consumer
734      * @param maxMessages     the maximum number of messages that can be assigned to a server session at one time
735      * @return the connection consumer
736      * @throws JMSException                if the <CODE>QueueConnection</CODE> object fails to create a connection consumer due to
737      *                                     some internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
738      * @throws InvalidDestinationException if an invalid queue is specified.
739      * @throws InvalidSelectorException    if the message selector is invalid.
740      * @see javax.jms.ConnectionConsumer
741      */
742     public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
743                                                        ServerSessionPool sessionPool, int maxMessages) throws JMSException {
744         checkClosed();
745         ConsumerInfo info = new ConsumerInfo();
746         info.setId(this.packetIdGenerator.generateId());
747         info.setConsumerId(this.consumerIdGenerator.generateId());
748         info.setDestination(ActiveMQMessageTransformation.transformDestination(queue));
749         info.setSelector(messageSelector);
750         return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
751     }
752 
753     /***
754      * Ensures that the clientID was manually specified and not auto-generated. If the clientID was not specified this
755      * method will throw an exception. This method is used to ensure that the clientID + durableSubscriber name are used
756      * correctly.
757      *
758      * @throws JMSException
759      */
760     public void checkClientIDWasManuallySpecified() throws JMSException {
761         if (!userSpecifiedClientID) {
762             throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
763         }
764     }
765 
766     /***
767      * handle disconnect/reconnect events
768      *
769      * @param event
770      */
771     public void statusChanged(TransportStatusEvent event) {
772         log.info("channel status changed: " + event);
773         if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
774             doReconnect();
775         }
776         else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
777             clearMessagesInProgress();
778         }
779     }
780 
781 
782     /***
783      * send a Packet through the Connection - for internal use only
784      *
785      * @param packet
786      * @throws JMSException
787      */
788     public void asyncSendPacket(Packet packet) throws JMSException {
789         asyncSendPacket(packet, true);
790     }
791 
792     /***
793      * send a Packet through the Connection - for internal use only
794      *
795      * @param packet
796      * @param doSendWhileReconnecting
797      * @throws JMSException
798      */
799     public void asyncSendPacket(Packet packet, boolean doSendWhileReconnecting) throws JMSException {
800         if (isTransportOK && !closed.get() && (doSendWhileReconnecting || transportChannel.isTransportConnected())) {
801             packet.setReceiptRequired(false);
802             if (packet.isJMSMessage() && flowControlSleepTime > 0) {
803                 try {
804                     Thread.sleep(flowControlSleepTime);
805                 }
806                 catch (InterruptedException e) {
807                 }
808             }
809             this.transportChannel.asyncSend(packet);
810         }
811     }
812 
813     /***
814      * send a Packet through a Connection - for internal use only
815      *
816      * @param packet
817      * @throws JMSException
818      */
819     public void syncSendPacket(Packet packet) throws JMSException {
820         syncSendPacket(packet, 0);
821     }
822 
823     /***
824      * Send a packet through a Connection - for internal use only
825      *
826      * @param packet
827      * @param timeout
828      * @throws JMSException
829      */
830     public void syncSendPacket(Packet packet, int timeout) throws JMSException {
831         if (isTransportOK && !closed.get()) {
832             Receipt receipt;
833             packet.setReceiptRequired(true);
834             receipt = this.transportChannel.send(packet, timeout);
835             if (receipt != null) {
836                 if (receipt.isFailed()) {
837                     Throwable e = receipt.getException();
838                     if (e != null) {
839                         throw JMSExceptionHelper.newJMSException(e);
840                     }
841                     throw new JMSException("syncSendPacket failed with unknown exception");
842                 }
843             }
844         }
845         else {
846             throw new JMSException("syncSendTimedOut");
847         }
848     }
849     
850     /***
851      * send a Packet and get a receipt
852      * @param packet
853      * @return
854      * @throws JMSException
855      */
856 
857 
858     // Properties
859     //-------------------------------------------------------------------------
860 
861     /***
862      * @return Returns the prefetchPolicy.
863      */
864     public ActiveMQPrefetchPolicy getPrefetchPolicy() {
865         return prefetchPolicy;
866     }
867 
868     /***
869      * @param prefetchPolicy The prefetchPolicy to set.
870      */
871     public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
872         this.prefetchPolicy = prefetchPolicy;
873     }
874 
875     public int getSendCloseTimeout() {
876         return sendCloseTimeout;
877     }
878 
879     public void setSendCloseTimeout(int sendCloseTimeout) {
880         this.sendCloseTimeout = sendCloseTimeout;
881     }
882 
883     public int getSendConnectionInfoTimeout() {
884         return sendConnectionInfoTimeout;
885     }
886 
887     public void setSendConnectionInfoTimeout(int sendConnectionInfoTimeout) {
888         this.sendConnectionInfoTimeout = sendConnectionInfoTimeout;
889     }
890 
891     public Receipt syncSendRequest(Packet packet) throws JMSException {
892         checkClosed();
893         if (isTransportOK && !closed.get()) {
894             Receipt receipt;
895             packet.setReceiptRequired(true);
896             if (packet.getId() == null || packet.getId().length() == 0) {
897                 packet.setId(this.packetIdGenerator.generateId());
898             }
899             receipt = this.transportChannel.send(packet);
900             if (receipt.isFailed()) {
901                 Throwable e = receipt.getException();
902                 if (e != null) {
903                     throw (JMSException) new JMSException(e.getMessage()).initCause(e);
904                 }
905                 throw new JMSException("syncSendPacket failed with unknown exception");
906             }
907             return receipt;
908         }
909         else {
910             throw new JMSException("Connection closed.");
911         }
912     }
913 
914     public TransportChannel getTransportChannel() {
915         return transportChannel;
916     }
917 
918 
919     /***
920      * Returns the clientID of the connection, forcing one to be generated if one has not yet been configured
921      */
922     public String getInitializedClientID() throws JMSException {
923         ensureClientIDInitialised();
924         return this.clientID;
925     }
926 
927 
928     // Implementation methods
929     //-------------------------------------------------------------------------
930 
931     /***
932      * Used internally for adding Sessions to the Connection
933      *
934      * @param session
935      * @throws JMSException
936      */
937     protected void addSession(ActiveMQSession session) throws JMSException {
938         this.sessions.add(session);
939         addMessageDispatcher(session);
940         if (started.get()) {
941             session.start();
942         }
943         SessionInfo info = createSessionInfo(session);
944         info.setStarted(true);
945         asyncSendPacket(info);
946     }
947 
948     /***
949      * Used interanlly for removing Sessions from a Connection
950      *
951      * @param session
952      * @throws JMSException
953      */
954     protected void removeSession(ActiveMQSession session) throws JMSException {
955         this.sessions.remove(session);
956         removeMessageDispatcher(session);
957         SessionInfo info = createSessionInfo(session);
958         info.setStarted(false);
959         asyncSendPacket(info, false);
960     }
961 
962     private SessionInfo createSessionInfo(ActiveMQSession session) {
963         SessionInfo info = new SessionInfo();
964         info.setId(packetIdGenerator.generateId());
965         info.setClientId(clientID);
966         info.setSessionId(session.getSessionId());
967         info.setStartTime(session.getStartTime());
968         return info;
969     }
970 
971     /***
972      * Add a ConnectionConsumer
973      *
974      * @param connectionConsumer
975      * @throws JMSException
976      */
977     protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
978         this.connectionConsumers.add(connectionConsumer);
979         addMessageDispatcher(connectionConsumer);
980     }
981 
982     /***
983      * Remove a ConnectionConsumer
984      *
985      * @param connectionConsumer
986      */
987     protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
988         this.connectionConsumers.add(connectionConsumer);
989         removeMessageDispatcher(connectionConsumer);
990     }
991 
992     /***
993      * Add a Message dispatcher to receive messages from the Broker
994      *
995      * @param messageDispatch
996      * @throws JMSException if an internal error
997      */
998     protected void addMessageDispatcher(ActiveMQMessageDispatcher messageDispatch) throws JMSException {
999         this.messageDispatchers.add(messageDispatch);
1000     }
1001 
1002     /***
1003      * Remove a Message dispatcher
1004      *
1005      * @param messageDispatcher
1006      */
1007     protected void removeMessageDispatcher(ActiveMQMessageDispatcher messageDispatcher) {
1008         this.messageDispatchers.remove(messageDispatcher);
1009     }
1010 
1011     /***
1012      * Used for handling async exceptions
1013      *
1014      * @param jmsEx
1015      */
1016     protected void handleAsyncException(JMSException jmsEx) {
1017         if (this.exceptionListener != null) {
1018             this.exceptionListener.onException(jmsEx);
1019         }
1020         else {
1021             log.warn("async exception with no exception listener", jmsEx);
1022         }
1023     }
1024 
1025     protected void sendConnectionInfoToBroker() throws JMSException {
1026         sendConnectionInfoToBroker(sendConnectionInfoTimeout, closed.get());
1027     }
1028 
1029     /***
1030      * Send the ConnectionInfo to the Broker
1031      *
1032      * @param timeout
1033      * @param isClosed
1034      * @throws JMSException
1035      */
1036     protected void sendConnectionInfoToBroker(int timeout, boolean isClosed) throws JMSException {
1037         if (!isConnectionInfoSentToBroker) {
1038             this.isConnectionInfoSentToBroker = true;
1039         }
1040         else {
1041             if (!isClosed) {
1042                 // we must have done a send() and then a start()
1043                 // so ignore this operation
1044                 return;
1045             }
1046         }
1047         ensureClientIDInitialised();
1048         ConnectionInfo info = new ConnectionInfo();
1049         info.setClientId(this.clientID);
1050         info.setHostName(IdGenerator.getHostName());
1051         info.setUserName(userName);
1052         info.setPassword(password);
1053         info.setId(packetIdGenerator.generateId());
1054         info.setStartTime(startTime);
1055         info.setStarted(started.get());
1056         info.setClosed(isClosed);
1057         info.setClientVersion(connectionMetaData.getProviderVersion());
1058         info.setWireFormatVersion(transportChannel.getCurrentWireFormatVersion());
1059         syncSendPacket(info, timeout);
1060     }
1061 
1062     /***
1063      * Set the maximum amount of memory this Connection should use for buffered inbound messages
1064      *
1065      * @param newMemoryLimit the new memory limit in bytes
1066      */
1067     public void setConnectionMemoryLimit(int newMemoryLimit) {
1068         boundedQueueManager.setValueLimit(newMemoryLimit);
1069     }
1070 
1071     /***
1072      * Get the current value for the maximum amount of memory this Connection should use for buffered inbound messages
1073      *
1074      * @return the current limit in bytes
1075      */
1076     public int getConnectionMemoryLimit() {
1077         return (int) boundedQueueManager.getValueLimit();
1078     }
1079 
1080     /***
1081      * CapacityMonitorEventListener implementation called when the capacity of a CapacityService changes
1082      *
1083      * @param event
1084      */
1085     public void capacityChanged(CapacityMonitorEvent event) {
1086         //send the event to broker ...
1087         CapacityInfo info = new CapacityInfo();
1088         info.setId(packetIdGenerator.generateId());
1089         info.setResourceName(event.getMonitorName());
1090         info.setCapacity(event.getCapacity());
1091         //System.out.println("Cap changed: " + event);
1092         try {
1093             asyncSendPacket(info, false);
1094         }
1095         catch (JMSException e) {
1096             JMSException jmsEx = new JMSException("failed to send change in capacity");
1097             jmsEx.setLinkedException(e);
1098             handleAsyncException(jmsEx);
1099         }
1100     }
1101 
1102     /***
1103      * @return a number unique for this connection
1104      */
1105     protected int getNextConsumerNumber() {
1106         return this.consumerNumberGenerator.increment();
1107     }
1108 
1109     protected String generateSessionId() {
1110         return this.sessionIdGenerator.generateId();
1111     }
1112 
1113     protected void ensureClientIDInitialised() {
1114         if (this.clientID == null) {
1115             this.clientID = this.clientIdGenerator.generateId();
1116         }
1117         transportChannel.setClientID(clientID);
1118         this.clientIDSet = true;
1119     }
1120 
1121     protected MemoryBoundedQueue getMemoryBoundedQueue(String name) {
1122         return boundedQueueManager.getMemoryBoundedQueue(name);
1123     }
1124 
1125     protected void doReconnect() {
1126         try {
1127             //send the Connection info again
1128             this.isConnectionInfoSentToBroker = false;
1129             sendConnectionInfoToBroker();
1130             for (Iterator iter = sessions.iterator(); iter.hasNext();) {
1131                 ActiveMQSession session = (ActiveMQSession) iter.next();
1132                 SessionInfo sessionInfo = createSessionInfo(session);
1133                 sessionInfo.setStarted(true);
1134                 asyncSendPacket(sessionInfo, false);
1135                 //send consumers
1136                 for (Iterator consumersIterator = session.consumers.iterator(); consumersIterator.hasNext();) {
1137                     ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumersIterator.next();
1138                     ConsumerInfo consumerInfo = session.createConsumerInfo(consumer);
1139                     consumerInfo.setStarted(true);
1140                     asyncSendPacket(consumerInfo, false);
1141                 }
1142                 //send producers
1143                 for (Iterator producersIterator = session.producers.iterator(); producersIterator.hasNext();) {
1144                     ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producersIterator.next();
1145                     ProducerInfo producerInfo = session.createProducerInfo(producer);
1146                     producerInfo.setStarted(true);
1147                     asyncSendPacket(producerInfo, false);
1148                 }
1149             }
1150         }
1151         catch (JMSException jmsEx) {
1152             log.error("Failed to do reconnection");
1153             handleAsyncException(jmsEx);
1154             isTransportOK = false;
1155         }
1156     }
1157 
1158     /***
1159      * @return Returns the useAsyncSend.
1160      */
1161     public boolean isUseAsyncSend() {
1162         return useAsyncSend;
1163     }
1164 
1165     /***
1166      * @param useAsyncSend The useAsyncSend to set.
1167      */
1168     public void setUseAsyncSend(boolean useAsyncSend) {
1169         this.useAsyncSend = useAsyncSend;
1170     }
1171 
1172     protected void clearMessagesInProgress() {
1173         for (Iterator i = sessions.iterator(); i.hasNext();) {
1174             ActiveMQSession session = (ActiveMQSession) i.next();
1175             session.clearMessagesInProgress();
1176         }
1177     }
1178 }