1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq;
20
21 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.management.JMSSessionStatsImpl;
26 import org.codehaus.activemq.management.StatsCapable;
27 import org.codehaus.activemq.message.*;
28 import org.codehaus.activemq.ra.LocalTransactionEventListener;
29 import org.codehaus.activemq.service.impl.DefaultQueueList;
30 import org.codehaus.activemq.util.IdGenerator;
31
32 import javax.jms.*;
33 import javax.jms.IllegalStateException;
34 import javax.management.j2ee.statistics.Stats;
35 import java.io.Serializable;
36 import java.util.Iterator;
37 import java.util.LinkedList;
38 import java.util.ListIterator;
39
40 /***
41 * <P>
42 * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
43 * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
44 * <P>
45 * A session serves several purposes:
46 * <UL>
47 * <LI>It is a factory for its message producers and consumers.
48 * <LI>It supplies provider-optimized message factories.
49 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
50 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
51 * dynamically manipulate provider-specific destination names.
52 * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
53 * units.
54 * <LI>It defines a serial order for the messages it consumes and the messages it produces.
55 * <LI>It retains messages it consumes until they have been acknowledged.
56 * <LI>It serializes execution of message listeners registered with its message consumers.
57 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
58 * </UL>
59 * <P>
60 * A session can create and service multiple message producers and consumers.
61 * <P>
62 * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
63 * The thread may then use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
64 * <P>
65 * If a client desires to have one thread produce messages while others consume them, the client should use a separate
66 * session for its producing thread.
67 * <P>
68 * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
69 * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
70 * constituent objects from another thread of control. The only exception to this rule is the use of the session or
71 * connection <CODE>close</CODE> method.
72 * <P>
73 * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
74 * start simply and incrementally add message processing complexity as their need for concurrency grows.
75 * <P>
76 * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
77 * being executed in another thread.
78 * <P>
79 * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
80 * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
81 * transactions organize a session's input message stream and output message stream into series of atomic units. When a
82 * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
83 * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
84 * recovered.
85 * <P>
86 * The content of a transaction's input and output units is simply those messages that have been produced and consumed
87 * within the session's current transaction.
88 * <P>
89 * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback
90 * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that a
91 * transacted session always has a current transaction within which its work is done.
92 * <P>
93 * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
94 * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
95 * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback</CODE>
96 * methods in this context is prohibited.
97 * <P>
98 * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
99 * <P>
100 * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
101 * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
102 * JMS API into their application server products.
103 *
104 * @version $Revision: 1.54 $
105 * @see javax.jms.Session
106 * @see javax.jms.QueueSession
107 * @see javax.jms.TopicSession
108 * @see javax.jms.XASession
109 */
110 public class ActiveMQSession
111 implements
112 Session,
113 QueueSession,
114 TopicSession,
115 ActiveMQMessageDispatcher,
116 MessageAcknowledge,
117 StatsCapable {
118 protected static final int CONSUMER_DISPATCH_UNSET = 1;
119 protected static final int CONSUMER_DISPATCH_ASYNC = 2;
120 protected static final int CONSUMER_DISPATCH_SYNC = 3;
121 private static final Log log = LogFactory.getLog(ActiveMQSession.class);
122 protected ActiveMQConnection connection;
123 private int acknowledgeMode;
124 protected CopyOnWriteArrayList consumers;
125 protected CopyOnWriteArrayList producers;
126 private IdGenerator transactionIdGenerator;
127 private IdGenerator temporaryDestinationGenerator;
128 protected IdGenerator packetIdGenerator;
129 private IdGenerator producerIdGenerator;
130 private IdGenerator consumerIdGenerator;
131 private MessageListener messageListener;
132 protected SynchronizedBoolean closed;
133 private SynchronizedBoolean startTransaction;
134 private String sessionId;
135 protected String currentTransactionId;
136 private long startTime;
137 private LocalTransactionEventListener localTransactionEventListener;
138 private DefaultQueueList deliveredMessages;
139 private ActiveMQSessionExecutor messageExecutor;
140 private JMSSessionStatsImpl stats;
141 private int consumerDispatchState;
142
/**
 * Constructs the session and registers it with its connection.
 * <P>
 * Registration with the connection is deliberately the last step, so that the connection never holds a reference
 * to a session whose statistics and dispatch state have not been initialised yet.
 *
 * @param theConnection the connection this session belongs to and is registered with
 * @param theAcknowledgeMode the acknowledge mode; n.b. if transacted, the acknowledgeMode ==
 * Session.SESSION_TRANSACTED
 * @throws JMSException on internal error
 */
protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
    this.connection = theConnection;
    this.acknowledgeMode = theAcknowledgeMode;
    this.consumers = new CopyOnWriteArrayList();
    this.producers = new CopyOnWriteArrayList();
    this.producerIdGenerator = new IdGenerator();
    this.consumerIdGenerator = new IdGenerator();
    this.transactionIdGenerator = new IdGenerator();
    this.temporaryDestinationGenerator = new IdGenerator();
    this.packetIdGenerator = new IdGenerator();
    // closed must be initialised before getTransacted() is called below, because it checks the flag.
    this.closed = new SynchronizedBoolean(false);
    this.startTransaction = new SynchronizedBoolean(false);
    this.sessionId = connection.generateSessionId();
    this.startTime = System.currentTimeMillis();
    this.deliveredMessages = new DefaultQueueList();
    this.messageExecutor = new ActiveMQSessionExecutor(this, connection.getMemoryBoundedQueue(sessionId));
    if (getTransacted()) {
        this.currentTransactionId = getNextTransactionId();
    }
    this.stats = new JMSSessionStatsImpl(producers, consumers);
    this.consumerDispatchState = CONSUMER_DISPATCH_UNSET;
    // Publish the session only once every field above has been assigned.
    connection.addSession(this);
}
173
174 public Stats getStats() {
175 return stats;
176 }
177
178 public JMSSessionStatsImpl getSessionStats() {
179 return stats;
180 }
181
182 /***
183 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message
184 * containing a stream of uninterpreted bytes.
185 *
186 * @return the an ActiveMQBytesMessage
187 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
188 */
189 public BytesMessage createBytesMessage() throws JMSException {
190 checkClosed();
191 return new ActiveMQBytesMessage();
192 }
193
194 /***
195 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining
196 * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the
197 * Java programming language.
198 *
199 * @return an ActiveMQMapMessage
200 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
201 */
202 public MapMessage createMapMessage() throws JMSException {
203 checkClosed();
204 return new ActiveMQMapMessage();
205 }
206
207 /***
208 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS
209 * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when
210 * a message containing only header information is sufficient.
211 *
212 * @return an ActiveMQMessage
213 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
214 */
215 public Message createMessage() throws JMSException {
216 checkClosed();
217 return new ActiveMQMessage();
218 }
219
220 /***
221 * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to send a message
222 * that contains a serializable Java object.
223 *
224 * @return an ActiveMQObjectMessage
225 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
226 */
227 public ObjectMessage createObjectMessage() throws JMSException {
228 checkClosed();
229 return new ActiveMQObjectMessage();
230 }
231
232 /***
233 * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to
234 * send a message that contains a serializable Java object.
235 *
236 * @param object the object to use to initialize this message
237 * @return an ActiveMQObjectMessage
238 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
239 */
240 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
241 checkClosed();
242 ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
243 msg.setObject(object);
244 return msg;
245 }
246
247 /***
248 * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a
249 * self-defining stream of primitive values in the Java programming language.
250 *
251 * @return an ActiveMQStreamMessage
252 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
253 */
254 public StreamMessage createStreamMessage() throws JMSException {
255 checkClosed();
256 return new ActiveMQStreamMessage();
257 }
258
259 /***
260 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message
261 * containing a <CODE>String</CODE> object.
262 *
263 * @return an ActiveMQTextMessage
264 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
265 */
266 public TextMessage createTextMessage() throws JMSException {
267 checkClosed();
268 return new ActiveMQTextMessage();
269 }
270
271 /***
272 * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a
273 * message containing a <CODE>String</CODE>.
274 *
275 * @param text the string used to initialize this message
276 * @return an ActiveMQTextMessage
277 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
278 */
279 public TextMessage createTextMessage(String text) throws JMSException {
280 checkClosed();
281 ActiveMQTextMessage msg = new ActiveMQTextMessage();
282 msg.setText(text);
283 return msg;
284 }
285
286 /***
287 * Indicates whether the session is in transacted mode.
288 *
289 * @return true if the session is in transacted mode
290 * @throws JMSException if there is some internal error.
291 */
292 public boolean getTransacted() throws JMSException {
293 checkClosed();
294 return this.acknowledgeMode == Session.SESSION_TRANSACTED;
295 }
296
297 /***
298 * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is
299 * created. If the session is transacted, the acknowledgement mode is ignored.
300 *
301 * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the
302 * session is transacted, returns SESSION_TRANSACTED.
303 * @throws JMSException
304 * @see javax.jms.Connection#createSession(boolean,int)
305 * @since 1.1 exception JMSException if there is some internal error.
306 */
307 public int getAcknowledgeMode() throws JMSException {
308 checkClosed();
309 return this.acknowledgeMode;
310 }
311
312 /***
313 * Commits all messages done in this transaction and releases any locks currently held.
314 *
315 * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
316 * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
317 * commit.
318 * @throws javax.jms.IllegalStateException
319 * if the method is not called by a transacted session.
320 */
321 public void commit() throws JMSException {
322 checkClosed();
323 if (!getTransacted()) {
324 throw new javax.jms.IllegalStateException("Not a transacted session");
325 }
326
327 if (this.startTransaction.commit(true, false)) {
328 TransactionInfo info = new TransactionInfo();
329 info.setId(this.packetIdGenerator.generateId());
330 info.setTransactionId(currentTransactionId);
331 info.setType(TransactionInfo.COMMIT);
332
333 this.currentTransactionId = getNextTransactionId();
334
335 this.connection.syncSendPacket(info);
336 if (localTransactionEventListener != null) {
337 localTransactionEventListener.commitEvent();
338 }
339 }
340 deliveredMessages.clear();
341 }
342
343 /***
344 * Rolls back any messages done in this transaction and releases any locks currently held.
345 *
346 * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
347 * @throws javax.jms.IllegalStateException
348 * if the method is not called by a transacted session.
349 */
350 public void rollback() throws JMSException {
351 checkClosed();
352 if (!getTransacted()) {
353 throw new javax.jms.IllegalStateException("Not a transacted session");
354 }
355
356 if (this.startTransaction.commit(true, false)) {
357 TransactionInfo info = new TransactionInfo();
358 info.setId(this.packetIdGenerator.generateId());
359 info.setTransactionId(currentTransactionId);
360 info.setType(TransactionInfo.ROLLBACK);
361
362 this.currentTransactionId = getNextTransactionId();
363 this.connection.asyncSendPacket(info);
364
365 if (localTransactionEventListener != null) {
366 localTransactionEventListener.rollbackEvent();
367 }
368 }
369 redeliverUnacknowledgedMessages(true);
370 deliveredMessages.clear();
371 }
372
373 /***
374 * Closes the session.
375 * <P>
376 * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
377 * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not
378 * be timely enough.
379 * <P>
380 * There is no need to close the producers and consumers of a closed session.
381 * <P>
382 * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A blocked
383 * message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
384 * <P>
385 * Closing a transacted session must roll back the transaction in progress.
386 * <P>
387 * This method is the only <CODE>Session</CODE> method that can be called concurrently.
388 * <P>
389 * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE>
390 * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception.
391 *
392 * @throws JMSException if the JMS provider fails to close the session due to some internal error.
393 */
394 public void close() throws JMSException {
395 if (!this.closed.get()) {
396 if (getTransacted()) {
397 rollback();
398 }
399 doClose();
400 closed.set(true);
401 }
402 }
403
404 protected void doClose() throws JMSException {
405 doAcknowledge(true);
406 for (Iterator i = consumers.iterator(); i.hasNext();) {
407 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
408 consumer.close();
409 }
410 for (Iterator i = producers.iterator(); i.hasNext();) {
411 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next();
412 producer.close();
413 }
414 consumers.clear();
415 producers.clear();
416 this.connection.removeSession(this);
417 messageExecutor.close();
418 deliveredMessages.clear();
419 }
420
421 /***
422 * @throws IllegalStateException if the Session is closed
423 */
424 protected void checkClosed() throws IllegalStateException {
425 if (this.closed.get()) {
426 throw new IllegalStateException("The Consumer is closed");
427 }
428 }
429
430 /***
431 * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
432 * <P>
433 * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all
434 * messages that have been delivered to the client.
435 * <P>
436 * Restarting a session causes it to take the following actions:
437 * <UL>
438 * <LI>Stop message delivery
439 * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
440 * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
441 * Redelivered messages do not have to be delivered in exactly their original delivery order.
442 * </UL>
443 *
444 * @throws JMSException if the JMS provider fails to stop and restart message delivery due to some internal error.
445 * @throws IllegalStateException if the method is called by a transacted session.
446 */
447 public void recover() throws JMSException {
448 checkClosed();
449 if (getTransacted()) {
450 throw new IllegalStateException("This session is transacted");
451 }
452 redeliverUnacknowledgedMessages();
453 }
454
455 /***
456 * Returns the session's distinguished message listener (optional).
457 *
458 * @return the message listener associated with this session
459 * @throws JMSException if the JMS provider fails to get the message listener due to an internal error.
460 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
461 * @see javax.jms.ServerSessionPool
462 * @see javax.jms.ServerSession
463 */
464 public MessageListener getMessageListener() throws JMSException {
465 checkClosed();
466 return this.messageListener;
467 }
468
469 /***
470 * Sets the session's distinguished message listener (optional).
471 * <P>
472 * When the distinguished message listener is set, no other form of message receipt in the session can be used;
473 * however, all forms of sending messages are still supported.
474 * <P>
475 * This is an expert facility not used by regular JMS clients.
476 *
477 * @param listener the message listener to associate with this session
478 * @throws JMSException if the JMS provider fails to set the message listener due to an internal error.
479 * @see javax.jms.Session#getMessageListener()
480 * @see javax.jms.ServerSessionPool
481 * @see javax.jms.ServerSession
482 */
483 public void setMessageListener(MessageListener listener) throws JMSException {
484 checkClosed();
485 this.messageListener = listener;
486 if (listener != null) {
487 messageExecutor.setDoDispatch(false);
488 }
489 }
490
491 /***
492 * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
493 *
494 * @see javax.jms.ServerSession
495 */
496 public void run() {
497 MessageListener listener = this.messageListener;
498 boolean doRemove = this.acknowledgeMode != Session.CLIENT_ACKNOWLEDGE;
499 ActiveMQMessage message;
500 while ((message = messageExecutor.dequeueNoWait()) != null) {
501 if (listener != null) {
502 try {
503 listener.onMessage(message);
504 this.messageDelivered(true, message, true);
505 }
506 catch (Throwable t) {
507 log.info("Caught :" + t, t);
508 this.messageDelivered(true, message, false);
509 }
510 }
511 else {
512 this.messageDelivered(true, message, false);
513 }
514 }
515 }
516
517 /***
518 * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
519 * <P>
520 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue
521 * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
522 * destination parameter to create a <CODE>MessageProducer</CODE> object.
523 *
524 * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have a
525 * specified destination.
526 * @return the MessageProducer
527 * @throws JMSException if the session fails to create a MessageProducer due to some internal error.
528 * @throws InvalidDestinationException if an invalid destination is specified.
529 * @since 1.1
530 */
531 public MessageProducer createProducer(Destination destination) throws JMSException {
532 checkClosed();
533 return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
534 }
535
536 /***
537 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE>
538 * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
539 * create a <CODE>MessageConsumer</CODE>.
540 *
541 * @param destination the <CODE>Destination</CODE> to access.
542 * @return the MessageConsumer
543 * @throws JMSException if the session fails to create a consumer due to some internal error.
544 * @throws InvalidDestinationException if an invalid destination is specified.
545 * @since 1.1
546 */
547 public MessageConsumer createConsumer(Destination destination) throws JMSException {
548 checkClosed();
549 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
550 .getPrefetchPolicy().getQueuePrefetch();
551 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
552 "", this.connection.getNextConsumerNumber(), prefetch, false, false);
553 }
554
555 /***
556 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since <CODE>
557 * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
558 * destination parameter to create a <CODE>MessageConsumer</CODE>.
559 * <P>
560 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
561 *
562 * @param destination the <CODE>Destination</CODE> to access
563 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
564 * value of null or an empty string indicates that there is no message selector for the message consumer.
565 * @return the MessageConsumer
566 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
567 * @throws InvalidDestinationException if an invalid destination is specified.
568 * @throws InvalidSelectorException if the message selector is invalid.
569 * @since 1.1
570 */
571 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
572 checkClosed();
573 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
574 .getPrefetchPolicy().getQueuePrefetch();
575 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
576 messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
577 }
578
579 /***
580 * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
581 * specify whether messages published by its own connection should be delivered to it, if the destination is a
582 * topic.
583 * <P>
584 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be
585 * used in the destination parameter to create a <CODE>MessageConsumer</CODE>.
586 * <P>
587 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a
588 * destination.
589 * <P>
590 * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE>
591 * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default
592 * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are
593 * topics.
594 *
595 * @param destination the <CODE>Destination</CODE> to access
596 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
597 * value of null or an empty string indicates that there is no message selector for the message consumer.
598 * @param NoLocal - if true, and the destination is a topic, inhibits the delivery of messages published by its own
599 * connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
600 * @return the MessageConsumer
601 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
602 * @throws InvalidDestinationException if an invalid destination is specified.
603 * @throws InvalidSelectorException if the message selector is invalid.
604 * @since 1.1
605 */
606 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
607 throws JMSException {
608 checkClosed();
609 int prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
610 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
611 messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
612 }
613
614 /***
615 * Creates a queue identity given a <CODE>Queue</CODE> name.
616 * <P>
617 * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
618 * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are
619 * not portable.
620 * <P>
621 * Note that this method is not for creating the physical queue. The physical creation of queues is an
622 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
623 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method.
624 *
625 * @param queueName the name of this <CODE>Queue</CODE>
626 * @return a <CODE>Queue</CODE> with the given name
627 * @throws JMSException if the session fails to create a queue due to some internal error.
628 * @since 1.1
629 */
630 public Queue createQueue(String queueName) throws JMSException {
631 checkClosed();
632 return new ActiveMQQueue(queueName);
633 }
634
635 /***
636 * Creates a topic identity given a <CODE>Topic</CODE> name.
637 * <P>
638 * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
639 * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are
640 * not portable.
641 * <P>
642 * Note that this method is not for creating the physical topic. The physical creation of topics is an
643 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
644 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method.
645 *
646 * @param topicName the name of this <CODE>Topic</CODE>
647 * @return a <CODE>Topic</CODE> with the given name
648 * @throws JMSException if the session fails to create a topic due to some internal error.
649 * @since 1.1
650 */
651 public Topic createTopic(String topicName) throws JMSException {
652 checkClosed();
653 return new ActiveMQTopic(topicName);
654 }
655
656 /***
657 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
658 *
659 * @param queue the <CODE>queue</CODE> to access
660 * @exception InvalidDestinationException if an invalid destination is specified
661 * @since 1.1
662 */
663 /***
664 * Creates a durable subscriber to the specified topic.
665 * <P>
666 * If a client needs to receive all the messages published on a topic, including the ones published while the
667 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
668 * this durable subscription and insures that all messages from the topic's publishers are retained until they are
669 * acknowledged by this durable subscriber or they have expired.
670 * <P>
671 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
672 * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only one
673 * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
674 * <P>
675 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
676 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
677 * unsubscribing (deleting) the old one and creating a new one.
678 * <P>
679 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
680 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
681 * value for this attribute is false.
682 *
683 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
684 * @param name the name used to identify this subscription
685 * @return the TopicSubscriber
686 * @throws JMSException if the session fails to create a subscriber due to some internal error.
687 * @throws InvalidDestinationException if an invalid topic is specified.
688 * @since 1.1
689 */
690 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
691 checkClosed();
692 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "",
693 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
694 false, false);
695 }
696
697 /***
698 * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
699 * published by its own connection should be delivered to it.
700 * <P>
701 * If a client needs to receive all the messages published on a topic, including the ones published while the
702 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
703 * this durable subscription and insures that all messages from the topic's publishers are retained until they are
704 * acknowledged by this durable subscriber or they have expired.
705 * <P>
706 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
707 * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only
708 * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
709 * inactive durable subscriber is one that exists but does not currently have a message consumer associated with it.
710 * <P>
711 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
712 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
713 * unsubscribing (deleting) the old one and creating a new one.
714 *
715 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
716 * @param name the name used to identify this subscription
717 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
718 * value of null or an empty string indicates that there is no message selector for the message consumer.
719 * @param noLocal if set, inhibits the delivery of messages published by its own connection
 * @return the TopicSubscriber
721 * @throws JMSException if the session fails to create a subscriber due to some internal error.
722 * @throws InvalidDestinationException if an invalid topic is specified.
723 * @throws InvalidSelectorException if the message selector is invalid.
724 * @since 1.1
725 */
726 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
727 throws JMSException {
728 checkClosed();
729 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name,
730 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
731 .getDurableTopicPrefetch(), noLocal, false);
732 }
733
734 /***
735 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
736 *
737 * @param queue the <CODE>queue</CODE> to access
738 * @return the Queue Browser
739 * @throws JMSException if the session fails to create a browser due to some internal error.
740 * @throws InvalidDestinationException if an invalid destination is specified
741 * @since 1.1
742 */
743 public QueueBrowser createBrowser(Queue queue) throws JMSException {
744 checkClosed();
745 return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination(queue), "",
746 this.connection.getNextConsumerNumber());
747 }
748
749 /***
750 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
751 * selector.
752 *
753 * @param queue the <CODE>queue</CODE> to access
754 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
755 * value of null or an empty string indicates that there is no message selector for the message consumer.
756 * @return the Queue Browser
757 * @throws JMSException if the session fails to create a browser due to some internal error.
758 * @throws InvalidDestinationException if an invalid destination is specified
759 * @throws InvalidSelectorException if the message selector is invalid.
760 * @since 1.1
761 */
762 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
763 checkClosed();
764 return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination(queue),
765 messageSelector, this.connection.getNextConsumerNumber());
766 }
767
768 /***
769 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
770 * it is deleted earlier.
771 *
772 * @return a temporary queue identity
773 * @throws JMSException if the session fails to create a temporary queue due to some internal error.
774 * @since 1.1
775 */
776 public TemporaryQueue createTemporaryQueue() throws JMSException {
777 checkClosed();
778 String tempQueueName = "TemporaryQueue-"
779 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
780 tempQueueName += this.temporaryDestinationGenerator.generateId();
781 return new ActiveMQTemporaryQueue(tempQueueName);
782 }
783
784 /***
785 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
786 * it is deleted earlier.
787 *
788 * @return a temporary topic identity
789 * @throws JMSException if the session fails to create a temporary topic due to some internal error.
790 * @since 1.1
791 */
792 public TemporaryTopic createTemporaryTopic() throws JMSException {
793 checkClosed();
794 String tempTopicName = "TemporaryTopic-"
795 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
796 tempTopicName += this.temporaryDestinationGenerator.generateId();
797 return new ActiveMQTemporaryTopic(tempTopicName);
798 }
799
800 /***
801 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
802 *
803 * @param queue the <CODE>Queue</CODE> to access
 * @return the QueueReceiver
 * @throws JMSException if the session fails to create a receiver due to some internal error.
806 * @throws InvalidDestinationException if an invalid queue is specified.
807 */
808 public QueueReceiver createReceiver(Queue queue) throws JMSException {
809 checkClosed();
810 return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection
811 .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
812 }
813
814 /***
815 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message
816 * selector.
817 *
818 * @param queue the <CODE>Queue</CODE> to access
819 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
820 * value of null or an empty string indicates that there is no message selector for the message consumer.
821 * @return QueueReceiver
822 * @throws JMSException if the session fails to create a receiver due to some internal error.
823 * @throws InvalidDestinationException if an invalid queue is specified.
824 * @throws InvalidSelectorException if the message selector is invalid.
825 */
826 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
827 checkClosed();
828 return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue),
829 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
830 .getQueuePrefetch());
831 }
832
833 /***
834 * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
835 *
836 * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
837 * @return QueueSender
838 * @throws JMSException if the session fails to create a sender due to some internal error.
839 * @throws InvalidDestinationException if an invalid queue is specified.
840 */
841 public QueueSender createSender(Queue queue) throws JMSException {
842 checkClosed();
843 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
844 }
845
846 /***
847 * Creates a nondurable subscriber to the specified topic. <p/>
848 * <P>
849 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
850 * <p/>
851 * <P>
852 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
853 * while they are active. <p/>
854 * <P>
855 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
856 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
857 * value for this attribute is false.
858 *
859 * @param topic the <CODE>Topic</CODE> to subscribe to
860 * @return TopicSubscriber
861 * @throws JMSException if the session fails to create a subscriber due to some internal error.
862 * @throws InvalidDestinationException if an invalid topic is specified.
863 */
864 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
865 checkClosed();
866 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null,
867 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false,
868 false);
869 }
870
871 /***
872 * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
873 * published by its own connection should be delivered to it. <p/>
874 * <P>
875 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
876 * <p/>
877 * <P>
878 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
879 * while they are active. <p/>
880 * <P>
881 * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
882 * subscriber's perspective, they do not exist. <p/>
883 * <P>
884 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
885 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
886 * value for this attribute is false.
887 *
888 * @param topic the <CODE>Topic</CODE> to subscribe to
889 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
890 * value of null or an empty string indicates that there is no message selector for the message consumer.
891 * @param noLocal if set, inhibits the delivery of messages published by its own connection
892 * @return TopicSubscriber
893 * @throws JMSException if the session fails to create a subscriber due to some internal error.
894 * @throws InvalidDestinationException if an invalid topic is specified.
895 * @throws InvalidSelectorException if the message selector is invalid.
896 */
897 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
898 checkClosed();
899 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null,
900 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
901 .getTopicPrefetch(), noLocal, false);
902 }
903
904 /***
905 * Creates a publisher for the specified topic. <p/>
906 * <P>
907 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates a
908 * <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering
909 * relationship with the messages it has previously sent.
910 *
911 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
912 * @return TopicPublisher
913 * @throws JMSException if the session fails to create a publisher due to some internal error.
914 * @throws InvalidDestinationException if an invalid topic is specified.
915 */
916 public TopicPublisher createPublisher(Topic topic) throws JMSException {
917 checkClosed();
918 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
919 }
920
921 /***
922 * Unsubscribes a durable subscription that has been created by a client.
923 * <P>
924 * This method deletes the state being maintained on behalf of the subscriber by its provider.
925 * <P>
926 * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer
927 * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending
928 * transaction or has not been acknowledged in the session.
929 *
930 * @param name the name used to identify this subscription
931 * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error.
932 * @throws InvalidDestinationException if an invalid subscription name is specified.
933 * @since 1.1
934 */
935 public void unsubscribe(String name) throws JMSException {
936 checkClosed();
937 DurableUnsubscribe ds = new DurableUnsubscribe();
938 ds.setId(this.packetIdGenerator.generateId());
939 ds.setClientId(this.connection.getClientID());
940 ds.setSubscriberName(name);
941 this.connection.syncSendPacket(ds);
942 }
943
944 /***
945 * Tests to see if the Message Dispatcher is a target for this message
946 *
947 * @param message the message to test
948 * @return true if the Message Dispatcher can dispatch the message
949 */
950 public boolean isTarget(ActiveMQMessage message) {
951 for (Iterator i = this.consumers.iterator(); i.hasNext();) {
952 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
953 if (message.isConsumerTarget(consumer.getConsumerNumber())) {
954 return true;
955 }
956 }
957 return false;
958 }
959
960 /***
961 * Dispatch an ActiveMQMessage
962 *
963 * @param message
964 */
965 public void dispatch(ActiveMQMessage message) {
966 message.setMessageAcknowledge(this);
967 messageExecutor.execute(message);
968 }
969
970 /***
971 * Acknowledges all consumed messages of the session of this consumed message.
972 * <P>
973 * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that
974 * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on
975 * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered to.
976 * <P>
977 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use
978 * implicit acknowledgement modes.
979 * <P>
980 * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages as
981 * an application-defined group (which is done by calling acknowledge on the last received message of the group,
982 * thereby acknowledging all messages consumed by the session.)
983 * <P>
984 * Messages that have been received but not acknowledged may be redelivered.
985 *
986 * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error.
987 * @throws javax.jms.IllegalStateException
988 * if this method is called on a closed session.
989 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
990 */
991 public void acknowledge() throws JMSException {
992 doAcknowledge(false);
993 }
994
995 protected void doAcknowledge(boolean isClosing) throws JMSException {
996 checkClosed();
997 if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
998 ActiveMQMessage msg = null;
999 while ((msg = (ActiveMQMessage) deliveredMessages.removeFirst()) != null) {
1000 MessageAck ack = new MessageAck();
1001 ack.setConsumerId(msg.getConsumerId());
1002 ack.setMessageID(msg.getJMSMessageID());
1003 if (!isClosing){
1004 ack.setMessageRead(msg.isMessageConsumed());
1005 }
1006 ack.setId(packetIdGenerator.generateId());
1007 ack.setDestination(msg.getJMSActiveMQDestination());
1008 ack.setPersistent(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
1009 this.connection.asyncSendPacket(ack,false);
1010 }
1011 deliveredMessages.clear();
1012 }
1013 }
1014
1015 protected void messageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed) {
1016 if (message != null && !closed.get()) {
1017 if (isClientAcknowledge() || (isTransacted() && message.isTransientConsumed())) {
1018 message.setMessageConsumed(messageConsumed);
1019 deliveredMessages.add(message);
1020 }
1021 if (sendAcknowledge) {
1022 try {
1023 doStartTransaction();
1024 MessageAck ack = new MessageAck();
1025 ack.setConsumerId(message.getConsumerId());
1026 ack.setTransactionId(this.currentTransactionId);
1027 ack.setMessageID(message.getJMSMessageID());
1028 ack.setMessageRead(messageConsumed);
1029 ack.setId(packetIdGenerator.generateId());
1030 ack.setXaTransacted(isXaTransacted());
1031 ack.setDestination(message.getJMSActiveMQDestination());
1032 ack.setPersistent(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
1033 this.connection.asyncSendPacket(ack);
1034 }
1035 catch (JMSException e) {
1036 log.warn("failed to notify Broker that message is delivered", e);
1037 }
1038 }
1039 }
1040 }
1041
1042 /***
1043 * @param consumer
1044 * @throws JMSException
1045 */
1046 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1047
1048 if (consumer.isDurableSubscriber()) {
1049 stats.onCreateDurableSubscriber();
1050 }
1051 consumer.setConsumerId(consumerIdGenerator.generateId());
1052 ConsumerInfo info = createConsumerInfo(consumer);
1053 info.setStarted(true);
1054 this.connection.syncSendPacket(info);
1055 this.consumers.add(consumer);
1056 }
1057
1058 /***
1059 * @param consumer
1060 * @throws JMSException
1061 */
1062 protected void removeConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1063 this.consumers.remove(consumer);
1064
1065 if (consumer.isDurableSubscriber()) {
1066 stats.onRemoveDurableSubscriber();
1067 }
1068 if (!closed.get()) {
1069 ConsumerInfo info = createConsumerInfo(consumer);
1070 info.setStarted(false);
1071 this.connection.asyncSendPacket(info, false);
1072 }
1073 }
1074
1075 protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException {
1076 ConsumerInfo info = new ConsumerInfo();
1077 info.setConsumerId(consumer.consumerId);
1078 info.setClientId(connection.clientID);
1079 info.setSessionId(this.sessionId);
1080 info.setConsumerNo(consumer.consumerNumber);
1081 info.setPrefetchNumber(consumer.prefetchNumber);
1082 info.setDestination(consumer.destination);
1083 info.setId(this.packetIdGenerator.generateId());
1084 info.setNoLocal(consumer.noLocal);
1085 info.setBrowser(consumer.browser);
1086 info.setSelector(consumer.messageSelector);
1087 info.setStartTime(consumer.startTime);
1088 info.setConsumerName(consumer.consumerName);
1089 return info;
1090 }
1091
1092 /***
1093 * @param producer
1094 * @throws JMSException
1095 */
1096 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1097 producer.setProducerId(producerIdGenerator.generateId());
1098 ProducerInfo info = createProducerInfo(producer);
1099 info.setStarted(true);
1100 this.connection.syncSendPacket(info);
1101 this.producers.add(producer);
1102 }
1103
1104 /***
1105 * @param producer
1106 * @throws JMSException
1107 */
1108 protected void removeProducer(ActiveMQMessageProducer producer) throws JMSException {
1109 this.producers.remove(producer);
1110 if (!closed.get()) {
1111 ProducerInfo info = createProducerInfo(producer);
1112 info.setStarted(false);
1113 this.connection.asyncSendPacket(info, false);
1114 }
1115 }
1116
1117 protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException {
1118 ProducerInfo info = new ProducerInfo();
1119 info.setProducerId(producer.getProducerId());
1120 info.setClientId(connection.clientID);
1121 info.setSessionId(this.sessionId);
1122 info.setDestination(producer.defaultDestination);
1123 info.setId(this.packetIdGenerator.generateId());
1124 info.setStartTime(producer.getStartTime());
1125 return info;
1126 }
1127
1128 /***
1129 * Start this Session
1130 */
1131 protected void start() {
1132 messageExecutor.start();
1133 }
1134
1135 /***
1136 * Stop this Session
1137 */
1138 protected void stop() {
1139 messageExecutor.stop();
1140 }
1141
1142 /***
1143 * @return Returns the sessionId.
1144 */
1145 protected String getSessionId() {
1146 return sessionId;
1147 }
1148
1149 /***
1150 * @param sessionId The sessionId to set.
1151 */
1152 protected void setSessionId(String sessionId) {
1153 this.sessionId = sessionId;
1154 }
1155
1156 /***
1157 * @return Returns the startTime.
1158 */
1159 protected long getStartTime() {
1160 return startTime;
1161 }
1162
1163 /***
1164 * @param startTime The startTime to set.
1165 */
1166 protected void setStartTime(long startTime) {
1167 this.startTime = startTime;
1168 }
1169
1170 /***
1171 * send the message for dispatch by the broker
1172 *
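 * @param producer the producer on whose behalf the message is sent
 * @param destination the destination set on the message as its JMSDestination
 * @param message the message to send
 * @param deliveryMode the delivery mode set on the message
 * @param priority the priority set on the message
 * @param timeToLive the message lifetime in milliseconds; a value of zero or less means no expiration
 * @param reuseMessageId if true, a non-empty JMSMessageID already present on the message is kept
 * @throws JMSException if the session is closed or the message cannot be sent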
1180 */
1181 protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode,
1182 int priority, long timeToLive, boolean reuseMessageId) throws JMSException {
1183 checkClosed();
1184
1185 connection.sendConnectionInfoToBroker();
1186
1187 doStartTransaction();
1188 message.setJMSDestination(destination);
1189 message.setJMSDeliveryMode(deliveryMode);
1190 message.setJMSPriority(priority);
1191 long expiration = 0L;
1192 if (!producer.getDisableMessageTimestamp()) {
1193 long timeStamp = System.currentTimeMillis();
1194 message.setJMSTimestamp(timeStamp);
1195 if (timeToLive > 0) {
1196 expiration = timeToLive + timeStamp;
1197 }
1198 }
1199 message.setJMSExpiration(expiration);
1200 String id = message.getJMSMessageID();
1201 if ((id == null || id.length() == 0) || !producer.getDisableMessageID() && !reuseMessageId) {
1202 message.setJMSMessageID(producer.getIdGenerator().generateId());
1203 }
1204
1205 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message);
1206 msg.prepareMessageBody();
1207 msg.setProducerID(producer.getProducerId());
1208 msg.setTransactionId(currentTransactionId);
1209 msg.setXaTransacted(isXaTransacted());
1210 msg.setJMSClientID(this.connection.clientID);
1211 msg.setJMSRedelivered(false);
1212 if (log.isDebugEnabled()) {
1213 log.debug("Sending message: " + msg);
1214 }
1215
1216 if (this.connection.isUseAsyncSend() || this.acknowledgeMode == Session.SESSION_TRANSACTED
1217 || deliveryMode == DeliveryMode.NON_PERSISTENT) {
1218 this.connection.asyncSendPacket(msg);
1219 }
1220 else {
1221 this.connection.syncSendPacket(msg);
1222 }
1223 }
1224
1225 /***
1226 * Send TransactionInfo to indicate transaction has started
1227 *
1228 * @throws JMSException if some internal error occurs
1229 */
1230 protected void doStartTransaction() throws JMSException {
1231 if (getTransacted()) {
1232 if (startTransaction.commit(false, true)) {
1233 TransactionInfo info = new TransactionInfo();
1234 info.setId(this.packetIdGenerator.generateId());
1235 info.setTransactionId(currentTransactionId);
1236 info.setType(TransactionInfo.START);
1237 this.connection.asyncSendPacket(info);
1238
1239 if (localTransactionEventListener != null) {
1240 localTransactionEventListener.beginEvent();
1241 }
1242 }
1243 }
1244 }
1245
1246 /***
1247 * @return Returns the localTransactionEventListener.
1248 */
1249 public LocalTransactionEventListener getLocalTransactionEventListener() {
1250 return localTransactionEventListener;
1251 }
1252
1253 /***
1254 * Used by the resource adapter to listen to transaction events.
1255 *
1256 * @param localTransactionEventListener The localTransactionEventListener to set.
1257 */
1258 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
1259 this.localTransactionEventListener = localTransactionEventListener;
1260 }
1261
1262 protected boolean isXaTransacted() {
1263 return false;
1264 }
1265
1266
1267
1268 protected String getNextTransactionId() {
1269 return this.transactionIdGenerator.generateId();
1270 }
1271
1272 protected void setSessionConsumerDispatchState(int value) throws JMSException {
1273 if (consumerDispatchState != ActiveMQSession.CONSUMER_DISPATCH_UNSET && value != consumerDispatchState) {
1274 String errorStr = "Cannot mix consumer dispatching on a session - already: ";
1275 if (value == ActiveMQSession.CONSUMER_DISPATCH_SYNC) {
1276 errorStr += "synchronous";
1277 }
1278 else {
1279 errorStr += "asynchronous";
1280 }
1281 throw new IllegalStateException(errorStr);
1282 }
1283 consumerDispatchState = value;
1284 }
1285
1286 protected void redeliverUnacknowledgedMessages() {
1287 redeliverUnacknowledgedMessages(false);
1288 }
1289
1290 protected void redeliverUnacknowledgedMessages(boolean onlyDeliverTransientConsumed) {
1291 messageExecutor.stop();
1292 LinkedList replay = new LinkedList();
1293 Object obj = null;
1294 while ((obj = deliveredMessages.removeFirst()) != null) {
1295 replay.add(obj);
1296 }
1297 deliveredMessages.clear();
1298 if (!replay.isEmpty()) {
1299 for (ListIterator i = replay.listIterator(replay.size()); i.hasPrevious();) {
1300 ActiveMQMessage msg = (ActiveMQMessage) i.previous();
1301 if (!onlyDeliverTransientConsumed || msg.isTransientConsumed()) {
1302 msg.setJMSRedelivered(true);
1303 messageExecutor.executeFirst(msg);
1304 }
1305 }
1306 }
1307 replay.clear();
1308 messageExecutor.start();
1309 }
1310
1311 protected void clearMessagesInProgress() {
1312 messageExecutor.clearMessagesInProgress();
1313 for (Iterator i = consumers.iterator(); i.hasNext();) {
1314 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1315 consumer.clearMessagesInProgress();
1316 }
1317 }
1318
1319 protected boolean isTransacted() {
1320 return this.acknowledgeMode == Session.SESSION_TRANSACTED;
1321 }
1322
1323 protected boolean isClientAcknowledge() {
1324 return this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE;
1325 }
1326 }