001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.File; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import javax.jms.BytesMessage; 032import javax.jms.Destination; 033import javax.jms.IllegalStateException; 034import javax.jms.InvalidDestinationException; 035import javax.jms.InvalidSelectorException; 036import javax.jms.JMSException; 037import javax.jms.MapMessage; 038import javax.jms.Message; 039import javax.jms.MessageConsumer; 040import javax.jms.MessageListener; 041import javax.jms.MessageProducer; 042import javax.jms.ObjectMessage; 043import javax.jms.Queue; 044import javax.jms.QueueBrowser; 045import javax.jms.QueueReceiver; 046import javax.jms.QueueSender; 047import javax.jms.QueueSession; 048import javax.jms.Session; 049import javax.jms.StreamMessage; 050import javax.jms.TemporaryQueue; 051import javax.jms.TemporaryTopic; 052import javax.jms.TextMessage; 053import javax.jms.Topic; 054import javax.jms.TopicPublisher; 055import javax.jms.TopicSession; 056import javax.jms.TopicSubscriber; 057import javax.jms.TransactionRolledBackException; 058 059import org.apache.activemq.blob.BlobDownloader; 060import org.apache.activemq.blob.BlobTransferPolicy; 061import org.apache.activemq.blob.BlobUploader; 062import org.apache.activemq.command.ActiveMQBlobMessage; 063import org.apache.activemq.command.ActiveMQBytesMessage; 064import org.apache.activemq.command.ActiveMQDestination; 065import org.apache.activemq.command.ActiveMQMapMessage; 066import org.apache.activemq.command.ActiveMQMessage; 067import org.apache.activemq.command.ActiveMQObjectMessage; 068import org.apache.activemq.command.ActiveMQQueue; 069import org.apache.activemq.command.ActiveMQStreamMessage; 070import org.apache.activemq.command.ActiveMQTempDestination; 071import org.apache.activemq.command.ActiveMQTempQueue; 072import org.apache.activemq.command.ActiveMQTempTopic; 073import org.apache.activemq.command.ActiveMQTextMessage; 074import org.apache.activemq.command.ActiveMQTopic; 075import org.apache.activemq.command.Command; 076import org.apache.activemq.command.ConsumerId; 077import org.apache.activemq.command.MessageAck; 078import org.apache.activemq.command.MessageDispatch; 079import org.apache.activemq.command.MessageId; 080import org.apache.activemq.command.ProducerId; 081import org.apache.activemq.command.RemoveInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionId; 084import org.apache.activemq.command.SessionInfo; 085import org.apache.activemq.command.TransactionId; 086import org.apache.activemq.management.JMSSessionStatsImpl; 087import org.apache.activemq.management.StatsCapable; 088import org.apache.activemq.management.StatsImpl; 089import org.apache.activemq.thread.Scheduler; 090import org.apache.activemq.transaction.Synchronization; 091import org.apache.activemq.usage.MemoryUsage; 092import org.apache.activemq.util.Callback; 093import org.apache.activemq.util.LongSequenceGenerator; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * <P> 099 * A <CODE>Session</CODE> object is a single-threaded context for producing 100 * and consuming messages. Although it may allocate provider resources outside 101 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 102 * <P> 103 * A session serves several purposes: 104 * <UL> 105 * <LI>It is a factory for its message producers and consumers. 106 * <LI>It supplies provider-optimized message factories. 107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 108 * <CODE>TemporaryQueues</CODE>. 109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 110 * objects for those clients that need to dynamically manipulate 111 * provider-specific destination names. 112 * <LI>It supports a single series of transactions that combine work spanning 113 * its producers and consumers into atomic units. 114 * <LI>It defines a serial order for the messages it consumes and the messages 115 * it produces. 116 * <LI>It retains messages it consumes until they have been acknowledged. 117 * <LI>It serializes execution of message listeners registered with its message 118 * consumers. 119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 120 * </UL> 121 * <P> 122 * A session can create and service multiple message producers and consumers. 123 * <P> 124 * One typical use is to have a thread block on a synchronous 125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 127 * <P> 128 * If a client desires to have one thread produce messages while others consume 129 * them, the client should use a separate session for its producing thread. 130 * <P> 131 * Once a connection has been started, any session with one or more registered 132 * message listeners is dedicated to the thread of control that delivers 133 * messages to it. It is erroneous for client code to use this session or any of 134 * its constituent objects from another thread of control. The only exception to 135 * this rule is the use of the session or connection <CODE>close</CODE> 136 * method. 137 * <P> 138 * It should be easy for most clients to partition their work naturally into 139 * sessions. This model allows clients to start simply and incrementally add 140 * message processing complexity as their need for concurrency grows. 141 * <P> 142 * The <CODE>close</CODE> method is the only session method that can be called 143 * while some other session method is being executed in another thread. 144 * <P> 145 * A session may be specified as transacted. Each transacted session supports a 146 * single series of transactions. Each transaction groups a set of message sends 147 * and a set of message receives into an atomic unit of work. In effect, 148 * transactions organize a session's input message stream and output message 149 * stream into series of atomic units. When a transaction commits, its atomic 150 * unit of input is acknowledged and its associated atomic unit of output is 151 * sent. If a transaction rollback is done, the transaction's sent messages are 152 * destroyed and the session's input is automatically recovered. 153 * <P> 154 * The content of a transaction's input and output units is simply those 155 * messages that have been produced and consumed within the session's current 156 * transaction. 157 * <P> 158 * A transaction is completed using either its session's <CODE>commit</CODE> 159 * method or its session's <CODE>rollback </CODE> method. The completion of a 160 * session's current transaction automatically begins the next. The result is 161 * that a transacted session always has a current transaction within which its 162 * work is done. 163 * <P> 164 * The Java Transaction Service (JTS) or some other transaction monitor may be 165 * used to combine a session's transaction with transactions on other resources 166 * (databases, other JMS sessions, etc.). Since Java distributed transactions 167 * are controlled via the Java Transaction API (JTA), use of the session's 168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 169 * prohibited. 170 * <P> 171 * The JMS API does not require support for JTA; however, it does define how a 172 * provider supplies this support. 173 * <P> 174 * Although it is also possible for a JMS client to handle distributed 175 * transactions directly, it is unlikely that many JMS clients will do this. 176 * Support for JTA in the JMS API is targeted at systems vendors who will be 177 * integrating the JMS API into their application server products. 178 * 179 * 180 * @see javax.jms.Session 181 * @see javax.jms.QueueSession 182 * @see javax.jms.TopicSession 183 * @see javax.jms.XASession 184 */ 185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 186 187 /** 188 * Only acknowledge an individual message - using message.acknowledge() 189 * as opposed to CLIENT_ACKNOWLEDGE which 190 * acknowledges all messages consumed by a session at when acknowledge() 191 * is called 192 */ 193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 195 196 public static interface DeliveryListener { 197 void beforeDelivery(ActiveMQSession session, Message msg); 198 199 void afterDelivery(ActiveMQSession session, Message msg); 200 } 201 202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 203 private final ThreadPoolExecutor connectionExecutor; 204 205 protected int acknowledgementMode; 206 protected final ActiveMQConnection connection; 207 protected final SessionInfo info; 208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 211 protected final ActiveMQSessionExecutor executor; 212 protected final AtomicBoolean started = new AtomicBoolean(false); 213 214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 216 217 protected boolean closed; 218 private volatile boolean synchronizationRegistered; 219 protected boolean asyncDispatch; 220 protected boolean sessionAsyncDispatch; 221 protected final boolean debug; 222 protected Object sendMutex = new Object(); 223 private final AtomicBoolean clearInProgress = new AtomicBoolean(); 224 225 private MessageListener messageListener; 226 private final JMSSessionStatsImpl stats; 227 private TransactionContext transactionContext; 228 private DeliveryListener deliveryListener; 229 private MessageTransformer transformer; 230 private BlobTransferPolicy blobTransferPolicy; 231 private long lastDeliveredSequenceId; 232 233 /** 234 * Construct the Session 235 * 236 * @param connection 237 * @param sessionId 238 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 239 * Session.SESSION_TRANSACTED 240 * @param asyncDispatch 241 * @param sessionAsyncDispatch 242 * @throws JMSException on internal error 243 */ 244 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 245 this.debug = LOG.isDebugEnabled(); 246 this.connection = connection; 247 this.acknowledgementMode = acknowledgeMode; 248 this.asyncDispatch = asyncDispatch; 249 this.sessionAsyncDispatch = sessionAsyncDispatch; 250 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 251 setTransactionContext(new TransactionContext(connection)); 252 stats = new JMSSessionStatsImpl(producers, consumers); 253 this.connection.asyncSendPacket(info); 254 setTransformer(connection.getTransformer()); 255 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 256 this.connectionExecutor=connection.getExecutor(); 257 this.executor = new ActiveMQSessionExecutor(this); 258 connection.addSession(this); 259 if (connection.isStarted()) { 260 start(); 261 } 262 263 } 264 265 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 266 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 267 } 268 269 /** 270 * Sets the transaction context of the session. 271 * 272 * @param transactionContext - provides the means to control a JMS 273 * transaction. 274 */ 275 public void setTransactionContext(TransactionContext transactionContext) { 276 this.transactionContext = transactionContext; 277 } 278 279 /** 280 * Returns the transaction context of the session. 281 * 282 * @return transactionContext - session's transaction context. 283 */ 284 public TransactionContext getTransactionContext() { 285 return transactionContext; 286 } 287 288 /* 289 * (non-Javadoc) 290 * 291 * @see org.apache.activemq.management.StatsCapable#getStats() 292 */ 293 @Override 294 public StatsImpl getStats() { 295 return stats; 296 } 297 298 /** 299 * Returns the session's statistics. 300 * 301 * @return stats - session's statistics. 302 */ 303 public JMSSessionStatsImpl getSessionStats() { 304 return stats; 305 } 306 307 /** 308 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> 309 * object is used to send a message containing a stream of uninterpreted 310 * bytes. 311 * 312 * @return the an ActiveMQBytesMessage 313 * @throws JMSException if the JMS provider fails to create this message due 314 * to some internal error. 315 */ 316 @Override 317 public BytesMessage createBytesMessage() throws JMSException { 318 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 319 configureMessage(message); 320 return message; 321 } 322 323 /** 324 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 325 * object is used to send a self-defining set of name-value pairs, where 326 * names are <CODE>String</CODE> objects and values are primitive values 327 * in the Java programming language. 328 * 329 * @return an ActiveMQMapMessage 330 * @throws JMSException if the JMS provider fails to create this message due 331 * to some internal error. 332 */ 333 @Override 334 public MapMessage createMapMessage() throws JMSException { 335 ActiveMQMapMessage message = new ActiveMQMapMessage(); 336 configureMessage(message); 337 return message; 338 } 339 340 /** 341 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 342 * interface is the root interface of all JMS messages. A 343 * <CODE>Message</CODE> object holds all the standard message header 344 * information. It can be sent when a message containing only header 345 * information is sufficient. 346 * 347 * @return an ActiveMQMessage 348 * @throws JMSException if the JMS provider fails to create this message due 349 * to some internal error. 350 */ 351 @Override 352 public Message createMessage() throws JMSException { 353 ActiveMQMessage message = new ActiveMQMessage(); 354 configureMessage(message); 355 return message; 356 } 357 358 /** 359 * Creates an <CODE>ObjectMessage</CODE> object. An 360 * <CODE>ObjectMessage</CODE> object is used to send a message that 361 * contains a serializable Java object. 362 * 363 * @return an ActiveMQObjectMessage 364 * @throws JMSException if the JMS provider fails to create this message due 365 * to some internal error. 366 */ 367 @Override 368 public ObjectMessage createObjectMessage() throws JMSException { 369 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 370 configureMessage(message); 371 return message; 372 } 373 374 /** 375 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 376 * <CODE>ObjectMessage</CODE> object is used to send a message that 377 * contains a serializable Java object. 378 * 379 * @param object the object to use to initialize this message 380 * @return an ActiveMQObjectMessage 381 * @throws JMSException if the JMS provider fails to create this message due 382 * to some internal error. 383 */ 384 @Override 385 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 386 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 387 configureMessage(message); 388 message.setObject(object); 389 return message; 390 } 391 392 /** 393 * Creates a <CODE>StreamMessage</CODE> object. A 394 * <CODE>StreamMessage</CODE> object is used to send a self-defining 395 * stream of primitive values in the Java programming language. 396 * 397 * @return an ActiveMQStreamMessage 398 * @throws JMSException if the JMS provider fails to create this message due 399 * to some internal error. 400 */ 401 @Override 402 public StreamMessage createStreamMessage() throws JMSException { 403 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 404 configureMessage(message); 405 return message; 406 } 407 408 /** 409 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 410 * object is used to send a message containing a <CODE>String</CODE> 411 * object. 412 * 413 * @return an ActiveMQTextMessage 414 * @throws JMSException if the JMS provider fails to create this message due 415 * to some internal error. 416 */ 417 @Override 418 public TextMessage createTextMessage() throws JMSException { 419 ActiveMQTextMessage message = new ActiveMQTextMessage(); 420 configureMessage(message); 421 return message; 422 } 423 424 /** 425 * Creates an initialized <CODE>TextMessage</CODE> object. A 426 * <CODE>TextMessage</CODE> object is used to send a message containing a 427 * <CODE>String</CODE>. 428 * 429 * @param text the string used to initialize this message 430 * @return an ActiveMQTextMessage 431 * @throws JMSException if the JMS provider fails to create this message due 432 * to some internal error. 433 */ 434 @Override 435 public TextMessage createTextMessage(String text) throws JMSException { 436 ActiveMQTextMessage message = new ActiveMQTextMessage(); 437 message.setText(text); 438 configureMessage(message); 439 return message; 440 } 441 442 /** 443 * Creates an initialized <CODE>BlobMessage</CODE> object. A 444 * <CODE>BlobMessage</CODE> object is used to send a message containing a 445 * <CODE>URL</CODE> which points to some network addressible BLOB. 446 * 447 * @param url the network addressable URL used to pass directly to the 448 * consumer 449 * @return a BlobMessage 450 * @throws JMSException if the JMS provider fails to create this message due 451 * to some internal error. 452 */ 453 public BlobMessage createBlobMessage(URL url) throws JMSException { 454 return createBlobMessage(url, false); 455 } 456 457 /** 458 * Creates an initialized <CODE>BlobMessage</CODE> object. A 459 * <CODE>BlobMessage</CODE> object is used to send a message containing a 460 * <CODE>URL</CODE> which points to some network addressible BLOB. 461 * 462 * @param url the network addressable URL used to pass directly to the 463 * consumer 464 * @param deletedByBroker indicates whether or not the resource is deleted 465 * by the broker when the message is acknowledged 466 * @return a BlobMessage 467 * @throws JMSException if the JMS provider fails to create this message due 468 * to some internal error. 469 */ 470 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 471 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 472 configureMessage(message); 473 message.setURL(url); 474 message.setDeletedByBroker(deletedByBroker); 475 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 476 return message; 477 } 478 479 /** 480 * Creates an initialized <CODE>BlobMessage</CODE> object. A 481 * <CODE>BlobMessage</CODE> object is used to send a message containing 482 * the <CODE>File</CODE> content. Before the message is sent the file 483 * conent will be uploaded to the broker or some other remote repository 484 * depending on the {@link #getBlobTransferPolicy()}. 485 * 486 * @param file the file to be uploaded to some remote repo (or the broker) 487 * depending on the strategy 488 * @return a BlobMessage 489 * @throws JMSException if the JMS provider fails to create this message due 490 * to some internal error. 491 */ 492 public BlobMessage createBlobMessage(File file) throws JMSException { 493 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 494 configureMessage(message); 495 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 496 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 497 message.setDeletedByBroker(true); 498 message.setName(file.getName()); 499 return message; 500 } 501 502 /** 503 * Creates an initialized <CODE>BlobMessage</CODE> object. A 504 * <CODE>BlobMessage</CODE> object is used to send a message containing 505 * the <CODE>File</CODE> content. Before the message is sent the file 506 * conent will be uploaded to the broker or some other remote repository 507 * depending on the {@link #getBlobTransferPolicy()}. <br/> 508 * <p> 509 * The caller of this method is responsible for closing the 510 * input stream that is used, however the stream can not be closed 511 * until <b>after</b> the message has been sent. To have this class 512 * manage the stream and close it automatically, use the method 513 * {@link ActiveMQSession#createBlobMessage(File)} 514 * 515 * @param in the stream to be uploaded to some remote repo (or the broker) 516 * depending on the strategy 517 * @return a BlobMessage 518 * @throws JMSException if the JMS provider fails to create this message due 519 * to some internal error. 520 */ 521 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 522 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 523 configureMessage(message); 524 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 525 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 526 message.setDeletedByBroker(true); 527 return message; 528 } 529 530 /** 531 * Indicates whether the session is in transacted mode. 532 * 533 * @return true if the session is in transacted mode 534 * @throws JMSException if there is some internal error. 535 */ 536 @Override 537 public boolean getTransacted() throws JMSException { 538 checkClosed(); 539 return isTransacted(); 540 } 541 542 /** 543 * Returns the acknowledgement mode of the session. The acknowledgement mode 544 * is set at the time that the session is created. If the session is 545 * transacted, the acknowledgement mode is ignored. 546 * 547 * @return If the session is not transacted, returns the current 548 * acknowledgement mode for the session. If the session is 549 * transacted, returns SESSION_TRANSACTED. 550 * @throws JMSException 551 * @see javax.jms.Connection#createSession(boolean,int) 552 * @since 1.1 exception JMSException if there is some internal error. 553 */ 554 @Override 555 public int getAcknowledgeMode() throws JMSException { 556 checkClosed(); 557 return this.acknowledgementMode; 558 } 559 560 /** 561 * Commits all messages done in this transaction and releases any locks 562 * currently held. 563 * 564 * @throws JMSException if the JMS provider fails to commit the transaction 565 * due to some internal error. 566 * @throws TransactionRolledBackException if the transaction is rolled back 567 * due to some internal error during commit. 568 * @throws javax.jms.IllegalStateException if the method is not called by a 569 * transacted session. 570 */ 571 @Override 572 public void commit() throws JMSException { 573 checkClosed(); 574 if (!getTransacted()) { 575 throw new javax.jms.IllegalStateException("Not a transacted session"); 576 } 577 if (LOG.isDebugEnabled()) { 578 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 579 } 580 transactionContext.commit(); 581 } 582 583 /** 584 * Rolls back any messages done in this transaction and releases any locks 585 * currently held. 586 * 587 * @throws JMSException if the JMS provider fails to roll back the 588 * transaction due to some internal error. 589 * @throws javax.jms.IllegalStateException if the method is not called by a 590 * transacted session. 591 */ 592 @Override 593 public void rollback() throws JMSException { 594 checkClosed(); 595 if (!getTransacted()) { 596 throw new javax.jms.IllegalStateException("Not a transacted session"); 597 } 598 if (LOG.isDebugEnabled()) { 599 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); 600 } 601 transactionContext.rollback(); 602 } 603 604 /** 605 * Closes the session. 606 * <P> 607 * Since a provider may allocate some resources on behalf of a session 608 * outside the JVM, clients should close the resources when they are not 609 * needed. Relying on garbage collection to eventually reclaim these 610 * resources may not be timely enough. 611 * <P> 612 * There is no need to close the producers and consumers of a closed 613 * session. 614 * <P> 615 * This call will block until a <CODE>receive</CODE> call or message 616 * listener in progress has completed. A blocked message consumer 617 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 618 * is closed. 619 * <P> 620 * Closing a transacted session must roll back the transaction in progress. 621 * <P> 622 * This method is the only <CODE>Session</CODE> method that can be called 623 * concurrently. 624 * <P> 625 * Invoking any other <CODE>Session</CODE> method on a closed session must 626 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 627 * closed session must <I>not </I> throw an exception. 628 * 629 * @throws JMSException if the JMS provider fails to close the session due 630 * to some internal error. 631 */ 632 @Override 633 public void close() throws JMSException { 634 if (!closed) { 635 if (getTransactionContext().isInXATransaction()) { 636 if (!synchronizationRegistered) { 637 synchronizationRegistered = true; 638 getTransactionContext().addSynchronization(new Synchronization() { 639 640 @Override 641 public void afterCommit() throws Exception { 642 doClose(); 643 synchronizationRegistered = false; 644 } 645 646 @Override 647 public void afterRollback() throws Exception { 648 doClose(); 649 synchronizationRegistered = false; 650 } 651 }); 652 } 653 654 } else { 655 doClose(); 656 } 657 } 658 } 659 660 private void doClose() throws JMSException { 661 boolean interrupted = Thread.interrupted(); 662 dispose(); 663 RemoveInfo removeCommand = info.createRemoveCommand(); 664 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 665 connection.asyncSendPacket(removeCommand); 666 if (interrupted) { 667 Thread.currentThread().interrupt(); 668 } 669 } 670 671 final AtomicInteger clearRequestsCounter = new AtomicInteger(0); 672 void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { 673 clearRequestsCounter.incrementAndGet(); 674 executor.clearMessagesInProgress(); 675 // we are called from inside the transport reconnection logic which involves us 676 // clearing all the connections' consumers dispatch and delivered lists. So rather 677 // than trying to grab a mutex (which could be already owned by the message listener 678 // calling the send or an ack) we allow it to complete in a separate thread via the 679 // scheduler and notify us via connection.transportInterruptionProcessingComplete() 680 // 681 // We must be careful though not to allow multiple calls to this method from a 682 // connection that is having issue becoming fully established from causing a large 683 // build up of scheduled tasks to clear the same consumers over and over. 684 if (consumers.isEmpty()) { 685 return; 686 } 687 688 if (clearInProgress.compareAndSet(false, true)) { 689 for (final ActiveMQMessageConsumer consumer : consumers) { 690 consumer.inProgressClearRequired(); 691 transportInterruptionProcessingComplete.incrementAndGet(); 692 try { 693 connection.getScheduler().executeAfterDelay(new Runnable() { 694 @Override 695 public void run() { 696 consumer.clearMessagesInProgress(); 697 }}, 0l); 698 } catch (JMSException e) { 699 connection.onClientInternalException(e); 700 } 701 } 702 703 try { 704 connection.getScheduler().executeAfterDelay(new Runnable() { 705 @Override 706 public void run() { 707 clearInProgress.set(false); 708 }}, 0l); 709 } catch (JMSException e) { 710 connection.onClientInternalException(e); 711 } 712 } 713 } 714 715 void deliverAcks() { 716 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 717 ActiveMQMessageConsumer consumer = iter.next(); 718 consumer.deliverAcks(); 719 } 720 } 721 722 public synchronized void dispose() throws JMSException { 723 if (!closed) { 724 725 try { 726 executor.stop(); 727 728 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 729 ActiveMQMessageConsumer consumer = iter.next(); 730 consumer.setFailureError(connection.getFirstFailureError()); 731 consumer.dispose(); 732 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 733 } 734 consumers.clear(); 735 736 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 737 ActiveMQMessageProducer producer = iter.next(); 738 producer.dispose(); 739 } 740 producers.clear(); 741 742 try { 743 if (getTransactionContext().isInLocalTransaction()) { 744 rollback(); 745 } 746 } catch (JMSException e) { 747 } 748 749 } finally { 750 connection.removeSession(this); 751 this.transactionContext = null; 752 closed = true; 753 } 754 } 755 } 756 757 /** 758 * Checks that the session is not closed then configures the message 759 */ 760 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 761 checkClosed(); 762 message.setConnection(connection); 763 } 764 765 /** 766 * Check if the session is closed. It is used for ensuring that the session 767 * is open before performing various operations. 768 * 769 * @throws IllegalStateException if the Session is closed 770 */ 771 protected void checkClosed() throws IllegalStateException { 772 if (closed) { 773 throw new IllegalStateException("The Session is closed"); 774 } 775 } 776 777 /** 778 * Checks if the session is closed. 779 * 780 * @return true if the session is closed, false otherwise. 781 */ 782 public boolean isClosed() { 783 return closed; 784 } 785 786 /** 787 * Stops message delivery in this session, and restarts message delivery 788 * with the oldest unacknowledged message. 789 * <P> 790 * All consumers deliver messages in a serial order. Acknowledging a 791 * received message automatically acknowledges all messages that have been 792 * delivered to the client. 793 * <P> 794 * Restarting a session causes it to take the following actions: 795 * <UL> 796 * <LI>Stop message delivery 797 * <LI>Mark all messages that might have been delivered but not 798 * acknowledged as "redelivered" 799 * <LI>Restart the delivery sequence including all unacknowledged messages 800 * that had been previously delivered. Redelivered messages do not have to 801 * be delivered in exactly their original delivery order. 802 * </UL> 803 * 804 * @throws JMSException if the JMS provider fails to stop and restart 805 * message delivery due to some internal error. 806 * @throws IllegalStateException if the method is called by a transacted 807 * session. 808 */ 809 @Override 810 public void recover() throws JMSException { 811 812 checkClosed(); 813 if (getTransacted()) { 814 throw new IllegalStateException("This session is transacted"); 815 } 816 817 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 818 ActiveMQMessageConsumer c = iter.next(); 819 c.rollback(); 820 } 821 822 } 823 824 /** 825 * Returns the session's distinguished message listener (optional). 826 * 827 * @return the message listener associated with this session 828 * @throws JMSException if the JMS provider fails to get the message 829 * listener due to an internal error. 830 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 831 * @see javax.jms.ServerSessionPool 832 * @see javax.jms.ServerSession 833 */ 834 @Override 835 public MessageListener getMessageListener() throws JMSException { 836 checkClosed(); 837 return this.messageListener; 838 } 839 840 /** 841 * Sets the session's distinguished message listener (optional). 842 * <P> 843 * When the distinguished message listener is set, no other form of message 844 * receipt in the session can be used; however, all forms of sending 845 * messages are still supported. 846 * <P> 847 * If this session has been closed, then an {@link IllegalStateException} is 848 * thrown, if trying to set a new listener. However setting the listener 849 * to <tt>null</tt> is allowed, to clear the listener, even if this session 850 * has been closed prior. 851 * <P> 852 * This is an expert facility not used by regular JMS clients. 853 * 854 * @param listener the message listener to associate with this session 855 * @throws JMSException if the JMS provider fails to set the message 856 * listener due to an internal error. 857 * @see javax.jms.Session#getMessageListener() 858 * @see javax.jms.ServerSessionPool 859 * @see javax.jms.ServerSession 860 */ 861 @Override 862 public void setMessageListener(MessageListener listener) throws JMSException { 863 // only check for closed if we set a new listener, as we allow to clear 864 // the listener, such as when an application is shutting down, and is 865 // no longer using a message listener on this session 866 if (listener != null) { 867 checkClosed(); 868 } 869 this.messageListener = listener; 870 871 if (listener != null) { 872 executor.setDispatchedBySessionPool(true); 873 } 874 } 875 876 /** 877 * Optional operation, intended to be used only by Application Servers, not 878 * by ordinary JMS clients. 879 * 880 * @see javax.jms.ServerSession 881 */ 882 @Override 883 public void run() { 884 MessageDispatch messageDispatch; 885 while ((messageDispatch = executor.dequeueNoWait()) != null) { 886 final MessageDispatch md = messageDispatch; 887 ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 888 889 MessageAck earlyAck = null; 890 if (message.isExpired()) { 891 earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1); 892 } else if (connection.isDuplicate(ActiveMQSession.this, message)) { 893 LOG.debug("{} got duplicate: {}", this, message.getMessageId()); 894 earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 895 earlyAck.setFirstMessageId(md.getMessage().getMessageId()); 896 earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); 897 } 898 if (earlyAck != null) { 899 try { 900 asyncSendPacket(earlyAck); 901 } catch (Throwable t) { 902 LOG.error("error dispatching ack: {} ", earlyAck, t); 903 connection.onClientInternalException(t); 904 } finally { 905 continue; 906 } 907 } 908 909 if (isClientAcknowledge()||isIndividualAcknowledge()) { 910 message.setAcknowledgeCallback(new Callback() { 911 @Override 912 public void execute() throws Exception { 913 } 914 }); 915 } 916 917 if (deliveryListener != null) { 918 deliveryListener.beforeDelivery(this, message); 919 } 920 921 md.setDeliverySequenceId(getNextDeliveryId()); 922 923 final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 924 try { 925 ack.setFirstMessageId(md.getMessage().getMessageId()); 926 doStartTransaction(); 927 ack.setTransactionId(getTransactionContext().getTransactionId()); 928 if (ack.getTransactionId() != null) { 929 getTransactionContext().addSynchronization(new Synchronization() { 930 931 final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); 932 @Override 933 public void beforeEnd() throws Exception { 934 // validate our consumer so we don't push stale acks that get ignored 935 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 936 LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection); 937 throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection); 938 } 939 LOG.trace("beforeEnd ack {}", ack); 940 sendAck(ack); 941 } 942 943 @Override 944 public void afterRollback() throws Exception { 945 LOG.trace("rollback {}", ack, new Throwable("here")); 946 // ensure we don't filter this as a duplicate 947 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); 948 949 // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect 950 if (clearRequestsCounter.get() > clearRequestCount) { 951 LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport()); 952 return; 953 } 954 955 // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched 956 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 957 LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport()); 958 return; 959 } 960 961 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); 962 int redeliveryCounter = md.getMessage().getRedeliveryCounter(); 963 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 964 && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) { 965 // We need to NACK the messages so that they get 966 // sent to the 967 // DLQ. 968 // Acknowledge the last message. 969 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 970 ack.setFirstMessageId(md.getMessage().getMessageId()); 971 ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); 972 asyncSendPacket(ack); 973 974 } else { 975 976 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); 977 ack.setFirstMessageId(md.getMessage().getMessageId()); 978 asyncSendPacket(ack); 979 980 // Figure out how long we should wait to resend 981 // this message. 982 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 983 for (int i = 0; i < redeliveryCounter; i++) { 984 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 985 } 986 connection.getScheduler().executeAfterDelay(new Runnable() { 987 988 @Override 989 public void run() { 990 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); 991 } 992 }, redeliveryDelay); 993 } 994 md.getMessage().onMessageRolledBack(); 995 } 996 }); 997 } 998 999 LOG.trace("{} onMessage({})", this, message.getMessageId()); 1000 messageListener.onMessage(message); 1001 1002 } catch (Throwable e) { 1003 LOG.error("error dispatching message: ", e); 1004 // A problem while invoking the MessageListener does not 1005 // in general indicate a problem with the connection to the broker, i.e. 1006 // it will usually be sufficient to let the afterDelivery() method either 1007 // commit or roll back in order to deal with the exception. 1008 // However, we notify any registered client internal exception listener 1009 // of the problem. 1010 connection.onClientInternalException(e); 1011 } finally { 1012 if (ack.getTransactionId() == null) { 1013 try { 1014 asyncSendPacket(ack); 1015 } catch (Throwable e) { 1016 connection.onClientInternalException(e); 1017 } 1018 } 1019 } 1020 1021 if (deliveryListener != null) { 1022 deliveryListener.afterDelivery(this, message); 1023 } 1024 } 1025 } 1026 1027 /** 1028 * Creates a <CODE>MessageProducer</CODE> to send messages to the 1029 * specified destination. 1030 * <P> 1031 * A client uses a <CODE>MessageProducer</CODE> object to send messages to 1032 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both 1033 * inherit from <CODE>Destination</CODE>, they can be used in the 1034 * destination parameter to create a <CODE>MessageProducer</CODE> object. 1035 * 1036 * @param destination the <CODE>Destination</CODE> to send to, or null if 1037 * this is a producer which does not have a specified 1038 * destination. 1039 * @return the MessageProducer 1040 * @throws JMSException if the session fails to create a MessageProducer due 1041 * to some internal error. 1042 * @throws InvalidDestinationException if an invalid destination is 1043 * specified. 1044 * @since 1.1 1045 */ 1046 @Override 1047 public MessageProducer createProducer(Destination destination) throws JMSException { 1048 checkClosed(); 1049 if (destination instanceof CustomDestination) { 1050 CustomDestination customDestination = (CustomDestination)destination; 1051 return customDestination.createProducer(this); 1052 } 1053 int timeSendOut = connection.getSendTimeout(); 1054 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); 1055 } 1056 1057 /** 1058 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1059 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1060 * <CODE>Destination</CODE>, they can be used in the destination 1061 * parameter to create a <CODE>MessageConsumer</CODE>. 1062 * 1063 * @param destination the <CODE>Destination</CODE> to access. 1064 * @return the MessageConsumer 1065 * @throws JMSException if the session fails to create a consumer due to 1066 * some internal error. 1067 * @throws InvalidDestinationException if an invalid destination is 1068 * specified. 1069 * @since 1.1 1070 */ 1071 @Override 1072 public MessageConsumer createConsumer(Destination destination) throws JMSException { 1073 return createConsumer(destination, (String) null); 1074 } 1075 1076 /** 1077 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1078 * using a message selector. Since <CODE> Queue</CODE> and 1079 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1080 * can be used in the destination parameter to create a 1081 * <CODE>MessageConsumer</CODE>. 1082 * <P> 1083 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1084 * that have been sent to a destination. 1085 * 1086 * @param destination the <CODE>Destination</CODE> to access 1087 * @param messageSelector only messages with properties matching the message 1088 * selector expression are delivered. A value of null or an 1089 * empty string indicates that there is no message selector 1090 * for the message consumer. 1091 * @return the MessageConsumer 1092 * @throws JMSException if the session fails to create a MessageConsumer due 1093 * to some internal error. 1094 * @throws InvalidDestinationException if an invalid destination is 1095 * specified. 1096 * @throws InvalidSelectorException if the message selector is invalid. 1097 * @since 1.1 1098 */ 1099 @Override 1100 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 1101 return createConsumer(destination, messageSelector, false); 1102 } 1103 1104 /** 1105 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1106 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1107 * <CODE>Destination</CODE>, they can be used in the destination 1108 * parameter to create a <CODE>MessageConsumer</CODE>. 1109 * 1110 * @param destination the <CODE>Destination</CODE> to access. 1111 * @param messageListener the listener to use for async consumption of messages 1112 * @return the MessageConsumer 1113 * @throws JMSException if the session fails to create a consumer due to 1114 * some internal error. 1115 * @throws InvalidDestinationException if an invalid destination is 1116 * specified. 1117 * @since 1.1 1118 */ 1119 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 1120 return createConsumer(destination, null, messageListener); 1121 } 1122 1123 /** 1124 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1125 * using a message selector. Since <CODE> Queue</CODE> and 1126 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1127 * can be used in the destination parameter to create a 1128 * <CODE>MessageConsumer</CODE>. 1129 * <P> 1130 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1131 * that have been sent to a destination. 1132 * 1133 * @param destination the <CODE>Destination</CODE> to access 1134 * @param messageSelector only messages with properties matching the message 1135 * selector expression are delivered. A value of null or an 1136 * empty string indicates that there is no message selector 1137 * for the message consumer. 1138 * @param messageListener the listener to use for async consumption of messages 1139 * @return the MessageConsumer 1140 * @throws JMSException if the session fails to create a MessageConsumer due 1141 * to some internal error. 1142 * @throws InvalidDestinationException if an invalid destination is 1143 * specified. 1144 * @throws InvalidSelectorException if the message selector is invalid. 1145 * @since 1.1 1146 */ 1147 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1148 return createConsumer(destination, messageSelector, false, messageListener); 1149 } 1150 1151 /** 1152 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1153 * using a message selector. This method can specify whether messages 1154 * published by its own connection should be delivered to it, if the 1155 * destination is a topic. 1156 * <P> 1157 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1158 * <CODE>Destination</CODE>, they can be used in the destination 1159 * parameter to create a <CODE>MessageConsumer</CODE>. 1160 * <P> 1161 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1162 * that have been published to a destination. 1163 * <P> 1164 * In some cases, a connection may both publish and subscribe to a topic. 1165 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1166 * inhibit the delivery of messages published by its own connection. The 1167 * default value for this attribute is False. The <CODE>noLocal</CODE> 1168 * value must be supported by destinations that are topics. 1169 * 1170 * @param destination the <CODE>Destination</CODE> to access 1171 * @param messageSelector only messages with properties matching the message 1172 * selector expression are delivered. A value of null or an 1173 * empty string indicates that there is no message selector 1174 * for the message consumer. 1175 * @param noLocal - if true, and the destination is a topic, inhibits the 1176 * delivery of messages published by its own connection. The 1177 * behavior for <CODE>NoLocal</CODE> is not specified if 1178 * the destination is a queue. 1179 * @return the MessageConsumer 1180 * @throws JMSException if the session fails to create a MessageConsumer due 1181 * to some internal error. 1182 * @throws InvalidDestinationException if an invalid destination is 1183 * specified. 1184 * @throws InvalidSelectorException if the message selector is invalid. 1185 * @since 1.1 1186 */ 1187 @Override 1188 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1189 return createConsumer(destination, messageSelector, noLocal, null); 1190 } 1191 1192 /** 1193 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1194 * using a message selector. This method can specify whether messages 1195 * published by its own connection should be delivered to it, if the 1196 * destination is a topic. 1197 * <P> 1198 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1199 * <CODE>Destination</CODE>, they can be used in the destination 1200 * parameter to create a <CODE>MessageConsumer</CODE>. 1201 * <P> 1202 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1203 * that have been published to a destination. 1204 * <P> 1205 * In some cases, a connection may both publish and subscribe to a topic. 1206 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1207 * inhibit the delivery of messages published by its own connection. The 1208 * default value for this attribute is False. The <CODE>noLocal</CODE> 1209 * value must be supported by destinations that are topics. 1210 * 1211 * @param destination the <CODE>Destination</CODE> to access 1212 * @param messageSelector only messages with properties matching the message 1213 * selector expression are delivered. A value of null or an 1214 * empty string indicates that there is no message selector 1215 * for the message consumer. 1216 * @param noLocal - if true, and the destination is a topic, inhibits the 1217 * delivery of messages published by its own connection. The 1218 * behavior for <CODE>NoLocal</CODE> is not specified if 1219 * the destination is a queue. 1220 * @param messageListener the listener to use for async consumption of messages 1221 * @return the MessageConsumer 1222 * @throws JMSException if the session fails to create a MessageConsumer due 1223 * to some internal error. 1224 * @throws InvalidDestinationException if an invalid destination is 1225 * specified. 1226 * @throws InvalidSelectorException if the message selector is invalid. 1227 * @since 1.1 1228 */ 1229 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1230 checkClosed(); 1231 1232 if (destination instanceof CustomDestination) { 1233 CustomDestination customDestination = (CustomDestination)destination; 1234 return customDestination.createConsumer(this, messageSelector, noLocal); 1235 } 1236 1237 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1238 int prefetch = 0; 1239 if (destination instanceof Topic) { 1240 prefetch = prefetchPolicy.getTopicPrefetch(); 1241 } else { 1242 prefetch = prefetchPolicy.getQueuePrefetch(); 1243 } 1244 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1245 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1246 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1247 } 1248 1249 /** 1250 * Creates a queue identity given a <CODE>Queue</CODE> name. 1251 * <P> 1252 * This facility is provided for the rare cases where clients need to 1253 * dynamically manipulate queue identity. It allows the creation of a queue 1254 * identity with a provider-specific name. Clients that depend on this 1255 * ability are not portable. 1256 * <P> 1257 * Note that this method is not for creating the physical queue. The 1258 * physical creation of queues is an administrative task and is not to be 1259 * initiated by the JMS API. The one exception is the creation of temporary 1260 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1261 * method. 1262 * 1263 * @param queueName the name of this <CODE>Queue</CODE> 1264 * @return a <CODE>Queue</CODE> with the given name 1265 * @throws JMSException if the session fails to create a queue due to some 1266 * internal error. 1267 * @since 1.1 1268 */ 1269 @Override 1270 public Queue createQueue(String queueName) throws JMSException { 1271 checkClosed(); 1272 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1273 return new ActiveMQTempQueue(queueName); 1274 } 1275 return new ActiveMQQueue(queueName); 1276 } 1277 1278 /** 1279 * Creates a topic identity given a <CODE>Topic</CODE> name. 1280 * <P> 1281 * This facility is provided for the rare cases where clients need to 1282 * dynamically manipulate topic identity. This allows the creation of a 1283 * topic identity with a provider-specific name. Clients that depend on this 1284 * ability are not portable. 1285 * <P> 1286 * Note that this method is not for creating the physical topic. The 1287 * physical creation of topics is an administrative task and is not to be 1288 * initiated by the JMS API. The one exception is the creation of temporary 1289 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1290 * method. 1291 * 1292 * @param topicName the name of this <CODE>Topic</CODE> 1293 * @return a <CODE>Topic</CODE> with the given name 1294 * @throws JMSException if the session fails to create a topic due to some 1295 * internal error. 1296 * @since 1.1 1297 */ 1298 @Override 1299 public Topic createTopic(String topicName) throws JMSException { 1300 checkClosed(); 1301 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1302 return new ActiveMQTempTopic(topicName); 1303 } 1304 return new ActiveMQTopic(topicName); 1305 } 1306 1307 /** 1308 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1309 * the specified queue. 1310 * 1311 * @param queue the <CODE>queue</CODE> to access 1312 * @exception InvalidDestinationException if an invalid destination is 1313 * specified 1314 * @since 1.1 1315 */ 1316 /** 1317 * Creates a durable subscriber to the specified topic. 1318 * <P> 1319 * If a client needs to receive all the messages published on a topic, 1320 * including the ones published while the subscriber is inactive, it uses a 1321 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1322 * record of this durable subscription and insures that all messages from 1323 * the topic's publishers are retained until they are acknowledged by this 1324 * durable subscriber or they have expired. 1325 * <P> 1326 * Sessions with durable subscribers must always provide the same client 1327 * identifier. In addition, each client must specify a name that uniquely 1328 * identifies (within client identifier) each durable subscription it 1329 * creates. Only one session at a time can have a 1330 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1331 * <P> 1332 * A client can change an existing durable subscription by creating a 1333 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1334 * and/or message selector. Changing a durable subscriber is equivalent to 1335 * unsubscribing (deleting) the old one and creating a new one. 1336 * <P> 1337 * In some cases, a connection may both publish and subscribe to a topic. 1338 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1339 * inhibit the delivery of messages published by its own connection. The 1340 * default value for this attribute is false. 1341 * 1342 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1343 * @param name the name used to identify this subscription 1344 * @return the TopicSubscriber 1345 * @throws JMSException if the session fails to create a subscriber due to 1346 * some internal error. 1347 * @throws InvalidDestinationException if an invalid topic is specified. 1348 * @since 1.1 1349 */ 1350 @Override 1351 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1352 checkClosed(); 1353 return createDurableSubscriber(topic, name, null, false); 1354 } 1355 1356 /** 1357 * Creates a durable subscriber to the specified topic, using a message 1358 * selector and specifying whether messages published by its own connection 1359 * should be delivered to it. 1360 * <P> 1361 * If a client needs to receive all the messages published on a topic, 1362 * including the ones published while the subscriber is inactive, it uses a 1363 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1364 * record of this durable subscription and insures that all messages from 1365 * the topic's publishers are retained until they are acknowledged by this 1366 * durable subscriber or they have expired. 1367 * <P> 1368 * Sessions with durable subscribers must always provide the same client 1369 * identifier. In addition, each client must specify a name which uniquely 1370 * identifies (within client identifier) each durable subscription it 1371 * creates. Only one session at a time can have a 1372 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1373 * inactive durable subscriber is one that exists but does not currently 1374 * have a message consumer associated with it. 1375 * <P> 1376 * A client can change an existing durable subscription by creating a 1377 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1378 * and/or message selector. Changing a durable subscriber is equivalent to 1379 * unsubscribing (deleting) the old one and creating a new one. 1380 * 1381 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1382 * @param name the name used to identify this subscription 1383 * @param messageSelector only messages with properties matching the message 1384 * selector expression are delivered. A value of null or an 1385 * empty string indicates that there is no message selector 1386 * for the message consumer. 1387 * @param noLocal if set, inhibits the delivery of messages published by its 1388 * own connection 1389 * @return the Queue Browser 1390 * @throws JMSException if the session fails to create a subscriber due to 1391 * some internal error. 1392 * @throws InvalidDestinationException if an invalid topic is specified. 1393 * @throws InvalidSelectorException if the message selector is invalid. 1394 * @since 1.1 1395 */ 1396 @Override 1397 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1398 checkClosed(); 1399 1400 if (topic == null) { 1401 throw new InvalidDestinationException("Topic cannot be null"); 1402 } 1403 1404 if (topic instanceof CustomDestination) { 1405 CustomDestination customDestination = (CustomDestination)topic; 1406 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1407 } 1408 1409 connection.checkClientIDWasManuallySpecified(); 1410 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1411 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1412 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1413 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1414 noLocal, false, asyncDispatch); 1415 } 1416 1417 /** 1418 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1419 * the specified queue. 1420 * 1421 * @param queue the <CODE>queue</CODE> to access 1422 * @return the Queue Browser 1423 * @throws JMSException if the session fails to create a browser due to some 1424 * internal error. 1425 * @throws InvalidDestinationException if an invalid destination is 1426 * specified 1427 * @since 1.1 1428 */ 1429 @Override 1430 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1431 checkClosed(); 1432 return createBrowser(queue, null); 1433 } 1434 1435 /** 1436 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1437 * the specified queue using a message selector. 1438 * 1439 * @param queue the <CODE>queue</CODE> to access 1440 * @param messageSelector only messages with properties matching the message 1441 * selector expression are delivered. A value of null or an 1442 * empty string indicates that there is no message selector 1443 * for the message consumer. 1444 * @return the Queue Browser 1445 * @throws JMSException if the session fails to create a browser due to some 1446 * internal error. 1447 * @throws InvalidDestinationException if an invalid destination is 1448 * specified 1449 * @throws InvalidSelectorException if the message selector is invalid. 1450 * @since 1.1 1451 */ 1452 @Override 1453 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1454 checkClosed(); 1455 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1456 } 1457 1458 /** 1459 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1460 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1461 * 1462 * @return a temporary queue identity 1463 * @throws JMSException if the session fails to create a temporary queue due 1464 * to some internal error. 1465 * @since 1.1 1466 */ 1467 @Override 1468 public TemporaryQueue createTemporaryQueue() throws JMSException { 1469 checkClosed(); 1470 return (TemporaryQueue)connection.createTempDestination(false); 1471 } 1472 1473 /** 1474 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1475 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1476 * 1477 * @return a temporary topic identity 1478 * @throws JMSException if the session fails to create a temporary topic due 1479 * to some internal error. 1480 * @since 1.1 1481 */ 1482 @Override 1483 public TemporaryTopic createTemporaryTopic() throws JMSException { 1484 checkClosed(); 1485 return (TemporaryTopic)connection.createTempDestination(true); 1486 } 1487 1488 /** 1489 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1490 * the specified queue. 1491 * 1492 * @param queue the <CODE>Queue</CODE> to access 1493 * @return 1494 * @throws JMSException if the session fails to create a receiver due to 1495 * some internal error. 1496 * @throws JMSException 1497 * @throws InvalidDestinationException if an invalid queue is specified. 1498 */ 1499 @Override 1500 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1501 checkClosed(); 1502 return createReceiver(queue, null); 1503 } 1504 1505 /** 1506 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1507 * the specified queue using a message selector. 1508 * 1509 * @param queue the <CODE>Queue</CODE> to access 1510 * @param messageSelector only messages with properties matching the message 1511 * selector expression are delivered. A value of null or an 1512 * empty string indicates that there is no message selector 1513 * for the message consumer. 1514 * @return QueueReceiver 1515 * @throws JMSException if the session fails to create a receiver due to 1516 * some internal error. 1517 * @throws InvalidDestinationException if an invalid queue is specified. 1518 * @throws InvalidSelectorException if the message selector is invalid. 1519 */ 1520 @Override 1521 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1522 checkClosed(); 1523 1524 if (queue instanceof CustomDestination) { 1525 CustomDestination customDestination = (CustomDestination)queue; 1526 return customDestination.createReceiver(this, messageSelector); 1527 } 1528 1529 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1530 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1531 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1532 } 1533 1534 /** 1535 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1536 * specified queue. 1537 * 1538 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1539 * unidentified producer 1540 * @return QueueSender 1541 * @throws JMSException if the session fails to create a sender due to some 1542 * internal error. 1543 * @throws InvalidDestinationException if an invalid queue is specified. 1544 */ 1545 @Override 1546 public QueueSender createSender(Queue queue) throws JMSException { 1547 checkClosed(); 1548 if (queue instanceof CustomDestination) { 1549 CustomDestination customDestination = (CustomDestination)queue; 1550 return customDestination.createSender(this); 1551 } 1552 int timeSendOut = connection.getSendTimeout(); 1553 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1554 } 1555 1556 /** 1557 * Creates a nondurable subscriber to the specified topic. <p/> 1558 * <P> 1559 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1560 * that have been published to a topic. <p/> 1561 * <P> 1562 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1563 * receive only messages that are published while they are active. <p/> 1564 * <P> 1565 * In some cases, a connection may both publish and subscribe to a topic. 1566 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1567 * inhibit the delivery of messages published by its own connection. The 1568 * default value for this attribute is false. 1569 * 1570 * @param topic the <CODE>Topic</CODE> to subscribe to 1571 * @return TopicSubscriber 1572 * @throws JMSException if the session fails to create a subscriber due to 1573 * some internal error. 1574 * @throws InvalidDestinationException if an invalid topic is specified. 1575 */ 1576 @Override 1577 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1578 checkClosed(); 1579 return createSubscriber(topic, null, false); 1580 } 1581 1582 /** 1583 * Creates a nondurable subscriber to the specified topic, using a message 1584 * selector or specifying whether messages published by its own connection 1585 * should be delivered to it. <p/> 1586 * <P> 1587 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1588 * that have been published to a topic. <p/> 1589 * <P> 1590 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1591 * receive only messages that are published while they are active. <p/> 1592 * <P> 1593 * Messages filtered out by a subscriber's message selector will never be 1594 * delivered to the subscriber. From the subscriber's perspective, they do 1595 * not exist. <p/> 1596 * <P> 1597 * In some cases, a connection may both publish and subscribe to a topic. 1598 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1599 * inhibit the delivery of messages published by its own connection. The 1600 * default value for this attribute is false. 1601 * 1602 * @param topic the <CODE>Topic</CODE> to subscribe to 1603 * @param messageSelector only messages with properties matching the message 1604 * selector expression are delivered. A value of null or an 1605 * empty string indicates that there is no message selector 1606 * for the message consumer. 1607 * @param noLocal if set, inhibits the delivery of messages published by its 1608 * own connection 1609 * @return TopicSubscriber 1610 * @throws JMSException if the session fails to create a subscriber due to 1611 * some internal error. 1612 * @throws InvalidDestinationException if an invalid topic is specified. 1613 * @throws InvalidSelectorException if the message selector is invalid. 1614 */ 1615 @Override 1616 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1617 checkClosed(); 1618 1619 if (topic instanceof CustomDestination) { 1620 CustomDestination customDestination = (CustomDestination)topic; 1621 return customDestination.createSubscriber(this, messageSelector, noLocal); 1622 } 1623 1624 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1625 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1626 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1627 } 1628 1629 /** 1630 * Creates a publisher for the specified topic. <p/> 1631 * <P> 1632 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1633 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1634 * a topic, it defines a new sequence of messages that have no ordering 1635 * relationship with the messages it has previously sent. 1636 * 1637 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1638 * an unidentified producer 1639 * @return TopicPublisher 1640 * @throws JMSException if the session fails to create a publisher due to 1641 * some internal error. 1642 * @throws InvalidDestinationException if an invalid topic is specified. 1643 */ 1644 @Override 1645 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1646 checkClosed(); 1647 1648 if (topic instanceof CustomDestination) { 1649 CustomDestination customDestination = (CustomDestination)topic; 1650 return customDestination.createPublisher(this); 1651 } 1652 int timeSendOut = connection.getSendTimeout(); 1653 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1654 } 1655 1656 /** 1657 * Unsubscribes a durable subscription that has been created by a client. 1658 * <P> 1659 * This method deletes the state being maintained on behalf of the 1660 * subscriber by its provider. 1661 * <P> 1662 * It is erroneous for a client to delete a durable subscription while there 1663 * is an active <CODE>MessageConsumer </CODE> or 1664 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1665 * message is part of a pending transaction or has not been acknowledged in 1666 * the session. 1667 * 1668 * @param name the name used to identify this subscription 1669 * @throws JMSException if the session fails to unsubscribe to the durable 1670 * subscription due to some internal error. 1671 * @throws InvalidDestinationException if an invalid subscription name is 1672 * specified. 1673 * @since 1.1 1674 */ 1675 @Override 1676 public void unsubscribe(String name) throws JMSException { 1677 checkClosed(); 1678 connection.unsubscribe(name); 1679 } 1680 1681 @Override 1682 public void dispatch(MessageDispatch messageDispatch) { 1683 try { 1684 executor.execute(messageDispatch); 1685 } catch (InterruptedException e) { 1686 Thread.currentThread().interrupt(); 1687 connection.onClientInternalException(e); 1688 } 1689 } 1690 1691 /** 1692 * Acknowledges all consumed messages of the session of this consumed 1693 * message. 1694 * <P> 1695 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1696 * for use when a client has specified that its JMS session's consumed 1697 * messages are to be explicitly acknowledged. By invoking 1698 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1699 * all messages consumed by the session that the message was delivered to. 1700 * <P> 1701 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1702 * sessions and sessions specified to use implicit acknowledgement modes. 1703 * <P> 1704 * A client may individually acknowledge each message as it is consumed, or 1705 * it may choose to acknowledge messages as an application-defined group 1706 * (which is done by calling acknowledge on the last received message of the 1707 * group, thereby acknowledging all messages consumed by the session.) 1708 * <P> 1709 * Messages that have been received but not acknowledged may be redelivered. 1710 * 1711 * @throws JMSException if the JMS provider fails to acknowledge the 1712 * messages due to some internal error. 1713 * @throws javax.jms.IllegalStateException if this method is called on a 1714 * closed session. 1715 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1716 */ 1717 public void acknowledge() throws JMSException { 1718 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1719 ActiveMQMessageConsumer c = iter.next(); 1720 c.acknowledge(); 1721 } 1722 } 1723 1724 /** 1725 * Add a message consumer. 1726 * 1727 * @param consumer - message consumer. 1728 * @throws JMSException 1729 */ 1730 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1731 this.consumers.add(consumer); 1732 if (consumer.isDurableSubscriber()) { 1733 stats.onCreateDurableSubscriber(); 1734 } 1735 this.connection.addDispatcher(consumer.getConsumerId(), this); 1736 } 1737 1738 /** 1739 * Remove the message consumer. 1740 * 1741 * @param consumer - consumer to be removed. 1742 * @throws JMSException 1743 */ 1744 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1745 this.connection.removeDispatcher(consumer.getConsumerId()); 1746 if (consumer.isDurableSubscriber()) { 1747 stats.onRemoveDurableSubscriber(); 1748 } 1749 this.consumers.remove(consumer); 1750 this.connection.removeDispatcher(consumer); 1751 } 1752 1753 /** 1754 * Adds a message producer. 1755 * 1756 * @param producer - message producer to be added. 1757 * @throws JMSException 1758 */ 1759 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1760 this.producers.add(producer); 1761 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1762 } 1763 1764 /** 1765 * Removes a message producer. 1766 * 1767 * @param producer - message producer to be removed. 1768 * @throws JMSException 1769 */ 1770 protected void removeProducer(ActiveMQMessageProducer producer) { 1771 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1772 this.producers.remove(producer); 1773 } 1774 1775 /** 1776 * Start this Session. 1777 * 1778 * @throws JMSException 1779 */ 1780 protected void start() throws JMSException { 1781 started.set(true); 1782 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1783 ActiveMQMessageConsumer c = iter.next(); 1784 c.start(); 1785 } 1786 executor.start(); 1787 } 1788 1789 /** 1790 * Stops this session. 1791 * 1792 * @throws JMSException 1793 */ 1794 protected void stop() throws JMSException { 1795 1796 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1797 ActiveMQMessageConsumer c = iter.next(); 1798 c.stop(); 1799 } 1800 1801 started.set(false); 1802 executor.stop(); 1803 } 1804 1805 /** 1806 * Returns the session id. 1807 * 1808 * @return value - session id. 1809 */ 1810 protected SessionId getSessionId() { 1811 return info.getSessionId(); 1812 } 1813 1814 /** 1815 * @return 1816 */ 1817 protected ConsumerId getNextConsumerId() { 1818 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1819 } 1820 1821 /** 1822 * @return 1823 */ 1824 protected ProducerId getNextProducerId() { 1825 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1826 } 1827 1828 /** 1829 * Sends the message for dispatch by the broker. 1830 * 1831 * 1832 * @param producer - message producer. 1833 * @param destination - message destination. 1834 * @param message - message to be sent. 1835 * @param deliveryMode - JMS messsage delivery mode. 1836 * @param priority - message priority. 1837 * @param timeToLive - message expiration. 1838 * @param producerWindow 1839 * @param onComplete 1840 * @throws JMSException 1841 */ 1842 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1843 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { 1844 1845 checkClosed(); 1846 if (destination.isTemporary() && connection.isDeleted(destination)) { 1847 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1848 } 1849 synchronized (sendMutex) { 1850 // tell the Broker we are about to start a new transaction 1851 doStartTransaction(); 1852 TransactionId txid = transactionContext.getTransactionId(); 1853 long sequenceNumber = producer.getMessageSequence(); 1854 1855 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1856 message.setJMSDeliveryMode(deliveryMode); 1857 long expiration = 0L; 1858 if (!producer.getDisableMessageTimestamp()) { 1859 long timeStamp = System.currentTimeMillis(); 1860 message.setJMSTimestamp(timeStamp); 1861 if (timeToLive > 0) { 1862 expiration = timeToLive + timeStamp; 1863 } 1864 } 1865 message.setJMSExpiration(expiration); 1866 message.setJMSPriority(priority); 1867 message.setJMSRedelivered(false); 1868 1869 // transform to our own message format here 1870 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1871 msg.setDestination(destination); 1872 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1873 1874 // Set the message id. 1875 if (msg != message) { 1876 message.setJMSMessageID(msg.getMessageId().toString()); 1877 // Make sure the JMS destination is set on the foreign messages too. 1878 message.setJMSDestination(destination); 1879 } 1880 //clear the brokerPath in case we are re-sending this message 1881 msg.setBrokerPath(null); 1882 1883 msg.setTransactionId(txid); 1884 if (connection.isCopyMessageOnSend()) { 1885 msg = (ActiveMQMessage)msg.copy(); 1886 } 1887 msg.setConnection(connection); 1888 msg.onSend(); 1889 msg.setProducerId(msg.getMessageId().getProducerId()); 1890 if (LOG.isTraceEnabled()) { 1891 LOG.trace(getSessionId() + " sending message: " + msg); 1892 } 1893 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1894 this.connection.asyncSendPacket(msg); 1895 if (producerWindow != null) { 1896 // Since we defer lots of the marshaling till we hit the 1897 // wire, this might not 1898 // provide and accurate size. We may change over to doing 1899 // more aggressive marshaling, 1900 // to get more accurate sizes.. this is more important once 1901 // users start using producer window 1902 // flow control. 1903 int size = msg.getSize(); 1904 producerWindow.increaseUsage(size); 1905 } 1906 } else { 1907 if (sendTimeout > 0 && onComplete==null) { 1908 this.connection.syncSendPacket(msg,sendTimeout); 1909 }else { 1910 this.connection.syncSendPacket(msg, onComplete); 1911 } 1912 } 1913 1914 } 1915 } 1916 1917 /** 1918 * Send TransactionInfo to indicate transaction has started 1919 * 1920 * @throws JMSException if some internal error occurs 1921 */ 1922 protected void doStartTransaction() throws JMSException { 1923 if (getTransacted() && !transactionContext.isInXATransaction()) { 1924 transactionContext.begin(); 1925 } 1926 } 1927 1928 /** 1929 * Checks whether the session has unconsumed messages. 1930 * 1931 * @return true - if there are unconsumed messages. 1932 */ 1933 public boolean hasUncomsumedMessages() { 1934 return executor.hasUncomsumedMessages(); 1935 } 1936 1937 /** 1938 * Checks whether the session uses transactions. 1939 * 1940 * @return true - if the session uses transactions. 1941 */ 1942 public boolean isTransacted() { 1943 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 1944 } 1945 1946 /** 1947 * Checks whether the session used client acknowledgment. 1948 * 1949 * @return true - if the session uses client acknowledgment. 1950 */ 1951 protected boolean isClientAcknowledge() { 1952 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 1953 } 1954 1955 /** 1956 * Checks whether the session used auto acknowledgment. 1957 * 1958 * @return true - if the session uses client acknowledgment. 1959 */ 1960 public boolean isAutoAcknowledge() { 1961 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 1962 } 1963 1964 /** 1965 * Checks whether the session used dup ok acknowledgment. 1966 * 1967 * @return true - if the session uses client acknowledgment. 1968 */ 1969 public boolean isDupsOkAcknowledge() { 1970 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 1971 } 1972 1973 public boolean isIndividualAcknowledge(){ 1974 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 1975 } 1976 1977 /** 1978 * Returns the message delivery listener. 1979 * 1980 * @return deliveryListener - message delivery listener. 1981 */ 1982 public DeliveryListener getDeliveryListener() { 1983 return deliveryListener; 1984 } 1985 1986 /** 1987 * Sets the message delivery listener. 1988 * 1989 * @param deliveryListener - message delivery listener. 1990 */ 1991 public void setDeliveryListener(DeliveryListener deliveryListener) { 1992 this.deliveryListener = deliveryListener; 1993 } 1994 1995 /** 1996 * Returns the SessionInfo bean. 1997 * 1998 * @return info - SessionInfo bean. 1999 * @throws JMSException 2000 */ 2001 protected SessionInfo getSessionInfo() throws JMSException { 2002 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 2003 return info; 2004 } 2005 2006 /** 2007 * Send the asynchronus command. 2008 * 2009 * @param command - command to be executed. 2010 * @throws JMSException 2011 */ 2012 public void asyncSendPacket(Command command) throws JMSException { 2013 connection.asyncSendPacket(command); 2014 } 2015 2016 /** 2017 * Send the synchronus command. 2018 * 2019 * @param command - command to be executed. 2020 * @return Response 2021 * @throws JMSException 2022 */ 2023 public Response syncSendPacket(Command command) throws JMSException { 2024 return connection.syncSendPacket(command); 2025 } 2026 2027 public long getNextDeliveryId() { 2028 return deliveryIdGenerator.getNextSequenceId(); 2029 } 2030 2031 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 2032 2033 List<MessageDispatch> c = unconsumedMessages.removeAll(); 2034 for (MessageDispatch md : c) { 2035 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 2036 } 2037 Collections.reverse(c); 2038 2039 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 2040 MessageDispatch md = iter.next(); 2041 executor.executeFirst(md); 2042 } 2043 2044 } 2045 2046 public boolean isRunning() { 2047 return started.get(); 2048 } 2049 2050 public boolean isAsyncDispatch() { 2051 return asyncDispatch; 2052 } 2053 2054 public void setAsyncDispatch(boolean asyncDispatch) { 2055 this.asyncDispatch = asyncDispatch; 2056 } 2057 2058 /** 2059 * @return Returns the sessionAsyncDispatch. 2060 */ 2061 public boolean isSessionAsyncDispatch() { 2062 return sessionAsyncDispatch; 2063 } 2064 2065 /** 2066 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 2067 */ 2068 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 2069 this.sessionAsyncDispatch = sessionAsyncDispatch; 2070 } 2071 2072 public MessageTransformer getTransformer() { 2073 return transformer; 2074 } 2075 2076 public ActiveMQConnection getConnection() { 2077 return connection; 2078 } 2079 2080 /** 2081 * Sets the transformer used to transform messages before they are sent on 2082 * to the JMS bus or when they are received from the bus but before they are 2083 * delivered to the JMS client 2084 */ 2085 public void setTransformer(MessageTransformer transformer) { 2086 this.transformer = transformer; 2087 } 2088 2089 public BlobTransferPolicy getBlobTransferPolicy() { 2090 return blobTransferPolicy; 2091 } 2092 2093 /** 2094 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 2095 * OBjects) are transferred from producers to brokers to consumers 2096 */ 2097 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 2098 this.blobTransferPolicy = blobTransferPolicy; 2099 } 2100 2101 public List<MessageDispatch> getUnconsumedMessages() { 2102 return executor.getUnconsumedMessages(); 2103 } 2104 2105 @Override 2106 public String toString() { 2107 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}"; 2108 } 2109 2110 public void checkMessageListener() throws JMSException { 2111 if (messageListener != null) { 2112 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2113 } 2114 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 2115 ActiveMQMessageConsumer consumer = i.next(); 2116 if (consumer.hasMessageListener()) { 2117 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2118 } 2119 } 2120 } 2121 2122 protected void setOptimizeAcknowledge(boolean value) { 2123 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2124 ActiveMQMessageConsumer c = iter.next(); 2125 c.setOptimizeAcknowledge(value); 2126 } 2127 } 2128 2129 protected void setPrefetchSize(ConsumerId id, int prefetch) { 2130 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2131 ActiveMQMessageConsumer c = iter.next(); 2132 if (c.getConsumerId().equals(id)) { 2133 c.setPrefetchSize(prefetch); 2134 break; 2135 } 2136 } 2137 } 2138 2139 protected void close(ConsumerId id) { 2140 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2141 ActiveMQMessageConsumer c = iter.next(); 2142 if (c.getConsumerId().equals(id)) { 2143 try { 2144 c.close(); 2145 } catch (JMSException e) { 2146 LOG.warn("Exception closing consumer", e); 2147 } 2148 LOG.warn("Closed consumer on Command, " + id); 2149 break; 2150 } 2151 } 2152 } 2153 2154 public boolean isInUse(ActiveMQTempDestination destination) { 2155 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2156 ActiveMQMessageConsumer c = iter.next(); 2157 if (c.isInUse(destination)) { 2158 return true; 2159 } 2160 } 2161 return false; 2162 } 2163 2164 /** 2165 * highest sequence id of the last message delivered by this session. 2166 * Passed to the broker in the close command, maintained by dispose() 2167 * @return lastDeliveredSequenceId 2168 */ 2169 public long getLastDeliveredSequenceId() { 2170 return lastDeliveredSequenceId; 2171 } 2172 2173 protected void sendAck(MessageAck ack) throws JMSException { 2174 sendAck(ack,false); 2175 } 2176 2177 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { 2178 if (lazy || connection.isSendAcksAsync() || getTransacted()) { 2179 asyncSendPacket(ack); 2180 } else { 2181 syncSendPacket(ack); 2182 } 2183 } 2184 2185 protected Scheduler getScheduler() throws JMSException { 2186 return this.connection.getScheduler(); 2187 } 2188 2189 protected ThreadPoolExecutor getConnectionExecutor() { 2190 return this.connectionExecutor; 2191 } 2192}