001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.File; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import javax.jms.BytesMessage; 032import javax.jms.Destination; 033import javax.jms.IllegalStateException; 034import javax.jms.InvalidDestinationException; 035import javax.jms.InvalidSelectorException; 036import javax.jms.JMSException; 037import javax.jms.MapMessage; 038import javax.jms.Message; 039import javax.jms.MessageConsumer; 040import javax.jms.MessageListener; 041import javax.jms.MessageProducer; 042import javax.jms.ObjectMessage; 043import javax.jms.Queue; 044import javax.jms.QueueBrowser; 045import javax.jms.QueueReceiver; 046import javax.jms.QueueSender; 047import javax.jms.QueueSession; 048import javax.jms.Session; 049import javax.jms.StreamMessage; 050import javax.jms.TemporaryQueue; 051import javax.jms.TemporaryTopic; 052import javax.jms.TextMessage; 053import javax.jms.Topic; 054import javax.jms.TopicPublisher; 055import javax.jms.TopicSession; 056import javax.jms.TopicSubscriber; 057import javax.jms.TransactionRolledBackException; 058 059import org.apache.activemq.blob.BlobDownloader; 060import org.apache.activemq.blob.BlobTransferPolicy; 061import org.apache.activemq.blob.BlobUploader; 062import org.apache.activemq.command.ActiveMQBlobMessage; 063import org.apache.activemq.command.ActiveMQBytesMessage; 064import org.apache.activemq.command.ActiveMQDestination; 065import org.apache.activemq.command.ActiveMQMapMessage; 066import org.apache.activemq.command.ActiveMQMessage; 067import org.apache.activemq.command.ActiveMQObjectMessage; 068import org.apache.activemq.command.ActiveMQQueue; 069import org.apache.activemq.command.ActiveMQStreamMessage; 070import org.apache.activemq.command.ActiveMQTempDestination; 071import org.apache.activemq.command.ActiveMQTempQueue; 072import org.apache.activemq.command.ActiveMQTempTopic; 073import org.apache.activemq.command.ActiveMQTextMessage; 074import org.apache.activemq.command.ActiveMQTopic; 075import org.apache.activemq.command.Command; 076import org.apache.activemq.command.ConsumerId; 077import org.apache.activemq.command.MessageAck; 078import org.apache.activemq.command.MessageDispatch; 079import org.apache.activemq.command.MessageId; 080import org.apache.activemq.command.ProducerId; 081import org.apache.activemq.command.RemoveInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionId; 084import org.apache.activemq.command.SessionInfo; 085import org.apache.activemq.command.TransactionId; 086import org.apache.activemq.management.JMSSessionStatsImpl; 087import org.apache.activemq.management.StatsCapable; 088import org.apache.activemq.management.StatsImpl; 089import org.apache.activemq.thread.Scheduler; 090import org.apache.activemq.transaction.Synchronization; 091import org.apache.activemq.usage.MemoryUsage; 092import org.apache.activemq.util.Callback; 093import org.apache.activemq.util.LongSequenceGenerator; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * <P> 099 * A <CODE>Session</CODE> object is a single-threaded context for producing 100 * and consuming messages. Although it may allocate provider resources outside 101 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 102 * <P> 103 * A session serves several purposes: 104 * <UL> 105 * <LI>It is a factory for its message producers and consumers. 106 * <LI>It supplies provider-optimized message factories. 107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 108 * <CODE>TemporaryQueues</CODE>. 109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 110 * objects for those clients that need to dynamically manipulate 111 * provider-specific destination names. 112 * <LI>It supports a single series of transactions that combine work spanning 113 * its producers and consumers into atomic units. 114 * <LI>It defines a serial order for the messages it consumes and the messages 115 * it produces. 116 * <LI>It retains messages it consumes until they have been acknowledged. 117 * <LI>It serializes execution of message listeners registered with its message 118 * consumers. 119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 120 * </UL> 121 * <P> 122 * A session can create and service multiple message producers and consumers. 123 * <P> 124 * One typical use is to have a thread block on a synchronous 125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 127 * <P> 128 * If a client desires to have one thread produce messages while others consume 129 * them, the client should use a separate session for its producing thread. 130 * <P> 131 * Once a connection has been started, any session with one or more registered 132 * message listeners is dedicated to the thread of control that delivers 133 * messages to it. It is erroneous for client code to use this session or any of 134 * its constituent objects from another thread of control. The only exception to 135 * this rule is the use of the session or connection <CODE>close</CODE> 136 * method. 137 * <P> 138 * It should be easy for most clients to partition their work naturally into 139 * sessions. This model allows clients to start simply and incrementally add 140 * message processing complexity as their need for concurrency grows. 141 * <P> 142 * The <CODE>close</CODE> method is the only session method that can be called 143 * while some other session method is being executed in another thread. 144 * <P> 145 * A session may be specified as transacted. Each transacted session supports a 146 * single series of transactions. Each transaction groups a set of message sends 147 * and a set of message receives into an atomic unit of work. In effect, 148 * transactions organize a session's input message stream and output message 149 * stream into series of atomic units. When a transaction commits, its atomic 150 * unit of input is acknowledged and its associated atomic unit of output is 151 * sent. If a transaction rollback is done, the transaction's sent messages are 152 * destroyed and the session's input is automatically recovered. 153 * <P> 154 * The content of a transaction's input and output units is simply those 155 * messages that have been produced and consumed within the session's current 156 * transaction. 157 * <P> 158 * A transaction is completed using either its session's <CODE>commit</CODE> 159 * method or its session's <CODE>rollback </CODE> method. The completion of a 160 * session's current transaction automatically begins the next. The result is 161 * that a transacted session always has a current transaction within which its 162 * work is done. 163 * <P> 164 * The Java Transaction Service (JTS) or some other transaction monitor may be 165 * used to combine a session's transaction with transactions on other resources 166 * (databases, other JMS sessions, etc.). Since Java distributed transactions 167 * are controlled via the Java Transaction API (JTA), use of the session's 168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 169 * prohibited. 170 * <P> 171 * The JMS API does not require support for JTA; however, it does define how a 172 * provider supplies this support. 173 * <P> 174 * Although it is also possible for a JMS client to handle distributed 175 * transactions directly, it is unlikely that many JMS clients will do this. 176 * Support for JTA in the JMS API is targeted at systems vendors who will be 177 * integrating the JMS API into their application server products. 178 * 179 * 180 * @see javax.jms.Session 181 * @see javax.jms.QueueSession 182 * @see javax.jms.TopicSession 183 * @see javax.jms.XASession 184 */ 185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 186 187 /** 188 * Only acknowledge an individual message - using message.acknowledge() 189 * as opposed to CLIENT_ACKNOWLEDGE which 190 * acknowledges all messages consumed by a session at when acknowledge() 191 * is called 192 */ 193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 195 196 public static interface DeliveryListener { 197 void beforeDelivery(ActiveMQSession session, Message msg); 198 199 void afterDelivery(ActiveMQSession session, Message msg); 200 } 201 202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 203 private final ThreadPoolExecutor connectionExecutor; 204 205 protected int acknowledgementMode; 206 protected final ActiveMQConnection connection; 207 protected final SessionInfo info; 208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 211 protected final ActiveMQSessionExecutor executor; 212 protected final AtomicBoolean started = new AtomicBoolean(false); 213 214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 216 217 protected boolean closed; 218 private volatile boolean synchronizationRegistered; 219 protected boolean asyncDispatch; 220 protected boolean sessionAsyncDispatch; 221 protected final boolean debug; 222 protected Object sendMutex = new Object(); 223 private final AtomicBoolean clearInProgress = new AtomicBoolean(); 224 225 private MessageListener messageListener; 226 private final JMSSessionStatsImpl stats; 227 private TransactionContext transactionContext; 228 private DeliveryListener deliveryListener; 229 private MessageTransformer transformer; 230 private BlobTransferPolicy blobTransferPolicy; 231 private long lastDeliveredSequenceId; 232 233 /** 234 * Construct the Session 235 * 236 * @param connection 237 * @param sessionId 238 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 239 * Session.SESSION_TRANSACTED 240 * @param asyncDispatch 241 * @param sessionAsyncDispatch 242 * @throws JMSException on internal error 243 */ 244 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 245 this.debug = LOG.isDebugEnabled(); 246 this.connection = connection; 247 this.acknowledgementMode = acknowledgeMode; 248 this.asyncDispatch = asyncDispatch; 249 this.sessionAsyncDispatch = sessionAsyncDispatch; 250 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 251 setTransactionContext(new TransactionContext(connection)); 252 stats = new JMSSessionStatsImpl(producers, consumers); 253 this.connection.asyncSendPacket(info); 254 setTransformer(connection.getTransformer()); 255 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 256 this.connectionExecutor=connection.getExecutor(); 257 this.executor = new ActiveMQSessionExecutor(this); 258 connection.addSession(this); 259 if (connection.isStarted()) { 260 start(); 261 } 262 263 } 264 265 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 266 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 267 } 268 269 /** 270 * Sets the transaction context of the session. 271 * 272 * @param transactionContext - provides the means to control a JMS 273 * transaction. 274 */ 275 public void setTransactionContext(TransactionContext transactionContext) { 276 this.transactionContext = transactionContext; 277 } 278 279 /** 280 * Returns the transaction context of the session. 281 * 282 * @return transactionContext - session's transaction context. 283 */ 284 public TransactionContext getTransactionContext() { 285 return transactionContext; 286 } 287 288 /* 289 * (non-Javadoc) 290 * 291 * @see org.apache.activemq.management.StatsCapable#getStats() 292 */ 293 @Override 294 public StatsImpl getStats() { 295 return stats; 296 } 297 298 /** 299 * Returns the session's statistics. 300 * 301 * @return stats - session's statistics. 302 */ 303 public JMSSessionStatsImpl getSessionStats() { 304 return stats; 305 } 306 307 /** 308 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> 309 * object is used to send a message containing a stream of uninterpreted 310 * bytes. 311 * 312 * @return the an ActiveMQBytesMessage 313 * @throws JMSException if the JMS provider fails to create this message due 314 * to some internal error. 315 */ 316 @Override 317 public BytesMessage createBytesMessage() throws JMSException { 318 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 319 configureMessage(message); 320 return message; 321 } 322 323 /** 324 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 325 * object is used to send a self-defining set of name-value pairs, where 326 * names are <CODE>String</CODE> objects and values are primitive values 327 * in the Java programming language. 328 * 329 * @return an ActiveMQMapMessage 330 * @throws JMSException if the JMS provider fails to create this message due 331 * to some internal error. 332 */ 333 @Override 334 public MapMessage createMapMessage() throws JMSException { 335 ActiveMQMapMessage message = new ActiveMQMapMessage(); 336 configureMessage(message); 337 return message; 338 } 339 340 /** 341 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 342 * interface is the root interface of all JMS messages. A 343 * <CODE>Message</CODE> object holds all the standard message header 344 * information. It can be sent when a message containing only header 345 * information is sufficient. 346 * 347 * @return an ActiveMQMessage 348 * @throws JMSException if the JMS provider fails to create this message due 349 * to some internal error. 350 */ 351 @Override 352 public Message createMessage() throws JMSException { 353 ActiveMQMessage message = new ActiveMQMessage(); 354 configureMessage(message); 355 return message; 356 } 357 358 /** 359 * Creates an <CODE>ObjectMessage</CODE> object. An 360 * <CODE>ObjectMessage</CODE> object is used to send a message that 361 * contains a serializable Java object. 362 * 363 * @return an ActiveMQObjectMessage 364 * @throws JMSException if the JMS provider fails to create this message due 365 * to some internal error. 366 */ 367 @Override 368 public ObjectMessage createObjectMessage() throws JMSException { 369 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 370 configureMessage(message); 371 return message; 372 } 373 374 /** 375 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 376 * <CODE>ObjectMessage</CODE> object is used to send a message that 377 * contains a serializable Java object. 378 * 379 * @param object the object to use to initialize this message 380 * @return an ActiveMQObjectMessage 381 * @throws JMSException if the JMS provider fails to create this message due 382 * to some internal error. 383 */ 384 @Override 385 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 386 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 387 configureMessage(message); 388 message.setObject(object); 389 return message; 390 } 391 392 /** 393 * Creates a <CODE>StreamMessage</CODE> object. A 394 * <CODE>StreamMessage</CODE> object is used to send a self-defining 395 * stream of primitive values in the Java programming language. 396 * 397 * @return an ActiveMQStreamMessage 398 * @throws JMSException if the JMS provider fails to create this message due 399 * to some internal error. 400 */ 401 @Override 402 public StreamMessage createStreamMessage() throws JMSException { 403 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 404 configureMessage(message); 405 return message; 406 } 407 408 /** 409 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 410 * object is used to send a message containing a <CODE>String</CODE> 411 * object. 412 * 413 * @return an ActiveMQTextMessage 414 * @throws JMSException if the JMS provider fails to create this message due 415 * to some internal error. 416 */ 417 @Override 418 public TextMessage createTextMessage() throws JMSException { 419 ActiveMQTextMessage message = new ActiveMQTextMessage(); 420 configureMessage(message); 421 return message; 422 } 423 424 /** 425 * Creates an initialized <CODE>TextMessage</CODE> object. A 426 * <CODE>TextMessage</CODE> object is used to send a message containing a 427 * <CODE>String</CODE>. 428 * 429 * @param text the string used to initialize this message 430 * @return an ActiveMQTextMessage 431 * @throws JMSException if the JMS provider fails to create this message due 432 * to some internal error. 433 */ 434 @Override 435 public TextMessage createTextMessage(String text) throws JMSException { 436 ActiveMQTextMessage message = new ActiveMQTextMessage(); 437 message.setText(text); 438 configureMessage(message); 439 return message; 440 } 441 442 /** 443 * Creates an initialized <CODE>BlobMessage</CODE> object. A 444 * <CODE>BlobMessage</CODE> object is used to send a message containing a 445 * <CODE>URL</CODE> which points to some network addressible BLOB. 446 * 447 * @param url the network addressable URL used to pass directly to the 448 * consumer 449 * @return a BlobMessage 450 * @throws JMSException if the JMS provider fails to create this message due 451 * to some internal error. 452 */ 453 public BlobMessage createBlobMessage(URL url) throws JMSException { 454 return createBlobMessage(url, false); 455 } 456 457 /** 458 * Creates an initialized <CODE>BlobMessage</CODE> object. A 459 * <CODE>BlobMessage</CODE> object is used to send a message containing a 460 * <CODE>URL</CODE> which points to some network addressible BLOB. 461 * 462 * @param url the network addressable URL used to pass directly to the 463 * consumer 464 * @param deletedByBroker indicates whether or not the resource is deleted 465 * by the broker when the message is acknowledged 466 * @return a BlobMessage 467 * @throws JMSException if the JMS provider fails to create this message due 468 * to some internal error. 469 */ 470 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 471 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 472 configureMessage(message); 473 message.setURL(url); 474 message.setDeletedByBroker(deletedByBroker); 475 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 476 return message; 477 } 478 479 /** 480 * Creates an initialized <CODE>BlobMessage</CODE> object. A 481 * <CODE>BlobMessage</CODE> object is used to send a message containing 482 * the <CODE>File</CODE> content. Before the message is sent the file 483 * conent will be uploaded to the broker or some other remote repository 484 * depending on the {@link #getBlobTransferPolicy()}. 485 * 486 * @param file the file to be uploaded to some remote repo (or the broker) 487 * depending on the strategy 488 * @return a BlobMessage 489 * @throws JMSException if the JMS provider fails to create this message due 490 * to some internal error. 491 */ 492 public BlobMessage createBlobMessage(File file) throws JMSException { 493 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 494 configureMessage(message); 495 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 496 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 497 message.setDeletedByBroker(true); 498 message.setName(file.getName()); 499 return message; 500 } 501 502 /** 503 * Creates an initialized <CODE>BlobMessage</CODE> object. A 504 * <CODE>BlobMessage</CODE> object is used to send a message containing 505 * the <CODE>File</CODE> content. Before the message is sent the file 506 * conent will be uploaded to the broker or some other remote repository 507 * depending on the {@link #getBlobTransferPolicy()}. 508 * 509 * @param in the stream to be uploaded to some remote repo (or the broker) 510 * depending on the strategy 511 * @return a BlobMessage 512 * @throws JMSException if the JMS provider fails to create this message due 513 * to some internal error. 514 */ 515 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 516 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 517 configureMessage(message); 518 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 519 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 520 message.setDeletedByBroker(true); 521 return message; 522 } 523 524 /** 525 * Indicates whether the session is in transacted mode. 526 * 527 * @return true if the session is in transacted mode 528 * @throws JMSException if there is some internal error. 529 */ 530 @Override 531 public boolean getTransacted() throws JMSException { 532 checkClosed(); 533 return isTransacted(); 534 } 535 536 /** 537 * Returns the acknowledgement mode of the session. The acknowledgement mode 538 * is set at the time that the session is created. If the session is 539 * transacted, the acknowledgement mode is ignored. 540 * 541 * @return If the session is not transacted, returns the current 542 * acknowledgement mode for the session. If the session is 543 * transacted, returns SESSION_TRANSACTED. 544 * @throws JMSException 545 * @see javax.jms.Connection#createSession(boolean,int) 546 * @since 1.1 exception JMSException if there is some internal error. 547 */ 548 @Override 549 public int getAcknowledgeMode() throws JMSException { 550 checkClosed(); 551 return this.acknowledgementMode; 552 } 553 554 /** 555 * Commits all messages done in this transaction and releases any locks 556 * currently held. 557 * 558 * @throws JMSException if the JMS provider fails to commit the transaction 559 * due to some internal error. 560 * @throws TransactionRolledBackException if the transaction is rolled back 561 * due to some internal error during commit. 562 * @throws javax.jms.IllegalStateException if the method is not called by a 563 * transacted session. 564 */ 565 @Override 566 public void commit() throws JMSException { 567 checkClosed(); 568 if (!getTransacted()) { 569 throw new javax.jms.IllegalStateException("Not a transacted session"); 570 } 571 if (LOG.isDebugEnabled()) { 572 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 573 } 574 transactionContext.commit(); 575 } 576 577 /** 578 * Rolls back any messages done in this transaction and releases any locks 579 * currently held. 580 * 581 * @throws JMSException if the JMS provider fails to roll back the 582 * transaction due to some internal error. 583 * @throws javax.jms.IllegalStateException if the method is not called by a 584 * transacted session. 585 */ 586 @Override 587 public void rollback() throws JMSException { 588 checkClosed(); 589 if (!getTransacted()) { 590 throw new javax.jms.IllegalStateException("Not a transacted session"); 591 } 592 if (LOG.isDebugEnabled()) { 593 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); 594 } 595 transactionContext.rollback(); 596 } 597 598 /** 599 * Closes the session. 600 * <P> 601 * Since a provider may allocate some resources on behalf of a session 602 * outside the JVM, clients should close the resources when they are not 603 * needed. Relying on garbage collection to eventually reclaim these 604 * resources may not be timely enough. 605 * <P> 606 * There is no need to close the producers and consumers of a closed 607 * session. 608 * <P> 609 * This call will block until a <CODE>receive</CODE> call or message 610 * listener in progress has completed. A blocked message consumer 611 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 612 * is closed. 613 * <P> 614 * Closing a transacted session must roll back the transaction in progress. 615 * <P> 616 * This method is the only <CODE>Session</CODE> method that can be called 617 * concurrently. 618 * <P> 619 * Invoking any other <CODE>Session</CODE> method on a closed session must 620 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 621 * closed session must <I>not </I> throw an exception. 622 * 623 * @throws JMSException if the JMS provider fails to close the session due 624 * to some internal error. 625 */ 626 @Override 627 public void close() throws JMSException { 628 if (!closed) { 629 if (getTransactionContext().isInXATransaction()) { 630 if (!synchronizationRegistered) { 631 synchronizationRegistered = true; 632 getTransactionContext().addSynchronization(new Synchronization() { 633 634 @Override 635 public void afterCommit() throws Exception { 636 doClose(); 637 synchronizationRegistered = false; 638 } 639 640 @Override 641 public void afterRollback() throws Exception { 642 doClose(); 643 synchronizationRegistered = false; 644 } 645 }); 646 } 647 648 } else { 649 doClose(); 650 } 651 } 652 } 653 654 private void doClose() throws JMSException { 655 boolean interrupted = Thread.interrupted(); 656 dispose(); 657 RemoveInfo removeCommand = info.createRemoveCommand(); 658 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 659 connection.asyncSendPacket(removeCommand); 660 if (interrupted) { 661 Thread.currentThread().interrupt(); 662 } 663 } 664 665 final AtomicInteger clearRequestsCounter = new AtomicInteger(0); 666 void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { 667 clearRequestsCounter.incrementAndGet(); 668 executor.clearMessagesInProgress(); 669 // we are called from inside the transport reconnection logic which involves us 670 // clearing all the connections' consumers dispatch and delivered lists. So rather 671 // than trying to grab a mutex (which could be already owned by the message listener 672 // calling the send or an ack) we allow it to complete in a separate thread via the 673 // scheduler and notify us via connection.transportInterruptionProcessingComplete() 674 // 675 // We must be careful though not to allow multiple calls to this method from a 676 // connection that is having issue becoming fully established from causing a large 677 // build up of scheduled tasks to clear the same consumers over and over. 678 if (consumers.isEmpty()) { 679 return; 680 } 681 682 if (clearInProgress.compareAndSet(false, true)) { 683 for (final ActiveMQMessageConsumer consumer : consumers) { 684 consumer.inProgressClearRequired(); 685 transportInterruptionProcessingComplete.incrementAndGet(); 686 try { 687 connection.getScheduler().executeAfterDelay(new Runnable() { 688 @Override 689 public void run() { 690 consumer.clearMessagesInProgress(); 691 }}, 0l); 692 } catch (JMSException e) { 693 connection.onClientInternalException(e); 694 } 695 } 696 697 try { 698 connection.getScheduler().executeAfterDelay(new Runnable() { 699 @Override 700 public void run() { 701 clearInProgress.set(false); 702 }}, 0l); 703 } catch (JMSException e) { 704 connection.onClientInternalException(e); 705 } 706 } 707 } 708 709 void deliverAcks() { 710 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 711 ActiveMQMessageConsumer consumer = iter.next(); 712 consumer.deliverAcks(); 713 } 714 } 715 716 public synchronized void dispose() throws JMSException { 717 if (!closed) { 718 719 try { 720 executor.stop(); 721 722 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 723 ActiveMQMessageConsumer consumer = iter.next(); 724 consumer.setFailureError(connection.getFirstFailureError()); 725 consumer.dispose(); 726 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 727 } 728 consumers.clear(); 729 730 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 731 ActiveMQMessageProducer producer = iter.next(); 732 producer.dispose(); 733 } 734 producers.clear(); 735 736 try { 737 if (getTransactionContext().isInLocalTransaction()) { 738 rollback(); 739 } 740 } catch (JMSException e) { 741 } 742 743 } finally { 744 connection.removeSession(this); 745 this.transactionContext = null; 746 closed = true; 747 } 748 } 749 } 750 751 /** 752 * Checks that the session is not closed then configures the message 753 */ 754 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 755 checkClosed(); 756 message.setConnection(connection); 757 } 758 759 /** 760 * Check if the session is closed. It is used for ensuring that the session 761 * is open before performing various operations. 762 * 763 * @throws IllegalStateException if the Session is closed 764 */ 765 protected void checkClosed() throws IllegalStateException { 766 if (closed) { 767 throw new IllegalStateException("The Session is closed"); 768 } 769 } 770 771 /** 772 * Checks if the session is closed. 773 * 774 * @return true if the session is closed, false otherwise. 775 */ 776 public boolean isClosed() { 777 return closed; 778 } 779 780 /** 781 * Stops message delivery in this session, and restarts message delivery 782 * with the oldest unacknowledged message. 783 * <P> 784 * All consumers deliver messages in a serial order. Acknowledging a 785 * received message automatically acknowledges all messages that have been 786 * delivered to the client. 787 * <P> 788 * Restarting a session causes it to take the following actions: 789 * <UL> 790 * <LI>Stop message delivery 791 * <LI>Mark all messages that might have been delivered but not 792 * acknowledged as "redelivered" 793 * <LI>Restart the delivery sequence including all unacknowledged messages 794 * that had been previously delivered. Redelivered messages do not have to 795 * be delivered in exactly their original delivery order. 796 * </UL> 797 * 798 * @throws JMSException if the JMS provider fails to stop and restart 799 * message delivery due to some internal error. 800 * @throws IllegalStateException if the method is called by a transacted 801 * session. 802 */ 803 @Override 804 public void recover() throws JMSException { 805 806 checkClosed(); 807 if (getTransacted()) { 808 throw new IllegalStateException("This session is transacted"); 809 } 810 811 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 812 ActiveMQMessageConsumer c = iter.next(); 813 c.rollback(); 814 } 815 816 } 817 818 /** 819 * Returns the session's distinguished message listener (optional). 820 * 821 * @return the message listener associated with this session 822 * @throws JMSException if the JMS provider fails to get the message 823 * listener due to an internal error. 824 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 825 * @see javax.jms.ServerSessionPool 826 * @see javax.jms.ServerSession 827 */ 828 @Override 829 public MessageListener getMessageListener() throws JMSException { 830 checkClosed(); 831 return this.messageListener; 832 } 833 834 /** 835 * Sets the session's distinguished message listener (optional). 836 * <P> 837 * When the distinguished message listener is set, no other form of message 838 * receipt in the session can be used; however, all forms of sending 839 * messages are still supported. 840 * <P> 841 * If this session has been closed, then an {@link IllegalStateException} is 842 * thrown, if trying to set a new listener. However setting the listener 843 * to <tt>null</tt> is allowed, to clear the listener, even if this session 844 * has been closed prior. 845 * <P> 846 * This is an expert facility not used by regular JMS clients. 847 * 848 * @param listener the message listener to associate with this session 849 * @throws JMSException if the JMS provider fails to set the message 850 * listener due to an internal error. 851 * @see javax.jms.Session#getMessageListener() 852 * @see javax.jms.ServerSessionPool 853 * @see javax.jms.ServerSession 854 */ 855 @Override 856 public void setMessageListener(MessageListener listener) throws JMSException { 857 // only check for closed if we set a new listener, as we allow to clear 858 // the listener, such as when an application is shutting down, and is 859 // no longer using a message listener on this session 860 if (listener != null) { 861 checkClosed(); 862 } 863 this.messageListener = listener; 864 865 if (listener != null) { 866 executor.setDispatchedBySessionPool(true); 867 } 868 } 869 870 /** 871 * Optional operation, intended to be used only by Application Servers, not 872 * by ordinary JMS clients. 873 * 874 * @see javax.jms.ServerSession 875 */ 876 @Override 877 public void run() { 878 MessageDispatch messageDispatch; 879 while ((messageDispatch = executor.dequeueNoWait()) != null) { 880 final MessageDispatch md = messageDispatch; 881 ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 882 883 MessageAck earlyAck = null; 884 if (message.isExpired()) { 885 earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1); 886 } else if (connection.isDuplicate(ActiveMQSession.this, message)) { 887 LOG.debug("{} got duplicate: {}", this, message.getMessageId()); 888 earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 889 earlyAck.setFirstMessageId(md.getMessage().getMessageId()); 890 earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); 891 } 892 if (earlyAck != null) { 893 try { 894 asyncSendPacket(earlyAck); 895 } catch (Throwable t) { 896 LOG.error("error dispatching ack: {} ", earlyAck, t); 897 connection.onClientInternalException(t); 898 } finally { 899 continue; 900 } 901 } 902 903 if (isClientAcknowledge()||isIndividualAcknowledge()) { 904 message.setAcknowledgeCallback(new Callback() { 905 @Override 906 public void execute() throws Exception { 907 } 908 }); 909 } 910 911 if (deliveryListener != null) { 912 deliveryListener.beforeDelivery(this, message); 913 } 914 915 md.setDeliverySequenceId(getNextDeliveryId()); 916 917 final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 918 try { 919 ack.setFirstMessageId(md.getMessage().getMessageId()); 920 doStartTransaction(); 921 ack.setTransactionId(getTransactionContext().getTransactionId()); 922 if (ack.getTransactionId() != null) { 923 getTransactionContext().addSynchronization(new Synchronization() { 924 925 final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); 926 @Override 927 public void beforeEnd() throws Exception { 928 // validate our consumer so we don't push stale acks that get ignored 929 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 930 LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection); 931 throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection); 932 } 933 LOG.trace("beforeEnd ack {}", ack); 934 sendAck(ack); 935 } 936 937 @Override 938 public void afterRollback() throws Exception { 939 LOG.trace("rollback {}", ack, new Throwable("here")); 940 md.getMessage().onMessageRolledBack(); 941 // ensure we don't filter this as a duplicate 942 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); 943 944 // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect 945 if (clearRequestsCounter.get() > clearRequestCount) { 946 LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport()); 947 return; 948 } 949 950 // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched 951 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 952 LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport()); 953 return; 954 } 955 956 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); 957 int redeliveryCounter = md.getMessage().getRedeliveryCounter(); 958 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 959 && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) { 960 // We need to NACK the messages so that they get 961 // sent to the 962 // DLQ. 963 // Acknowledge the last message. 964 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 965 ack.setFirstMessageId(md.getMessage().getMessageId()); 966 ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); 967 asyncSendPacket(ack); 968 969 } else { 970 971 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); 972 ack.setFirstMessageId(md.getMessage().getMessageId()); 973 asyncSendPacket(ack); 974 975 // Figure out how long we should wait to resend 976 // this message. 977 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 978 for (int i = 0; i < redeliveryCounter; i++) { 979 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 980 } 981 connection.getScheduler().executeAfterDelay(new Runnable() { 982 983 @Override 984 public void run() { 985 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); 986 } 987 }, redeliveryDelay); 988 } 989 } 990 }); 991 } 992 993 LOG.trace("{} onMessage({})", this, message.getMessageId()); 994 messageListener.onMessage(message); 995 996 } catch (Throwable e) { 997 LOG.error("error dispatching message: ", e); 998 // A problem while invoking the MessageListener does not 999 // in general indicate a problem with the connection to the broker, i.e. 1000 // it will usually be sufficient to let the afterDelivery() method either 1001 // commit or roll back in order to deal with the exception. 1002 // However, we notify any registered client internal exception listener 1003 // of the problem. 1004 connection.onClientInternalException(e); 1005 } finally { 1006 if (ack.getTransactionId() == null) { 1007 try { 1008 asyncSendPacket(ack); 1009 } catch (Throwable e) { 1010 connection.onClientInternalException(e); 1011 } 1012 } 1013 } 1014 1015 if (deliveryListener != null) { 1016 deliveryListener.afterDelivery(this, message); 1017 } 1018 } 1019 } 1020 1021 /** 1022 * Creates a <CODE>MessageProducer</CODE> to send messages to the 1023 * specified destination. 1024 * <P> 1025 * A client uses a <CODE>MessageProducer</CODE> object to send messages to 1026 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both 1027 * inherit from <CODE>Destination</CODE>, they can be used in the 1028 * destination parameter to create a <CODE>MessageProducer</CODE> object. 1029 * 1030 * @param destination the <CODE>Destination</CODE> to send to, or null if 1031 * this is a producer which does not have a specified 1032 * destination. 1033 * @return the MessageProducer 1034 * @throws JMSException if the session fails to create a MessageProducer due 1035 * to some internal error. 1036 * @throws InvalidDestinationException if an invalid destination is 1037 * specified. 1038 * @since 1.1 1039 */ 1040 @Override 1041 public MessageProducer createProducer(Destination destination) throws JMSException { 1042 checkClosed(); 1043 if (destination instanceof CustomDestination) { 1044 CustomDestination customDestination = (CustomDestination)destination; 1045 return customDestination.createProducer(this); 1046 } 1047 int timeSendOut = connection.getSendTimeout(); 1048 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); 1049 } 1050 1051 /** 1052 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1053 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1054 * <CODE>Destination</CODE>, they can be used in the destination 1055 * parameter to create a <CODE>MessageConsumer</CODE>. 1056 * 1057 * @param destination the <CODE>Destination</CODE> to access. 1058 * @return the MessageConsumer 1059 * @throws JMSException if the session fails to create a consumer due to 1060 * some internal error. 1061 * @throws InvalidDestinationException if an invalid destination is 1062 * specified. 1063 * @since 1.1 1064 */ 1065 @Override 1066 public MessageConsumer createConsumer(Destination destination) throws JMSException { 1067 return createConsumer(destination, (String) null); 1068 } 1069 1070 /** 1071 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1072 * using a message selector. Since <CODE> Queue</CODE> and 1073 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1074 * can be used in the destination parameter to create a 1075 * <CODE>MessageConsumer</CODE>. 1076 * <P> 1077 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1078 * that have been sent to a destination. 1079 * 1080 * @param destination the <CODE>Destination</CODE> to access 1081 * @param messageSelector only messages with properties matching the message 1082 * selector expression are delivered. A value of null or an 1083 * empty string indicates that there is no message selector 1084 * for the message consumer. 1085 * @return the MessageConsumer 1086 * @throws JMSException if the session fails to create a MessageConsumer due 1087 * to some internal error. 1088 * @throws InvalidDestinationException if an invalid destination is 1089 * specified. 1090 * @throws InvalidSelectorException if the message selector is invalid. 1091 * @since 1.1 1092 */ 1093 @Override 1094 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 1095 return createConsumer(destination, messageSelector, false); 1096 } 1097 1098 /** 1099 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1100 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1101 * <CODE>Destination</CODE>, they can be used in the destination 1102 * parameter to create a <CODE>MessageConsumer</CODE>. 1103 * 1104 * @param destination the <CODE>Destination</CODE> to access. 1105 * @param messageListener the listener to use for async consumption of messages 1106 * @return the MessageConsumer 1107 * @throws JMSException if the session fails to create a consumer due to 1108 * some internal error. 1109 * @throws InvalidDestinationException if an invalid destination is 1110 * specified. 1111 * @since 1.1 1112 */ 1113 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 1114 return createConsumer(destination, null, messageListener); 1115 } 1116 1117 /** 1118 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1119 * using a message selector. Since <CODE> Queue</CODE> and 1120 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1121 * can be used in the destination parameter to create a 1122 * <CODE>MessageConsumer</CODE>. 1123 * <P> 1124 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1125 * that have been sent to a destination. 1126 * 1127 * @param destination the <CODE>Destination</CODE> to access 1128 * @param messageSelector only messages with properties matching the message 1129 * selector expression are delivered. A value of null or an 1130 * empty string indicates that there is no message selector 1131 * for the message consumer. 1132 * @param messageListener the listener to use for async consumption of messages 1133 * @return the MessageConsumer 1134 * @throws JMSException if the session fails to create a MessageConsumer due 1135 * to some internal error. 1136 * @throws InvalidDestinationException if an invalid destination is 1137 * specified. 1138 * @throws InvalidSelectorException if the message selector is invalid. 1139 * @since 1.1 1140 */ 1141 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1142 return createConsumer(destination, messageSelector, false, messageListener); 1143 } 1144 1145 /** 1146 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1147 * using a message selector. This method can specify whether messages 1148 * published by its own connection should be delivered to it, if the 1149 * destination is a topic. 1150 * <P> 1151 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1152 * <CODE>Destination</CODE>, they can be used in the destination 1153 * parameter to create a <CODE>MessageConsumer</CODE>. 1154 * <P> 1155 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1156 * that have been published to a destination. 1157 * <P> 1158 * In some cases, a connection may both publish and subscribe to a topic. 1159 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1160 * inhibit the delivery of messages published by its own connection. The 1161 * default value for this attribute is False. The <CODE>noLocal</CODE> 1162 * value must be supported by destinations that are topics. 1163 * 1164 * @param destination the <CODE>Destination</CODE> to access 1165 * @param messageSelector only messages with properties matching the message 1166 * selector expression are delivered. A value of null or an 1167 * empty string indicates that there is no message selector 1168 * for the message consumer. 1169 * @param noLocal - if true, and the destination is a topic, inhibits the 1170 * delivery of messages published by its own connection. The 1171 * behavior for <CODE>NoLocal</CODE> is not specified if 1172 * the destination is a queue. 1173 * @return the MessageConsumer 1174 * @throws JMSException if the session fails to create a MessageConsumer due 1175 * to some internal error. 1176 * @throws InvalidDestinationException if an invalid destination is 1177 * specified. 1178 * @throws InvalidSelectorException if the message selector is invalid. 1179 * @since 1.1 1180 */ 1181 @Override 1182 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1183 return createConsumer(destination, messageSelector, noLocal, null); 1184 } 1185 1186 /** 1187 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1188 * using a message selector. This method can specify whether messages 1189 * published by its own connection should be delivered to it, if the 1190 * destination is a topic. 1191 * <P> 1192 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1193 * <CODE>Destination</CODE>, they can be used in the destination 1194 * parameter to create a <CODE>MessageConsumer</CODE>. 1195 * <P> 1196 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1197 * that have been published to a destination. 1198 * <P> 1199 * In some cases, a connection may both publish and subscribe to a topic. 1200 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1201 * inhibit the delivery of messages published by its own connection. The 1202 * default value for this attribute is False. The <CODE>noLocal</CODE> 1203 * value must be supported by destinations that are topics. 1204 * 1205 * @param destination the <CODE>Destination</CODE> to access 1206 * @param messageSelector only messages with properties matching the message 1207 * selector expression are delivered. A value of null or an 1208 * empty string indicates that there is no message selector 1209 * for the message consumer. 1210 * @param noLocal - if true, and the destination is a topic, inhibits the 1211 * delivery of messages published by its own connection. The 1212 * behavior for <CODE>NoLocal</CODE> is not specified if 1213 * the destination is a queue. 1214 * @param messageListener the listener to use for async consumption of messages 1215 * @return the MessageConsumer 1216 * @throws JMSException if the session fails to create a MessageConsumer due 1217 * to some internal error. 1218 * @throws InvalidDestinationException if an invalid destination is 1219 * specified. 1220 * @throws InvalidSelectorException if the message selector is invalid. 1221 * @since 1.1 1222 */ 1223 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1224 checkClosed(); 1225 1226 if (destination instanceof CustomDestination) { 1227 CustomDestination customDestination = (CustomDestination)destination; 1228 return customDestination.createConsumer(this, messageSelector, noLocal); 1229 } 1230 1231 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1232 int prefetch = 0; 1233 if (destination instanceof Topic) { 1234 prefetch = prefetchPolicy.getTopicPrefetch(); 1235 } else { 1236 prefetch = prefetchPolicy.getQueuePrefetch(); 1237 } 1238 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1239 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1240 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1241 } 1242 1243 /** 1244 * Creates a queue identity given a <CODE>Queue</CODE> name. 1245 * <P> 1246 * This facility is provided for the rare cases where clients need to 1247 * dynamically manipulate queue identity. It allows the creation of a queue 1248 * identity with a provider-specific name. Clients that depend on this 1249 * ability are not portable. 1250 * <P> 1251 * Note that this method is not for creating the physical queue. The 1252 * physical creation of queues is an administrative task and is not to be 1253 * initiated by the JMS API. The one exception is the creation of temporary 1254 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1255 * method. 1256 * 1257 * @param queueName the name of this <CODE>Queue</CODE> 1258 * @return a <CODE>Queue</CODE> with the given name 1259 * @throws JMSException if the session fails to create a queue due to some 1260 * internal error. 1261 * @since 1.1 1262 */ 1263 @Override 1264 public Queue createQueue(String queueName) throws JMSException { 1265 checkClosed(); 1266 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1267 return new ActiveMQTempQueue(queueName); 1268 } 1269 return new ActiveMQQueue(queueName); 1270 } 1271 1272 /** 1273 * Creates a topic identity given a <CODE>Topic</CODE> name. 1274 * <P> 1275 * This facility is provided for the rare cases where clients need to 1276 * dynamically manipulate topic identity. This allows the creation of a 1277 * topic identity with a provider-specific name. Clients that depend on this 1278 * ability are not portable. 1279 * <P> 1280 * Note that this method is not for creating the physical topic. The 1281 * physical creation of topics is an administrative task and is not to be 1282 * initiated by the JMS API. The one exception is the creation of temporary 1283 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1284 * method. 1285 * 1286 * @param topicName the name of this <CODE>Topic</CODE> 1287 * @return a <CODE>Topic</CODE> with the given name 1288 * @throws JMSException if the session fails to create a topic due to some 1289 * internal error. 1290 * @since 1.1 1291 */ 1292 @Override 1293 public Topic createTopic(String topicName) throws JMSException { 1294 checkClosed(); 1295 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1296 return new ActiveMQTempTopic(topicName); 1297 } 1298 return new ActiveMQTopic(topicName); 1299 } 1300 1301 /** 1302 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1303 * the specified queue. 1304 * 1305 * @param queue the <CODE>queue</CODE> to access 1306 * @exception InvalidDestinationException if an invalid destination is 1307 * specified 1308 * @since 1.1 1309 */ 1310 /** 1311 * Creates a durable subscriber to the specified topic. 1312 * <P> 1313 * If a client needs to receive all the messages published on a topic, 1314 * including the ones published while the subscriber is inactive, it uses a 1315 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1316 * record of this durable subscription and insures that all messages from 1317 * the topic's publishers are retained until they are acknowledged by this 1318 * durable subscriber or they have expired. 1319 * <P> 1320 * Sessions with durable subscribers must always provide the same client 1321 * identifier. In addition, each client must specify a name that uniquely 1322 * identifies (within client identifier) each durable subscription it 1323 * creates. Only one session at a time can have a 1324 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1325 * <P> 1326 * A client can change an existing durable subscription by creating a 1327 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1328 * and/or message selector. Changing a durable subscriber is equivalent to 1329 * unsubscribing (deleting) the old one and creating a new one. 1330 * <P> 1331 * In some cases, a connection may both publish and subscribe to a topic. 1332 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1333 * inhibit the delivery of messages published by its own connection. The 1334 * default value for this attribute is false. 1335 * 1336 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1337 * @param name the name used to identify this subscription 1338 * @return the TopicSubscriber 1339 * @throws JMSException if the session fails to create a subscriber due to 1340 * some internal error. 1341 * @throws InvalidDestinationException if an invalid topic is specified. 1342 * @since 1.1 1343 */ 1344 @Override 1345 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1346 checkClosed(); 1347 return createDurableSubscriber(topic, name, null, false); 1348 } 1349 1350 /** 1351 * Creates a durable subscriber to the specified topic, using a message 1352 * selector and specifying whether messages published by its own connection 1353 * should be delivered to it. 1354 * <P> 1355 * If a client needs to receive all the messages published on a topic, 1356 * including the ones published while the subscriber is inactive, it uses a 1357 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1358 * record of this durable subscription and insures that all messages from 1359 * the topic's publishers are retained until they are acknowledged by this 1360 * durable subscriber or they have expired. 1361 * <P> 1362 * Sessions with durable subscribers must always provide the same client 1363 * identifier. In addition, each client must specify a name which uniquely 1364 * identifies (within client identifier) each durable subscription it 1365 * creates. Only one session at a time can have a 1366 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1367 * inactive durable subscriber is one that exists but does not currently 1368 * have a message consumer associated with it. 1369 * <P> 1370 * A client can change an existing durable subscription by creating a 1371 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1372 * and/or message selector. Changing a durable subscriber is equivalent to 1373 * unsubscribing (deleting) the old one and creating a new one. 1374 * 1375 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1376 * @param name the name used to identify this subscription 1377 * @param messageSelector only messages with properties matching the message 1378 * selector expression are delivered. A value of null or an 1379 * empty string indicates that there is no message selector 1380 * for the message consumer. 1381 * @param noLocal if set, inhibits the delivery of messages published by its 1382 * own connection 1383 * @return the Queue Browser 1384 * @throws JMSException if the session fails to create a subscriber due to 1385 * some internal error. 1386 * @throws InvalidDestinationException if an invalid topic is specified. 1387 * @throws InvalidSelectorException if the message selector is invalid. 1388 * @since 1.1 1389 */ 1390 @Override 1391 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1392 checkClosed(); 1393 1394 if (topic == null) { 1395 throw new InvalidDestinationException("Topic cannot be null"); 1396 } 1397 1398 if (topic instanceof CustomDestination) { 1399 CustomDestination customDestination = (CustomDestination)topic; 1400 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1401 } 1402 1403 connection.checkClientIDWasManuallySpecified(); 1404 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1405 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1406 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1407 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1408 noLocal, false, asyncDispatch); 1409 } 1410 1411 /** 1412 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1413 * the specified queue. 1414 * 1415 * @param queue the <CODE>queue</CODE> to access 1416 * @return the Queue Browser 1417 * @throws JMSException if the session fails to create a browser due to some 1418 * internal error. 1419 * @throws InvalidDestinationException if an invalid destination is 1420 * specified 1421 * @since 1.1 1422 */ 1423 @Override 1424 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1425 checkClosed(); 1426 return createBrowser(queue, null); 1427 } 1428 1429 /** 1430 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1431 * the specified queue using a message selector. 1432 * 1433 * @param queue the <CODE>queue</CODE> to access 1434 * @param messageSelector only messages with properties matching the message 1435 * selector expression are delivered. A value of null or an 1436 * empty string indicates that there is no message selector 1437 * for the message consumer. 1438 * @return the Queue Browser 1439 * @throws JMSException if the session fails to create a browser due to some 1440 * internal error. 1441 * @throws InvalidDestinationException if an invalid destination is 1442 * specified 1443 * @throws InvalidSelectorException if the message selector is invalid. 1444 * @since 1.1 1445 */ 1446 @Override 1447 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1448 checkClosed(); 1449 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1450 } 1451 1452 /** 1453 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1454 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1455 * 1456 * @return a temporary queue identity 1457 * @throws JMSException if the session fails to create a temporary queue due 1458 * to some internal error. 1459 * @since 1.1 1460 */ 1461 @Override 1462 public TemporaryQueue createTemporaryQueue() throws JMSException { 1463 checkClosed(); 1464 return (TemporaryQueue)connection.createTempDestination(false); 1465 } 1466 1467 /** 1468 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1469 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1470 * 1471 * @return a temporary topic identity 1472 * @throws JMSException if the session fails to create a temporary topic due 1473 * to some internal error. 1474 * @since 1.1 1475 */ 1476 @Override 1477 public TemporaryTopic createTemporaryTopic() throws JMSException { 1478 checkClosed(); 1479 return (TemporaryTopic)connection.createTempDestination(true); 1480 } 1481 1482 /** 1483 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1484 * the specified queue. 1485 * 1486 * @param queue the <CODE>Queue</CODE> to access 1487 * @return 1488 * @throws JMSException if the session fails to create a receiver due to 1489 * some internal error. 1490 * @throws JMSException 1491 * @throws InvalidDestinationException if an invalid queue is specified. 1492 */ 1493 @Override 1494 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1495 checkClosed(); 1496 return createReceiver(queue, null); 1497 } 1498 1499 /** 1500 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1501 * the specified queue using a message selector. 1502 * 1503 * @param queue the <CODE>Queue</CODE> to access 1504 * @param messageSelector only messages with properties matching the message 1505 * selector expression are delivered. A value of null or an 1506 * empty string indicates that there is no message selector 1507 * for the message consumer. 1508 * @return QueueReceiver 1509 * @throws JMSException if the session fails to create a receiver due to 1510 * some internal error. 1511 * @throws InvalidDestinationException if an invalid queue is specified. 1512 * @throws InvalidSelectorException if the message selector is invalid. 1513 */ 1514 @Override 1515 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1516 checkClosed(); 1517 1518 if (queue instanceof CustomDestination) { 1519 CustomDestination customDestination = (CustomDestination)queue; 1520 return customDestination.createReceiver(this, messageSelector); 1521 } 1522 1523 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1524 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1525 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1526 } 1527 1528 /** 1529 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1530 * specified queue. 1531 * 1532 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1533 * unidentified producer 1534 * @return QueueSender 1535 * @throws JMSException if the session fails to create a sender due to some 1536 * internal error. 1537 * @throws InvalidDestinationException if an invalid queue is specified. 1538 */ 1539 @Override 1540 public QueueSender createSender(Queue queue) throws JMSException { 1541 checkClosed(); 1542 if (queue instanceof CustomDestination) { 1543 CustomDestination customDestination = (CustomDestination)queue; 1544 return customDestination.createSender(this); 1545 } 1546 int timeSendOut = connection.getSendTimeout(); 1547 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1548 } 1549 1550 /** 1551 * Creates a nondurable subscriber to the specified topic. <p/> 1552 * <P> 1553 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1554 * that have been published to a topic. <p/> 1555 * <P> 1556 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1557 * receive only messages that are published while they are active. <p/> 1558 * <P> 1559 * In some cases, a connection may both publish and subscribe to a topic. 1560 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1561 * inhibit the delivery of messages published by its own connection. The 1562 * default value for this attribute is false. 1563 * 1564 * @param topic the <CODE>Topic</CODE> to subscribe to 1565 * @return TopicSubscriber 1566 * @throws JMSException if the session fails to create a subscriber due to 1567 * some internal error. 1568 * @throws InvalidDestinationException if an invalid topic is specified. 1569 */ 1570 @Override 1571 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1572 checkClosed(); 1573 return createSubscriber(topic, null, false); 1574 } 1575 1576 /** 1577 * Creates a nondurable subscriber to the specified topic, using a message 1578 * selector or specifying whether messages published by its own connection 1579 * should be delivered to it. <p/> 1580 * <P> 1581 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1582 * that have been published to a topic. <p/> 1583 * <P> 1584 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1585 * receive only messages that are published while they are active. <p/> 1586 * <P> 1587 * Messages filtered out by a subscriber's message selector will never be 1588 * delivered to the subscriber. From the subscriber's perspective, they do 1589 * not exist. <p/> 1590 * <P> 1591 * In some cases, a connection may both publish and subscribe to a topic. 1592 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1593 * inhibit the delivery of messages published by its own connection. The 1594 * default value for this attribute is false. 1595 * 1596 * @param topic the <CODE>Topic</CODE> to subscribe to 1597 * @param messageSelector only messages with properties matching the message 1598 * selector expression are delivered. A value of null or an 1599 * empty string indicates that there is no message selector 1600 * for the message consumer. 1601 * @param noLocal if set, inhibits the delivery of messages published by its 1602 * own connection 1603 * @return TopicSubscriber 1604 * @throws JMSException if the session fails to create a subscriber due to 1605 * some internal error. 1606 * @throws InvalidDestinationException if an invalid topic is specified. 1607 * @throws InvalidSelectorException if the message selector is invalid. 1608 */ 1609 @Override 1610 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1611 checkClosed(); 1612 1613 if (topic instanceof CustomDestination) { 1614 CustomDestination customDestination = (CustomDestination)topic; 1615 return customDestination.createSubscriber(this, messageSelector, noLocal); 1616 } 1617 1618 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1619 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1620 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1621 } 1622 1623 /** 1624 * Creates a publisher for the specified topic. <p/> 1625 * <P> 1626 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1627 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1628 * a topic, it defines a new sequence of messages that have no ordering 1629 * relationship with the messages it has previously sent. 1630 * 1631 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1632 * an unidentified producer 1633 * @return TopicPublisher 1634 * @throws JMSException if the session fails to create a publisher due to 1635 * some internal error. 1636 * @throws InvalidDestinationException if an invalid topic is specified. 1637 */ 1638 @Override 1639 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1640 checkClosed(); 1641 1642 if (topic instanceof CustomDestination) { 1643 CustomDestination customDestination = (CustomDestination)topic; 1644 return customDestination.createPublisher(this); 1645 } 1646 int timeSendOut = connection.getSendTimeout(); 1647 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1648 } 1649 1650 /** 1651 * Unsubscribes a durable subscription that has been created by a client. 1652 * <P> 1653 * This method deletes the state being maintained on behalf of the 1654 * subscriber by its provider. 1655 * <P> 1656 * It is erroneous for a client to delete a durable subscription while there 1657 * is an active <CODE>MessageConsumer </CODE> or 1658 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1659 * message is part of a pending transaction or has not been acknowledged in 1660 * the session. 1661 * 1662 * @param name the name used to identify this subscription 1663 * @throws JMSException if the session fails to unsubscribe to the durable 1664 * subscription due to some internal error. 1665 * @throws InvalidDestinationException if an invalid subscription name is 1666 * specified. 1667 * @since 1.1 1668 */ 1669 @Override 1670 public void unsubscribe(String name) throws JMSException { 1671 checkClosed(); 1672 connection.unsubscribe(name); 1673 } 1674 1675 @Override 1676 public void dispatch(MessageDispatch messageDispatch) { 1677 try { 1678 executor.execute(messageDispatch); 1679 } catch (InterruptedException e) { 1680 Thread.currentThread().interrupt(); 1681 connection.onClientInternalException(e); 1682 } 1683 } 1684 1685 /** 1686 * Acknowledges all consumed messages of the session of this consumed 1687 * message. 1688 * <P> 1689 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1690 * for use when a client has specified that its JMS session's consumed 1691 * messages are to be explicitly acknowledged. By invoking 1692 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1693 * all messages consumed by the session that the message was delivered to. 1694 * <P> 1695 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1696 * sessions and sessions specified to use implicit acknowledgement modes. 1697 * <P> 1698 * A client may individually acknowledge each message as it is consumed, or 1699 * it may choose to acknowledge messages as an application-defined group 1700 * (which is done by calling acknowledge on the last received message of the 1701 * group, thereby acknowledging all messages consumed by the session.) 1702 * <P> 1703 * Messages that have been received but not acknowledged may be redelivered. 1704 * 1705 * @throws JMSException if the JMS provider fails to acknowledge the 1706 * messages due to some internal error. 1707 * @throws javax.jms.IllegalStateException if this method is called on a 1708 * closed session. 1709 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1710 */ 1711 public void acknowledge() throws JMSException { 1712 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1713 ActiveMQMessageConsumer c = iter.next(); 1714 c.acknowledge(); 1715 } 1716 } 1717 1718 /** 1719 * Add a message consumer. 1720 * 1721 * @param consumer - message consumer. 1722 * @throws JMSException 1723 */ 1724 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1725 this.consumers.add(consumer); 1726 if (consumer.isDurableSubscriber()) { 1727 stats.onCreateDurableSubscriber(); 1728 } 1729 this.connection.addDispatcher(consumer.getConsumerId(), this); 1730 } 1731 1732 /** 1733 * Remove the message consumer. 1734 * 1735 * @param consumer - consumer to be removed. 1736 * @throws JMSException 1737 */ 1738 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1739 this.connection.removeDispatcher(consumer.getConsumerId()); 1740 if (consumer.isDurableSubscriber()) { 1741 stats.onRemoveDurableSubscriber(); 1742 } 1743 this.consumers.remove(consumer); 1744 this.connection.removeDispatcher(consumer); 1745 } 1746 1747 /** 1748 * Adds a message producer. 1749 * 1750 * @param producer - message producer to be added. 1751 * @throws JMSException 1752 */ 1753 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1754 this.producers.add(producer); 1755 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1756 } 1757 1758 /** 1759 * Removes a message producer. 1760 * 1761 * @param producer - message producer to be removed. 1762 * @throws JMSException 1763 */ 1764 protected void removeProducer(ActiveMQMessageProducer producer) { 1765 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1766 this.producers.remove(producer); 1767 } 1768 1769 /** 1770 * Start this Session. 1771 * 1772 * @throws JMSException 1773 */ 1774 protected void start() throws JMSException { 1775 started.set(true); 1776 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1777 ActiveMQMessageConsumer c = iter.next(); 1778 c.start(); 1779 } 1780 executor.start(); 1781 } 1782 1783 /** 1784 * Stops this session. 1785 * 1786 * @throws JMSException 1787 */ 1788 protected void stop() throws JMSException { 1789 1790 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1791 ActiveMQMessageConsumer c = iter.next(); 1792 c.stop(); 1793 } 1794 1795 started.set(false); 1796 executor.stop(); 1797 } 1798 1799 /** 1800 * Returns the session id. 1801 * 1802 * @return value - session id. 1803 */ 1804 protected SessionId getSessionId() { 1805 return info.getSessionId(); 1806 } 1807 1808 /** 1809 * @return 1810 */ 1811 protected ConsumerId getNextConsumerId() { 1812 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1813 } 1814 1815 /** 1816 * @return 1817 */ 1818 protected ProducerId getNextProducerId() { 1819 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1820 } 1821 1822 /** 1823 * Sends the message for dispatch by the broker. 1824 * 1825 * 1826 * @param producer - message producer. 1827 * @param destination - message destination. 1828 * @param message - message to be sent. 1829 * @param deliveryMode - JMS messsage delivery mode. 1830 * @param priority - message priority. 1831 * @param timeToLive - message expiration. 1832 * @param producerWindow 1833 * @param onComplete 1834 * @throws JMSException 1835 */ 1836 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1837 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { 1838 1839 checkClosed(); 1840 if (destination.isTemporary() && connection.isDeleted(destination)) { 1841 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1842 } 1843 synchronized (sendMutex) { 1844 // tell the Broker we are about to start a new transaction 1845 doStartTransaction(); 1846 TransactionId txid = transactionContext.getTransactionId(); 1847 long sequenceNumber = producer.getMessageSequence(); 1848 1849 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1850 message.setJMSDeliveryMode(deliveryMode); 1851 long expiration = 0L; 1852 if (!producer.getDisableMessageTimestamp()) { 1853 long timeStamp = System.currentTimeMillis(); 1854 message.setJMSTimestamp(timeStamp); 1855 if (timeToLive > 0) { 1856 expiration = timeToLive + timeStamp; 1857 } 1858 } 1859 message.setJMSExpiration(expiration); 1860 message.setJMSPriority(priority); 1861 message.setJMSRedelivered(false); 1862 1863 // transform to our own message format here 1864 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1865 msg.setDestination(destination); 1866 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1867 1868 // Set the message id. 1869 if (msg != message) { 1870 message.setJMSMessageID(msg.getMessageId().toString()); 1871 // Make sure the JMS destination is set on the foreign messages too. 1872 message.setJMSDestination(destination); 1873 } 1874 //clear the brokerPath in case we are re-sending this message 1875 msg.setBrokerPath(null); 1876 1877 msg.setTransactionId(txid); 1878 if (connection.isCopyMessageOnSend()) { 1879 msg = (ActiveMQMessage)msg.copy(); 1880 } 1881 msg.setConnection(connection); 1882 msg.onSend(); 1883 msg.setProducerId(msg.getMessageId().getProducerId()); 1884 if (LOG.isTraceEnabled()) { 1885 LOG.trace(getSessionId() + " sending message: " + msg); 1886 } 1887 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1888 this.connection.asyncSendPacket(msg); 1889 if (producerWindow != null) { 1890 // Since we defer lots of the marshaling till we hit the 1891 // wire, this might not 1892 // provide and accurate size. We may change over to doing 1893 // more aggressive marshaling, 1894 // to get more accurate sizes.. this is more important once 1895 // users start using producer window 1896 // flow control. 1897 int size = msg.getSize(); 1898 producerWindow.increaseUsage(size); 1899 } 1900 } else { 1901 if (sendTimeout > 0 && onComplete==null) { 1902 this.connection.syncSendPacket(msg,sendTimeout); 1903 }else { 1904 this.connection.syncSendPacket(msg, onComplete); 1905 } 1906 } 1907 1908 } 1909 } 1910 1911 /** 1912 * Send TransactionInfo to indicate transaction has started 1913 * 1914 * @throws JMSException if some internal error occurs 1915 */ 1916 protected void doStartTransaction() throws JMSException { 1917 if (getTransacted() && !transactionContext.isInXATransaction()) { 1918 transactionContext.begin(); 1919 } 1920 } 1921 1922 /** 1923 * Checks whether the session has unconsumed messages. 1924 * 1925 * @return true - if there are unconsumed messages. 1926 */ 1927 public boolean hasUncomsumedMessages() { 1928 return executor.hasUncomsumedMessages(); 1929 } 1930 1931 /** 1932 * Checks whether the session uses transactions. 1933 * 1934 * @return true - if the session uses transactions. 1935 */ 1936 public boolean isTransacted() { 1937 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 1938 } 1939 1940 /** 1941 * Checks whether the session used client acknowledgment. 1942 * 1943 * @return true - if the session uses client acknowledgment. 1944 */ 1945 protected boolean isClientAcknowledge() { 1946 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 1947 } 1948 1949 /** 1950 * Checks whether the session used auto acknowledgment. 1951 * 1952 * @return true - if the session uses client acknowledgment. 1953 */ 1954 public boolean isAutoAcknowledge() { 1955 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 1956 } 1957 1958 /** 1959 * Checks whether the session used dup ok acknowledgment. 1960 * 1961 * @return true - if the session uses client acknowledgment. 1962 */ 1963 public boolean isDupsOkAcknowledge() { 1964 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 1965 } 1966 1967 public boolean isIndividualAcknowledge(){ 1968 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 1969 } 1970 1971 /** 1972 * Returns the message delivery listener. 1973 * 1974 * @return deliveryListener - message delivery listener. 1975 */ 1976 public DeliveryListener getDeliveryListener() { 1977 return deliveryListener; 1978 } 1979 1980 /** 1981 * Sets the message delivery listener. 1982 * 1983 * @param deliveryListener - message delivery listener. 1984 */ 1985 public void setDeliveryListener(DeliveryListener deliveryListener) { 1986 this.deliveryListener = deliveryListener; 1987 } 1988 1989 /** 1990 * Returns the SessionInfo bean. 1991 * 1992 * @return info - SessionInfo bean. 1993 * @throws JMSException 1994 */ 1995 protected SessionInfo getSessionInfo() throws JMSException { 1996 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 1997 return info; 1998 } 1999 2000 /** 2001 * Send the asynchronus command. 2002 * 2003 * @param command - command to be executed. 2004 * @throws JMSException 2005 */ 2006 public void asyncSendPacket(Command command) throws JMSException { 2007 connection.asyncSendPacket(command); 2008 } 2009 2010 /** 2011 * Send the synchronus command. 2012 * 2013 * @param command - command to be executed. 2014 * @return Response 2015 * @throws JMSException 2016 */ 2017 public Response syncSendPacket(Command command) throws JMSException { 2018 return connection.syncSendPacket(command); 2019 } 2020 2021 public long getNextDeliveryId() { 2022 return deliveryIdGenerator.getNextSequenceId(); 2023 } 2024 2025 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 2026 2027 List<MessageDispatch> c = unconsumedMessages.removeAll(); 2028 for (MessageDispatch md : c) { 2029 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 2030 } 2031 Collections.reverse(c); 2032 2033 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 2034 MessageDispatch md = iter.next(); 2035 executor.executeFirst(md); 2036 } 2037 2038 } 2039 2040 public boolean isRunning() { 2041 return started.get(); 2042 } 2043 2044 public boolean isAsyncDispatch() { 2045 return asyncDispatch; 2046 } 2047 2048 public void setAsyncDispatch(boolean asyncDispatch) { 2049 this.asyncDispatch = asyncDispatch; 2050 } 2051 2052 /** 2053 * @return Returns the sessionAsyncDispatch. 2054 */ 2055 public boolean isSessionAsyncDispatch() { 2056 return sessionAsyncDispatch; 2057 } 2058 2059 /** 2060 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 2061 */ 2062 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 2063 this.sessionAsyncDispatch = sessionAsyncDispatch; 2064 } 2065 2066 public MessageTransformer getTransformer() { 2067 return transformer; 2068 } 2069 2070 public ActiveMQConnection getConnection() { 2071 return connection; 2072 } 2073 2074 /** 2075 * Sets the transformer used to transform messages before they are sent on 2076 * to the JMS bus or when they are received from the bus but before they are 2077 * delivered to the JMS client 2078 */ 2079 public void setTransformer(MessageTransformer transformer) { 2080 this.transformer = transformer; 2081 } 2082 2083 public BlobTransferPolicy getBlobTransferPolicy() { 2084 return blobTransferPolicy; 2085 } 2086 2087 /** 2088 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 2089 * OBjects) are transferred from producers to brokers to consumers 2090 */ 2091 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 2092 this.blobTransferPolicy = blobTransferPolicy; 2093 } 2094 2095 public List<MessageDispatch> getUnconsumedMessages() { 2096 return executor.getUnconsumedMessages(); 2097 } 2098 2099 @Override 2100 public String toString() { 2101 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}"; 2102 } 2103 2104 public void checkMessageListener() throws JMSException { 2105 if (messageListener != null) { 2106 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2107 } 2108 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 2109 ActiveMQMessageConsumer consumer = i.next(); 2110 if (consumer.hasMessageListener()) { 2111 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2112 } 2113 } 2114 } 2115 2116 protected void setOptimizeAcknowledge(boolean value) { 2117 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2118 ActiveMQMessageConsumer c = iter.next(); 2119 c.setOptimizeAcknowledge(value); 2120 } 2121 } 2122 2123 protected void setPrefetchSize(ConsumerId id, int prefetch) { 2124 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2125 ActiveMQMessageConsumer c = iter.next(); 2126 if (c.getConsumerId().equals(id)) { 2127 c.setPrefetchSize(prefetch); 2128 break; 2129 } 2130 } 2131 } 2132 2133 protected void close(ConsumerId id) { 2134 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2135 ActiveMQMessageConsumer c = iter.next(); 2136 if (c.getConsumerId().equals(id)) { 2137 try { 2138 c.close(); 2139 } catch (JMSException e) { 2140 LOG.warn("Exception closing consumer", e); 2141 } 2142 LOG.warn("Closed consumer on Command, " + id); 2143 break; 2144 } 2145 } 2146 } 2147 2148 public boolean isInUse(ActiveMQTempDestination destination) { 2149 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2150 ActiveMQMessageConsumer c = iter.next(); 2151 if (c.isInUse(destination)) { 2152 return true; 2153 } 2154 } 2155 return false; 2156 } 2157 2158 /** 2159 * highest sequence id of the last message delivered by this session. 2160 * Passed to the broker in the close command, maintained by dispose() 2161 * @return lastDeliveredSequenceId 2162 */ 2163 public long getLastDeliveredSequenceId() { 2164 return lastDeliveredSequenceId; 2165 } 2166 2167 protected void sendAck(MessageAck ack) throws JMSException { 2168 sendAck(ack,false); 2169 } 2170 2171 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { 2172 if (lazy || connection.isSendAcksAsync() || getTransacted()) { 2173 asyncSendPacket(ack); 2174 } else { 2175 syncSendPacket(ack); 2176 } 2177 } 2178 2179 protected Scheduler getScheduler() throws JMSException { 2180 return this.connection.getScheduler(); 2181 } 2182 2183 protected ThreadPoolExecutor getConnectionExecutor() { 2184 return this.connectionExecutor; 2185 } 2186}