001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.net.URI; 023import java.net.URISyntaxException; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.CopyOnWriteArrayList; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.RejectedExecutionHandler; 032import java.util.concurrent.ThreadFactory; 033import java.util.concurrent.ThreadPoolExecutor; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.concurrent.atomic.AtomicInteger; 037 038import javax.jms.Connection; 039import javax.jms.ConnectionConsumer; 040import javax.jms.ConnectionMetaData; 041import javax.jms.DeliveryMode; 042import javax.jms.Destination; 043import javax.jms.ExceptionListener; 044import javax.jms.IllegalStateException; 045import javax.jms.InvalidDestinationException; 046import javax.jms.JMSException; 047import javax.jms.Queue; 
048import javax.jms.QueueConnection; 049import javax.jms.QueueSession; 050import javax.jms.ServerSessionPool; 051import javax.jms.Session; 052import javax.jms.Topic; 053import javax.jms.TopicConnection; 054import javax.jms.TopicSession; 055import javax.jms.XAConnection; 056 057import org.apache.activemq.advisory.DestinationSource; 058import org.apache.activemq.blob.BlobTransferPolicy; 059import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; 060import org.apache.activemq.command.ActiveMQDestination; 061import org.apache.activemq.command.ActiveMQMessage; 062import org.apache.activemq.command.ActiveMQTempDestination; 063import org.apache.activemq.command.ActiveMQTempQueue; 064import org.apache.activemq.command.ActiveMQTempTopic; 065import org.apache.activemq.command.BrokerInfo; 066import org.apache.activemq.command.Command; 067import org.apache.activemq.command.CommandTypes; 068import org.apache.activemq.command.ConnectionControl; 069import org.apache.activemq.command.ConnectionError; 070import org.apache.activemq.command.ConnectionId; 071import org.apache.activemq.command.ConnectionInfo; 072import org.apache.activemq.command.ConsumerControl; 073import org.apache.activemq.command.ConsumerId; 074import org.apache.activemq.command.ConsumerInfo; 075import org.apache.activemq.command.ControlCommand; 076import org.apache.activemq.command.DestinationInfo; 077import org.apache.activemq.command.ExceptionResponse; 078import org.apache.activemq.command.Message; 079import org.apache.activemq.command.MessageDispatch; 080import org.apache.activemq.command.MessageId; 081import org.apache.activemq.command.ProducerAck; 082import org.apache.activemq.command.ProducerId; 083import org.apache.activemq.command.RemoveInfo; 084import org.apache.activemq.command.RemoveSubscriptionInfo; 085import org.apache.activemq.command.Response; 086import org.apache.activemq.command.SessionId; 087import org.apache.activemq.command.ShutdownInfo; 088import 
org.apache.activemq.command.WireFormatInfo; 089import org.apache.activemq.management.JMSConnectionStatsImpl; 090import org.apache.activemq.management.JMSStatsImpl; 091import org.apache.activemq.management.StatsCapable; 092import org.apache.activemq.management.StatsImpl; 093import org.apache.activemq.state.CommandVisitorAdapter; 094import org.apache.activemq.thread.Scheduler; 095import org.apache.activemq.thread.TaskRunnerFactory; 096import org.apache.activemq.transport.FutureResponse; 097import org.apache.activemq.transport.RequestTimedOutIOException; 098import org.apache.activemq.transport.ResponseCallback; 099import org.apache.activemq.transport.Transport; 100import org.apache.activemq.transport.TransportListener; 101import org.apache.activemq.transport.failover.FailoverTransport; 102import org.apache.activemq.util.IdGenerator; 103import org.apache.activemq.util.IntrospectionSupport; 104import org.apache.activemq.util.JMSExceptionSupport; 105import org.apache.activemq.util.LongSequenceGenerator; 106import org.apache.activemq.util.ServiceSupport; 107import org.apache.activemq.util.ThreadPoolUtils; 108import org.slf4j.Logger; 109import org.slf4j.LoggerFactory; 110 111public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection { 112 113 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; 114 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 115 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 116 public static int DEFAULT_THREAD_POOL_SIZE = 1000; 117 118 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class); 119 120 public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>(); 121 122 protected 
boolean dispatchAsync=true; 123 protected boolean alwaysSessionAsync = true; 124 125 private TaskRunnerFactory sessionTaskRunner; 126 private final ThreadPoolExecutor executor; 127 128 // Connection state variables 129 private final ConnectionInfo info; 130 private ExceptionListener exceptionListener; 131 private ClientInternalExceptionListener clientInternalExceptionListener; 132 private boolean clientIDSet; 133 private boolean isConnectionInfoSentToBroker; 134 private boolean userSpecifiedClientID; 135 136 // Configuration options variables 137 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 138 private BlobTransferPolicy blobTransferPolicy; 139 private RedeliveryPolicyMap redeliveryPolicyMap; 140 private MessageTransformer transformer; 141 142 private boolean disableTimeStampsByDefault; 143 private boolean optimizedMessageDispatch = true; 144 private boolean copyMessageOnSend = true; 145 private boolean useCompression; 146 private boolean objectMessageSerializationDefered; 147 private boolean useAsyncSend; 148 private boolean optimizeAcknowledge; 149 private long optimizeAcknowledgeTimeOut = 0; 150 private long optimizedAckScheduledAckInterval = 0; 151 private boolean nestedMapAndListEnabled = true; 152 private boolean useRetroactiveConsumer; 153 private boolean exclusiveConsumer; 154 private boolean alwaysSyncSend; 155 private int closeTimeout = 15000; 156 private boolean watchTopicAdvisories = true; 157 private long warnAboutUnstartedConnectionTimeout = 500L; 158 private int sendTimeout =0; 159 private boolean sendAcksAsync=true; 160 private boolean checkForDuplicates = true; 161 private boolean queueOnlyConnection = false; 162 private boolean consumerExpiryCheckEnabled = true; 163 164 private final Transport transport; 165 private final IdGenerator clientIdGenerator; 166 private final JMSStatsImpl factoryStats; 167 private final JMSConnectionStatsImpl stats; 168 169 private final AtomicBoolean started = new AtomicBoolean(false); 
170 private final AtomicBoolean closing = new AtomicBoolean(false); 171 private final AtomicBoolean closed = new AtomicBoolean(false); 172 private final AtomicBoolean transportFailed = new AtomicBoolean(false); 173 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); 174 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); 175 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); 176 // Stream are deprecated and will be removed in a later release. 177 private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>(); 178 private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>(); 179 180 // Maps ConsumerIds to ActiveMQConsumer objects 181 private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); 182 private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>(); 183 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 184 private final SessionId connectionSessionId; 185 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 186 private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 187 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); 188 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); 189 190 private AdvisoryConsumer advisoryConsumer; 191 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); 192 private BrokerInfo brokerInfo; 193 private IOException firstFailureError; 194 private int 
producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;

    // Assume that protocol is the latest. Change to the actual protocol
    // version when a WireFormatInfo is received.
    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    private final long timeCreated;
    private final ConnectionAudit connectionAudit = new ConnectionAudit();
    private DestinationSource destinationSource;
    private final Object ensureConnectionInfoSentMutex = new Object();
    private boolean useDedicatedTaskRunner;
    protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
    private long consumerFailoverRedeliveryWaitPeriod;
    private Scheduler scheduler;
    private boolean messagePrioritySupported = false;
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;
    private boolean rmIdFromConnectionId = false;

    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
    private RejectedExecutionHandler rejectedTaskHandler = null;

    /**
     * Construct an <code>ActiveMQConnection</code>.
     * <P>
     * Besides storing its arguments, the constructor creates the connection
     * executor, builds the <code>ConnectionInfo</code> for a freshly generated
     * connection id, registers this object as the transport's listener and
     * registers the connection with the factory statistics.
     *
     * @param transport the transport this connection communicates over; this
     *                connection installs itself as its transport listener
     * @param clientIdGenerator generator stored for later use by this
     *                connection
     * @param connectionIdGenerator generator used once, here, to produce the
     *                unique connection id
     * @param factoryStats factory level statistics this connection is added to
     * @throws Exception declared by the constructor; no statement visible here
     *                 throws a checked exception directly
     */
    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {

        this.transport = transport;
        this.clientIdGenerator = clientIdGenerator;
        this.factoryStats = factoryStats;

        // Configure a single threaded executor who's core thread can timeout if
        // idle
        // NOTE(review): allowCoreThreadTimeOut is not enabled in this
        // constructor (the call below is commented out), so unless it is
        // enabled elsewhere the 5 second keep-alive does not apply to the
        // single core thread - confirm.
        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
                //thread.setDaemon(true);
                return thread;
            }
        });
        // asyncConnectionThread.allowCoreThreadTimeOut(true);
        String uniqueId = connectionIdGenerator.generateId();
        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
        this.info.setManageable(true);
        this.info.setFaultTolerant(transport.isFaultTolerant());
        // Session id -1 is used for the connection level session id.
        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

        // From this point on the transport may call back into this
        // (still being constructed) connection.
        this.transport.setTransportListener(this);

        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        this.timeCreated = System.currentTimeMillis();
        // Duplicate checking in the audit follows the transport's fault tolerance.
        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }

    /**
     * Stores the user name on this connection's <code>ConnectionInfo</code>.
     *
     * @param userName the user name to record
     */
    protected void setUserName(String userName) {
        this.info.setUserName(userName);
    }

    /**
     * Stores the password on this connection's <code>ConnectionInfo</code>.
     *
     * @param password the password to record
     */
    protected void setPassword(String password) {
        this.info.setPassword(password);
    }

    /**
     * A static helper method to create a new connection using a default
     * constructed <code>ActiveMQConnectionFactory</code>.
     *
     * @return an ActiveMQConnection
     * @throws JMSException if the factory fails to create the connection
     */
    public static ActiveMQConnection makeConnection() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        return (ActiveMQConnection)factory.createConnection();
    }

    /**
     * A static helper method to create a new connection to the given broker
     * URI.
     *
     * @param uri the broker URI handed to the connection factory
     * @return an ActiveMQConnection
     * @throws JMSException if the factory fails to create the connection
     * @throws URISyntaxException declared for callers; presumably signals a
     *                 malformed <code>uri</code> - confirm against the factory
     */
    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        return (ActiveMQConnection)factory.createConnection();
    }

    /**
     * A static helper method to create a new connection with credentials.
     *
     * @param user the user name passed to the connection factory
     * @param password the password passed to the connection factory
     * @param uri the broker URI, parsed with <code>new URI(uri)</code>
     * @return an ActiveMQConnection
     * @throws JMSException if the factory fails to create the connection
     * @throws URISyntaxException if <code>uri</code> cannot be parsed as a URI
     */
    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
        return (ActiveMQConnection)factory.createConnection();
    }

    /**
     * @return the statistics object of this connection
     */
    public JMSConnectionStatsImpl getConnectionStats() {
        return stats;
    }

    /**
     * Creates a <CODE>Session</CODE> object.
     *
     * @param transacted indicates whether the session is transacted
     * @param acknowledgeMode indicates whether the consumer or the client will
     *                acknowledge any messages it receives; ignored if the
     *                session is transacted. Legal values are
     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>; values up to
     *                <code>ActiveMQSession.MAX_ACK_CONSTANT</code> are also
     *                accepted by the range check below.
     * @return a newly created session
     * @throws JMSException if the <CODE>Connection</CODE> object fails to
     *                 create a session due to some internal error or lack of
     *                 support for the specific transaction and acknowledgement
     *                 mode; also thrown for a non-transacted session when
     *                 <code>acknowledgeMode</code> is
     *                 <code>Session.SESSION_TRANSACTED</code> or outside the
     *                 accepted range.
     * @see Session#AUTO_ACKNOWLEDGE
     * @see Session#CLIENT_ACKNOWLEDGE
     * @see Session#DUPS_OK_ACKNOWLEDGE
     * @since 1.1
     */
    @Override
    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        checkClosedOrFailed();
        ensureConnectionInfoSent();
        // The acknowledge mode is only validated for non-transacted sessions.
        if(!transacted) {
            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
            }
        }
        // Transacted sessions always get SESSION_TRANSACTED. The inner
        // SESSION_TRANSACTED -> AUTO_ACKNOWLEDGE fallback cannot trigger for a
        // non-transacted session because that combination is rejected above.
        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
    }

    /**
     * @return a new session id built from this connection's id and the next
     *         value of the session sequence generator
     */
    protected SessionId getNextSessionId() {
        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
    }

    /**
     * Gets the client identifier for this connection.
     * <P>
     * This value is specific to the JMS provider. It is either preconfigured by
     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
     * dynamically by the application by calling the <code>setClientID</code>
     * method.
     *
     * @return the unique client identifier
     * @throws JMSException if the JMS provider fails to return the client ID
     *                 for this connection due to some internal error.
     */
    @Override
    public String getClientID() throws JMSException {
        checkClosedOrFailed();
        return this.info.getClientId();
    }

    /**
     * Sets the client identifier for this connection.
     * <P>
     * The preferred way to assign a JMS client's client identifier is for it to
     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
     * object and transparently assigned to the <CODE>Connection</CODE> object
     * it creates.
     * <P>
     * Alternatively, a client can set a connection's client identifier using a
     * provider-specific value. The facility to set a connection's client
     * identifier explicitly is not a mechanism for overriding the identifier
     * that has been administratively configured.
It is provided for the case 379 * where no administratively specified identifier exists. If one does exist, 380 * an attempt to change it by setting it must throw an 381 * <CODE>IllegalStateException</CODE>. If a client sets the client 382 * identifier explicitly, it must do so immediately after it creates the 383 * connection and before any other action on the connection is taken. After 384 * this point, setting the client identifier is a programming error that 385 * should throw an <CODE>IllegalStateException</CODE>. 386 * <P> 387 * The purpose of the client identifier is to associate a connection and its 388 * objects with a state maintained on behalf of the client by a provider. 389 * The only such state identified by the JMS API is that required to support 390 * durable subscriptions. 391 * <P> 392 * If another connection with the same <code>clientID</code> is already 393 * running when this method is called, the JMS provider should detect the 394 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>. 395 * 396 * @param newClientID the unique client identifier 397 * @throws JMSException if the JMS provider fails to set the client ID for 398 * this connection due to some internal error. 399 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an 400 * invalid or duplicate client ID. 401 * @throws javax.jms.IllegalStateException if the JMS client attempts to set 402 * a connection's client ID at the wrong time or when it has 403 * been administratively configured. 
404 */ 405 @Override 406 public void setClientID(String newClientID) throws JMSException { 407 checkClosedOrFailed(); 408 409 if (this.clientIDSet) { 410 throw new IllegalStateException("The clientID has already been set"); 411 } 412 413 if (this.isConnectionInfoSentToBroker) { 414 throw new IllegalStateException("Setting clientID on a used Connection is not allowed"); 415 } 416 417 this.info.setClientId(newClientID); 418 this.userSpecifiedClientID = true; 419 ensureConnectionInfoSent(); 420 } 421 422 /** 423 * Sets the default client id that the connection will use if explicitly not 424 * set with the setClientId() call. 425 */ 426 public void setDefaultClientID(String clientID) throws JMSException { 427 this.info.setClientId(clientID); 428 this.userSpecifiedClientID = true; 429 } 430 431 /** 432 * Gets the metadata for this connection. 433 * 434 * @return the connection metadata 435 * @throws JMSException if the JMS provider fails to get the connection 436 * metadata for this connection. 437 * @see javax.jms.ConnectionMetaData 438 */ 439 @Override 440 public ConnectionMetaData getMetaData() throws JMSException { 441 checkClosedOrFailed(); 442 return ActiveMQConnectionMetaData.INSTANCE; 443 } 444 445 /** 446 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not 447 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE> 448 * associated with it. 449 * 450 * @return the <CODE>ExceptionListener</CODE> for this connection, or 451 * null, if no <CODE>ExceptionListener</CODE> is associated with 452 * this connection. 453 * @throws JMSException if the JMS provider fails to get the 454 * <CODE>ExceptionListener</CODE> for this connection. 455 * @see javax.jms.Connection#setExceptionListener(ExceptionListener) 456 */ 457 @Override 458 public ExceptionListener getExceptionListener() throws JMSException { 459 checkClosedOrFailed(); 460 return this.exceptionListener; 461 } 462 463 /** 464 * Sets an exception listener for this connection. 
465 * <P> 466 * If a JMS provider detects a serious problem with a connection, it informs 467 * the connection's <CODE> ExceptionListener</CODE>, if one has been 468 * registered. It does this by calling the listener's <CODE>onException 469 * </CODE> 470 * method, passing it a <CODE>JMSException</CODE> object describing the 471 * problem. 472 * <P> 473 * An exception listener allows a client to be notified of a problem 474 * asynchronously. Some connections only consume messages, so they would 475 * have no other way to learn their connection has failed. 476 * <P> 477 * A connection serializes execution of its <CODE>ExceptionListener</CODE>. 478 * <P> 479 * A JMS provider should attempt to resolve connection problems itself 480 * before it notifies the client of them. 481 * 482 * @param listener the exception listener 483 * @throws JMSException if the JMS provider fails to set the exception 484 * listener for this connection. 485 */ 486 @Override 487 public void setExceptionListener(ExceptionListener listener) throws JMSException { 488 checkClosedOrFailed(); 489 this.exceptionListener = listener; 490 } 491 492 /** 493 * Gets the <code>ClientInternalExceptionListener</code> object for this connection. 494 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE> 495 * associated with it. 496 * 497 * @return the listener or <code>null</code> if no listener is registered with the connection. 498 */ 499 public ClientInternalExceptionListener getClientInternalExceptionListener() { 500 return clientInternalExceptionListener; 501 } 502 503 /** 504 * Sets a client internal exception listener for this connection. 505 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components 506 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message. 
507 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code> 508 * describing the problem. 509 * 510 * @param listener the exception listener 511 */ 512 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) { 513 this.clientInternalExceptionListener = listener; 514 } 515 516 /** 517 * Starts (or restarts) a connection's delivery of incoming messages. A call 518 * to <CODE>start</CODE> on a connection that has already been started is 519 * ignored. 520 * 521 * @throws JMSException if the JMS provider fails to start message delivery 522 * due to some internal error. 523 * @see javax.jms.Connection#stop() 524 */ 525 @Override 526 public void start() throws JMSException { 527 checkClosedOrFailed(); 528 ensureConnectionInfoSent(); 529 if (started.compareAndSet(false, true)) { 530 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 531 ActiveMQSession session = i.next(); 532 session.start(); 533 } 534 } 535 } 536 537 /** 538 * Temporarily stops a connection's delivery of incoming messages. Delivery 539 * can be restarted using the connection's <CODE>start</CODE> method. When 540 * the connection is stopped, delivery to all the connection's message 541 * consumers is inhibited: synchronous receives block, and messages are not 542 * delivered to message listeners. 543 * <P> 544 * This call blocks until receives and/or message listeners in progress have 545 * completed. 546 * <P> 547 * Stopping a connection has no effect on its ability to send messages. A 548 * call to <CODE>stop</CODE> on a connection that has already been stopped 549 * is ignored. 550 * <P> 551 * A call to <CODE>stop</CODE> must not return until delivery of messages 552 * has paused. 
This means that a client can rely on the fact that none of 553 * its message listeners will be called and that all threads of control 554 * waiting for <CODE>receive</CODE> calls to return will not return with a 555 * message until the connection is restarted. The receive timers for a 556 * stopped connection continue to advance, so receives may time out while 557 * the connection is stopped. 558 * <P> 559 * If message listeners are running when <CODE>stop</CODE> is invoked, the 560 * <CODE>stop</CODE> call must wait until all of them have returned before 561 * it may return. While these message listeners are completing, they must 562 * have the full services of the connection available to them. 563 * 564 * @throws JMSException if the JMS provider fails to stop message delivery 565 * due to some internal error. 566 * @see javax.jms.Connection#start() 567 */ 568 @Override 569 public void stop() throws JMSException { 570 doStop(true); 571 } 572 573 /** 574 * @see #stop() 575 * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed, 576 * <tt>false</tt> to skip this check 577 * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error. 578 */ 579 void doStop(boolean checkClosed) throws JMSException { 580 if (checkClosed) { 581 checkClosedOrFailed(); 582 } 583 if (started.compareAndSet(true, false)) { 584 synchronized(sessions) { 585 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 586 ActiveMQSession s = i.next(); 587 s.stop(); 588 } 589 } 590 } 591 } 592 593 /** 594 * Closes the connection. 595 * <P> 596 * Since a provider typically allocates significant resources outside the 597 * JVM on behalf of a connection, clients should close these resources when 598 * they are not needed. Relying on garbage collection to eventually reclaim 599 * these resources may not be timely enough. 
600 * <P> 601 * There is no need to close the sessions, producers, and consumers of a 602 * closed connection. 603 * <P> 604 * Closing a connection causes all temporary destinations to be deleted. 605 * <P> 606 * When this method is invoked, it should not return until message 607 * processing has been shut down in an orderly fashion. This means that all 608 * message listeners that may have been running have returned, and that all 609 * pending receives have returned. A close terminates all pending message 610 * receives on the connection's sessions' consumers. The receives may return 611 * with a message or with null, depending on whether there was a message 612 * available at the time of the close. If one or more of the connection's 613 * sessions' message listeners is processing a message at the time when 614 * connection <CODE>close</CODE> is invoked, all the facilities of the 615 * connection and its sessions must remain available to those listeners 616 * until they return control to the JMS provider. 617 * <P> 618 * Closing a connection causes any of its sessions' transactions in progress 619 * to be rolled back. In the case where a session's work is coordinated by 620 * an external transaction manager, a session's <CODE>commit</CODE> and 621 * <CODE> rollback</CODE> methods are not used and the result of a closed 622 * session's work is determined later by the transaction manager. Closing a 623 * connection does NOT force an acknowledgment of client-acknowledged 624 * sessions. 625 * <P> 626 * Invoking the <CODE>acknowledge</CODE> method of a received message from 627 * a closed connection's session must throw an 628 * <CODE>IllegalStateException</CODE>. Closing a closed connection must 629 * NOT throw an exception. 630 * 631 * @throws JMSException if the JMS provider fails to close the connection 632 * due to some internal error. For example, a failure to 633 * release resources or to close a socket connection can 634 * cause this exception to be thrown. 
635 */ 636 @Override 637 public void close() throws JMSException { 638 // Store the interrupted state and clear so that cleanup happens without 639 // leaking connection resources. Reset in finally to preserve state. 640 boolean interrupted = Thread.interrupted(); 641 642 try { 643 644 // If we were running, lets stop first. 645 if (!closed.get() && !transportFailed.get()) { 646 // do not fail if already closed as according to JMS spec we must not 647 // throw exception if already closed 648 doStop(false); 649 } 650 651 synchronized (this) { 652 if (!closed.get()) { 653 closing.set(true); 654 655 if (destinationSource != null) { 656 destinationSource.stop(); 657 destinationSource = null; 658 } 659 if (advisoryConsumer != null) { 660 advisoryConsumer.dispose(); 661 advisoryConsumer = null; 662 } 663 664 Scheduler scheduler = this.scheduler; 665 if (scheduler != null) { 666 try { 667 scheduler.stop(); 668 } catch (Exception e) { 669 JMSException ex = JMSExceptionSupport.create(e); 670 throw ex; 671 } 672 } 673 674 long lastDeliveredSequenceId = 0; 675 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 676 ActiveMQSession s = i.next(); 677 s.dispose(); 678 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId()); 679 } 680 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 681 ActiveMQConnectionConsumer c = i.next(); 682 c.dispose(); 683 } 684 // Stream are deprecated and will be removed in a later release. 685 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { 686 ActiveMQInputStream c = i.next(); 687 c.dispose(); 688 } 689 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 690 ActiveMQOutputStream c = i.next(); 691 c.dispose(); 692 } 693 694 this.activeTempDestinations.clear(); 695 696 if (isConnectionInfoSentToBroker) { 697 // If we announced ourselves to the broker.. 
Try to let the broker 698 // know that the connection is being shutdown. 699 RemoveInfo removeCommand = info.createRemoveCommand(); 700 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 701 try { 702 doSyncSendPacket(info.createRemoveCommand(), closeTimeout); 703 } catch (JMSException e) { 704 if (e.getCause() instanceof RequestTimedOutIOException) { 705 // expected 706 } else { 707 throw e; 708 } 709 } 710 doAsyncSendPacket(new ShutdownInfo()); 711 } 712 713 started.set(false); 714 715 // TODO if we move the TaskRunnerFactory to the connection 716 // factory 717 // then we may need to call 718 // factory.onConnectionClose(this); 719 if (sessionTaskRunner != null) { 720 sessionTaskRunner.shutdown(); 721 } 722 closed.set(true); 723 closing.set(false); 724 } 725 } 726 } finally { 727 try { 728 if (executor != null) { 729 ThreadPoolUtils.shutdown(executor); 730 } 731 } catch (Throwable e) { 732 LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e); 733 } 734 735 ServiceSupport.dispose(this.transport); 736 737 factoryStats.removeConnection(this); 738 if (interrupted) { 739 Thread.currentThread().interrupt(); 740 } 741 } 742 } 743 744 /** 745 * Tells the broker to terminate its VM. This can be used to cleanly 746 * terminate a broker running in a standalone java process. Server must have 747 * property enable.vm.shutdown=true defined to allow this to work. 748 */ 749 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet 750 // implemented. 751 /* 752 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand 753 * command = new BrokerAdminCommand(); 754 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM); 755 * asyncSendPacket(command); } 756 */ 757 758 /** 759 * Create a durable connection consumer for this connection (optional 760 * operation). This is an expert facility not used by regular JMS clients. 
761 * 762 * @param topic topic to access 763 * @param subscriptionName durable subscription name 764 * @param messageSelector only messages with properties matching the message 765 * selector expression are delivered. A value of null or an 766 * empty string indicates that there is no message selector 767 * for the message consumer. 768 * @param sessionPool the server session pool to associate with this durable 769 * connection consumer 770 * @param maxMessages the maximum number of messages that can be assigned to 771 * a server session at one time 772 * @return the durable connection consumer 773 * @throws JMSException if the <CODE>Connection</CODE> object fails to 774 * create a connection consumer due to some internal error 775 * or invalid arguments for <CODE>sessionPool</CODE> and 776 * <CODE>messageSelector</CODE>. 777 * @throws javax.jms.InvalidDestinationException if an invalid destination 778 * is specified. 779 * @throws javax.jms.InvalidSelectorException if the message selector is 780 * invalid. 781 * @see javax.jms.ConnectionConsumer 782 * @since 1.1 783 */ 784 @Override 785 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) 786 throws JMSException { 787 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false); 788 } 789 790 /** 791 * Create a durable connection consumer for this connection (optional 792 * operation). This is an expert facility not used by regular JMS clients. 793 * 794 * @param topic topic to access 795 * @param subscriptionName durable subscription name 796 * @param messageSelector only messages with properties matching the message 797 * selector expression are delivered. A value of null or an 798 * empty string indicates that there is no message selector 799 * for the message consumer. 
800 * @param sessionPool the server session pool to associate with this durable 801 * connection consumer 802 * @param maxMessages the maximum number of messages that can be assigned to 803 * a server session at one time 804 * @param noLocal set true if you want to filter out messages published 805 * locally 806 * @return the durable connection consumer 807 * @throws JMSException if the <CODE>Connection</CODE> object fails to 808 * create a connection consumer due to some internal error 809 * or invalid arguments for <CODE>sessionPool</CODE> and 810 * <CODE>messageSelector</CODE>. 811 * @throws javax.jms.InvalidDestinationException if an invalid destination 812 * is specified. 813 * @throws javax.jms.InvalidSelectorException if the message selector is 814 * invalid. 815 * @see javax.jms.ConnectionConsumer 816 * @since 1.1 817 */ 818 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages, 819 boolean noLocal) throws JMSException { 820 checkClosedOrFailed(); 821 822 if (queueOnlyConnection) { 823 throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources."); 824 } 825 826 ensureConnectionInfoSent(); 827 SessionId sessionId = new SessionId(info.getConnectionId(), -1); 828 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId())); 829 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic)); 830 info.setSubscriptionName(subscriptionName); 831 info.setSelector(messageSelector); 832 info.setPrefetchSize(maxMessages); 833 info.setDispatchAsync(isDispatchAsync()); 834 835 // Allows the options on the destination to configure the consumerInfo 836 if (info.getDestination().getOptions() != null) { 837 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions()); 838 IntrospectionSupport.setProperties(this.info, options, 
"consumer."); 839 } 840 841 return new ActiveMQConnectionConsumer(this, sessionPool, info); 842 } 843 844 // Properties 845 // ------------------------------------------------------------------------- 846 847 /** 848 * Returns true if this connection has been started 849 * 850 * @return true if this Connection is started 851 */ 852 public boolean isStarted() { 853 return started.get(); 854 } 855 856 /** 857 * Returns true if the connection is closed 858 */ 859 public boolean isClosed() { 860 return closed.get(); 861 } 862 863 /** 864 * Returns true if the connection is in the process of being closed 865 */ 866 public boolean isClosing() { 867 return closing.get(); 868 } 869 870 /** 871 * Returns true if the underlying transport has failed 872 */ 873 public boolean isTransportFailed() { 874 return transportFailed.get(); 875 } 876 877 /** 878 * @return Returns the prefetchPolicy. 879 */ 880 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 881 return prefetchPolicy; 882 } 883 884 /** 885 * Sets the <a 886 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 887 * policy</a> for consumers created by this connection. 888 */ 889 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 890 this.prefetchPolicy = prefetchPolicy; 891 } 892 893 /** 894 */ 895 public Transport getTransportChannel() { 896 return transport; 897 } 898 899 /** 900 * @return Returns the clientID of the connection, forcing one to be 901 * generated if one has not yet been configured. 902 */ 903 public String getInitializedClientID() throws JMSException { 904 ensureConnectionInfoSent(); 905 return info.getClientId(); 906 } 907 908 /** 909 * @return Returns the timeStampsDisableByDefault. 910 */ 911 public boolean isDisableTimeStampsByDefault() { 912 return disableTimeStampsByDefault; 913 } 914 915 /** 916 * Sets whether or not timestamps on messages should be disabled or not. If 917 * you disable them it adds a small performance boost. 
918 */ 919 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { 920 this.disableTimeStampsByDefault = timeStampsDisableByDefault; 921 } 922 923 /** 924 * @return Returns the dispatchOptimizedMessage. 925 */ 926 public boolean isOptimizedMessageDispatch() { 927 return optimizedMessageDispatch; 928 } 929 930 /** 931 * If this flag is set then an larger prefetch limit is used - only 932 * applicable for durable topic subscribers. 933 */ 934 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) { 935 this.optimizedMessageDispatch = dispatchOptimizedMessage; 936 } 937 938 /** 939 * @return Returns the closeTimeout. 940 */ 941 public int getCloseTimeout() { 942 return closeTimeout; 943 } 944 945 /** 946 * Sets the timeout before a close is considered complete. Normally a 947 * close() on a connection waits for confirmation from the broker; this 948 * allows that operation to timeout to save the client hanging if there is 949 * no broker 950 */ 951 public void setCloseTimeout(int closeTimeout) { 952 this.closeTimeout = closeTimeout; 953 } 954 955 /** 956 * @return ConnectionInfo 957 */ 958 public ConnectionInfo getConnectionInfo() { 959 return this.info; 960 } 961 962 public boolean isUseRetroactiveConsumer() { 963 return useRetroactiveConsumer; 964 } 965 966 /** 967 * Sets whether or not retroactive consumers are enabled. Retroactive 968 * consumers allow non-durable topic subscribers to receive old messages 969 * that were published before the non-durable subscriber started. 
970 */ 971 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 972 this.useRetroactiveConsumer = useRetroactiveConsumer; 973 } 974 975 public boolean isNestedMapAndListEnabled() { 976 return nestedMapAndListEnabled; 977 } 978 979 /** 980 * Enables/disables whether or not Message properties and MapMessage entries 981 * support <a 982 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 983 * Structures</a> of Map and List objects 984 */ 985 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 986 this.nestedMapAndListEnabled = structuredMapsEnabled; 987 } 988 989 public boolean isExclusiveConsumer() { 990 return exclusiveConsumer; 991 } 992 993 /** 994 * Enables or disables whether or not queue consumers should be exclusive or 995 * not for example to preserve ordering when not using <a 996 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 997 * 998 * @param exclusiveConsumer 999 */ 1000 public void setExclusiveConsumer(boolean exclusiveConsumer) { 1001 this.exclusiveConsumer = exclusiveConsumer; 1002 } 1003 1004 /** 1005 * Adds a transport listener so that a client can be notified of events in 1006 * the underlying transport 1007 */ 1008 public void addTransportListener(TransportListener transportListener) { 1009 transportListeners.add(transportListener); 1010 } 1011 1012 public void removeTransportListener(TransportListener transportListener) { 1013 transportListeners.remove(transportListener); 1014 } 1015 1016 public boolean isUseDedicatedTaskRunner() { 1017 return useDedicatedTaskRunner; 1018 } 1019 1020 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 1021 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 1022 } 1023 1024 public TaskRunnerFactory getSessionTaskRunner() { 1025 synchronized (this) { 1026 if (sessionTaskRunner == null) { 1027 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", 
ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize); 1028 sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler); 1029 } 1030 } 1031 return sessionTaskRunner; 1032 } 1033 1034 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 1035 this.sessionTaskRunner = sessionTaskRunner; 1036 } 1037 1038 public MessageTransformer getTransformer() { 1039 return transformer; 1040 } 1041 1042 /** 1043 * Sets the transformer used to transform messages before they are sent on 1044 * to the JMS bus or when they are received from the bus but before they are 1045 * delivered to the JMS client 1046 */ 1047 public void setTransformer(MessageTransformer transformer) { 1048 this.transformer = transformer; 1049 } 1050 1051 /** 1052 * @return the statsEnabled 1053 */ 1054 public boolean isStatsEnabled() { 1055 return this.stats.isEnabled(); 1056 } 1057 1058 /** 1059 * @param statsEnabled the statsEnabled to set 1060 */ 1061 public void setStatsEnabled(boolean statsEnabled) { 1062 this.stats.setEnabled(statsEnabled); 1063 } 1064 1065 /** 1066 * Returns the {@link DestinationSource} object which can be used to listen to destinations 1067 * being created or destroyed or to enquire about the current destinations available on the broker 1068 * 1069 * @return a lazily created destination source 1070 * @throws JMSException 1071 */ 1072 @Override 1073 public DestinationSource getDestinationSource() throws JMSException { 1074 if (destinationSource == null) { 1075 destinationSource = new DestinationSource(this); 1076 destinationSource.start(); 1077 } 1078 return destinationSource; 1079 } 1080 1081 // Implementation methods 1082 // ------------------------------------------------------------------------- 1083 1084 /** 1085 * Used internally for adding Sessions to the Connection 1086 * 1087 * @param session 1088 * @throws JMSException 1089 * @throws JMSException 1090 */ 1091 protected void addSession(ActiveMQSession session) 
throws JMSException { 1092 this.sessions.add(session); 1093 if (sessions.size() > 1 || session.isTransacted()) { 1094 optimizedMessageDispatch = false; 1095 } 1096 } 1097 1098 /** 1099 * Used interanlly for removing Sessions from a Connection 1100 * 1101 * @param session 1102 */ 1103 protected void removeSession(ActiveMQSession session) { 1104 this.sessions.remove(session); 1105 this.removeDispatcher(session); 1106 } 1107 1108 /** 1109 * Add a ConnectionConsumer 1110 * 1111 * @param connectionConsumer 1112 * @throws JMSException 1113 */ 1114 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException { 1115 this.connectionConsumers.add(connectionConsumer); 1116 } 1117 1118 /** 1119 * Remove a ConnectionConsumer 1120 * 1121 * @param connectionConsumer 1122 */ 1123 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) { 1124 this.connectionConsumers.remove(connectionConsumer); 1125 this.removeDispatcher(connectionConsumer); 1126 } 1127 1128 /** 1129 * Creates a <CODE>TopicSession</CODE> object. 1130 * 1131 * @param transacted indicates whether the session is transacted 1132 * @param acknowledgeMode indicates whether the consumer or the client will 1133 * acknowledge any messages it receives; ignored if the 1134 * session is transacted. Legal values are 1135 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1136 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1137 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1138 * @return a newly created topic session 1139 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1140 * to create a session due to some internal error or lack of 1141 * support for the specific transaction and acknowledgement 1142 * mode. 
1143 * @see Session#AUTO_ACKNOWLEDGE 1144 * @see Session#CLIENT_ACKNOWLEDGE 1145 * @see Session#DUPS_OK_ACKNOWLEDGE 1146 */ 1147 @Override 1148 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 1149 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1150 } 1151 1152 /** 1153 * Creates a connection consumer for this connection (optional operation). 1154 * This is an expert facility not used by regular JMS clients. 1155 * 1156 * @param topic the topic to access 1157 * @param messageSelector only messages with properties matching the message 1158 * selector expression are delivered. A value of null or an 1159 * empty string indicates that there is no message selector 1160 * for the message consumer. 1161 * @param sessionPool the server session pool to associate with this 1162 * connection consumer 1163 * @param maxMessages the maximum number of messages that can be assigned to 1164 * a server session at one time 1165 * @return the connection consumer 1166 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1167 * to create a connection consumer due to some internal 1168 * error or invalid arguments for <CODE>sessionPool</CODE> 1169 * and <CODE>messageSelector</CODE>. 1170 * @throws javax.jms.InvalidDestinationException if an invalid topic is 1171 * specified. 1172 * @throws javax.jms.InvalidSelectorException if the message selector is 1173 * invalid. 1174 * @see javax.jms.ConnectionConsumer 1175 */ 1176 @Override 1177 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1178 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false); 1179 } 1180 1181 /** 1182 * Creates a connection consumer for this connection (optional operation). 1183 * This is an expert facility not used by regular JMS clients. 
1184 * 1185 * @param queue the queue to access 1186 * @param messageSelector only messages with properties matching the message 1187 * selector expression are delivered. A value of null or an 1188 * empty string indicates that there is no message selector 1189 * for the message consumer. 1190 * @param sessionPool the server session pool to associate with this 1191 * connection consumer 1192 * @param maxMessages the maximum number of messages that can be assigned to 1193 * a server session at one time 1194 * @return the connection consumer 1195 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1196 * to create a connection consumer due to some internal 1197 * error or invalid arguments for <CODE>sessionPool</CODE> 1198 * and <CODE>messageSelector</CODE>. 1199 * @throws javax.jms.InvalidDestinationException if an invalid queue is 1200 * specified. 1201 * @throws javax.jms.InvalidSelectorException if the message selector is 1202 * invalid. 1203 * @see javax.jms.ConnectionConsumer 1204 */ 1205 @Override 1206 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1207 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false); 1208 } 1209 1210 /** 1211 * Creates a connection consumer for this connection (optional operation). 1212 * This is an expert facility not used by regular JMS clients. 1213 * 1214 * @param destination the destination to access 1215 * @param messageSelector only messages with properties matching the message 1216 * selector expression are delivered. A value of null or an 1217 * empty string indicates that there is no message selector 1218 * for the message consumer. 
1219 * @param sessionPool the server session pool to associate with this 1220 * connection consumer 1221 * @param maxMessages the maximum number of messages that can be assigned to 1222 * a server session at one time 1223 * @return the connection consumer 1224 * @throws JMSException if the <CODE>Connection</CODE> object fails to 1225 * create a connection consumer due to some internal error 1226 * or invalid arguments for <CODE>sessionPool</CODE> and 1227 * <CODE>messageSelector</CODE>. 1228 * @throws javax.jms.InvalidDestinationException if an invalid destination 1229 * is specified. 1230 * @throws javax.jms.InvalidSelectorException if the message selector is 1231 * invalid. 1232 * @see javax.jms.ConnectionConsumer 1233 * @since 1.1 1234 */ 1235 @Override 1236 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1237 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false); 1238 } 1239 1240 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) 1241 throws JMSException { 1242 1243 checkClosedOrFailed(); 1244 ensureConnectionInfoSent(); 1245 1246 ConsumerId consumerId = createConsumerId(); 1247 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId); 1248 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination)); 1249 consumerInfo.setSelector(messageSelector); 1250 consumerInfo.setPrefetchSize(maxMessages); 1251 consumerInfo.setNoLocal(noLocal); 1252 consumerInfo.setDispatchAsync(isDispatchAsync()); 1253 1254 // Allows the options on the destination to configure the consumerInfo 1255 if (consumerInfo.getDestination().getOptions() != null) { 1256 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions()); 1257 
IntrospectionSupport.setProperties(consumerInfo, options, "consumer."); 1258 } 1259 1260 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo); 1261 } 1262 1263 /** 1264 * @return 1265 */ 1266 private ConsumerId createConsumerId() { 1267 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); 1268 } 1269 1270 /** 1271 * @return 1272 */ 1273 private ProducerId createProducerId() { 1274 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId()); 1275 } 1276 1277 /** 1278 * Creates a <CODE>QueueSession</CODE> object. 1279 * 1280 * @param transacted indicates whether the session is transacted 1281 * @param acknowledgeMode indicates whether the consumer or the client will 1282 * acknowledge any messages it receives; ignored if the 1283 * session is transacted. Legal values are 1284 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1285 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1286 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1287 * @return a newly created queue session 1288 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1289 * to create a session due to some internal error or lack of 1290 * support for the specific transaction and acknowledgement 1291 * mode. 1292 * @see Session#AUTO_ACKNOWLEDGE 1293 * @see Session#CLIENT_ACKNOWLEDGE 1294 * @see Session#DUPS_OK_ACKNOWLEDGE 1295 */ 1296 @Override 1297 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 1298 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1299 } 1300 1301 /** 1302 * Ensures that the clientID was manually specified and not auto-generated. 1303 * If the clientID was not specified this method will throw an exception. 1304 * This method is used to ensure that the clientID + durableSubscriber name 1305 * are used correctly. 
*
     * @throws JMSException if userSpecifiedClientID is false
     */
    public void checkClientIDWasManuallySpecified() throws JMSException {
        if (!userSpecifiedClientID) {
            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
        }
    }

    /**
     * Sends a command one-way through the Connection - for internal use only.
     *
     * @param command the command to send
     * @throws JMSException a ConnectionClosedException if the connection is
     *                 closed, or the converted IOException if the transport fails
     */
    public void asyncSendPacket(Command command) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {
            doAsyncSendPacket(command);
        }
    }

    /**
     * Sends a command one-way on the transport without checking the closed
     * flag, converting any IOException into a JMSException.
     *
     * @param command the command to send
     * @throws JMSException if the transport throws an IOException
     */
    private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }

    /**
     * Sends a command through the Connection and reports the outcome to a
     * callback - for internal use only. With a null callback this falls back
     * to the blocking {@link #syncSendPacket(Command)}.
     *
     * @param command the command to send
     * @param onComplete callback notified of success or failure; may be null
     * @throws JMSException a ConnectionClosedException if the connection is
     *                 closed, or the converted IOException if the request
     *                 cannot be submitted
     */
    public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
        if(onComplete==null) {
            syncSendPacket(command);
        } else {
            if (isClosed()) {
                throw new ConnectionClosedException();
            }
            try {
                this.transport.asyncRequest(command, new ResponseCallback() {
                    @Override
                    public void onCompletion(FutureResponse resp) {
                        Response response;
                        Throwable exception = null;
                        try {
                            response = resp.getResult();
                            if (response.isException()) {
                                ExceptionResponse er = (ExceptionResponse)response;
                                exception = er.getException();
                            }
                        } catch (Exception e) {
                            // Covers failures from getResult() and, as written, a
                            // NullPointerException if the result is null.
                            exception = e;
                        }
                        if(exception!=null) {
                            if ( exception instanceof JMSException)  {
                                onComplete.onException((JMSException) exception);
                            } else {
                                if (isClosed()||closing.get()) {
                                    LOG.debug("Received an exception but connection is closing");
                                }
                                JMSException jmsEx = null;
                                try {
                                    jmsEx = JMSExceptionSupport.create(exception);
                                } catch(Throwable e) {
                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
                                }
                                // dispose of transport for security exceptions on connection initiation
                                if (exception instanceof SecurityException && command instanceof ConnectionInfo){
                                    forceCloseOnSecurityException(exception);
                                }
                                // NOTE(review): if the conversion above failed, jmsEx is
                                // null and the callback is never notified - confirm
                                // callers cannot block waiting on it.
                                if (jmsEx !=null) {
                                    onComplete.onException(jmsEx);
                                }
                            }
                        } else {
                            onComplete.onSuccess();
                        }
                    }
                });
            } catch (IOException e) {
                throw JMSExceptionSupport.create(e);
            }
        }
    }

    /**
     * Reports a security failure on connect through onException, wrapped in an
     * IOException.
     *
     * @param exception the security exception received from the broker
     */
    private void forceCloseOnSecurityException(Throwable exception) {
        LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
        onException(new IOException("Force close due to SecurityException on connect", exception));
    }

    /**
     * Sends a command through the Connection and waits for the response - for
     * internal use only.
     *
     * @param command the command to send
     * @return the response received from the transport
     * @throws JMSException a ConnectionClosedException if the connection is
     *                 closed; the exception carried by an exception response;
     *                 or the converted IOException if the transport fails
     */
    public Response syncSendPacket(Command command) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {

            try {
                // NOTE(review): unlike doSyncSendPacket, the response is not
                // null-checked before use - confirm request() never returns null.
                Response response = (Response)this.transport.request(command);
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse)response;
                    if (er.getException() instanceof JMSException) {
                        throw (JMSException)er.getException();
                    } else {
                        if (isClosed()||closing.get()) {
                            LOG.debug("Received an exception but connection is closing");
                        }
                        JMSException jmsEx = null;
                        try {
                            jmsEx = JMSExceptionSupport.create(er.getException());
                        } catch(Throwable e) {
                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
                        }
                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
                            forceCloseOnSecurityException(er.getException());
                        }
                        // NOTE(review): if the conversion above failed, jmsEx is null
                        // and the exception response is returned to the caller below.
                        if (jmsEx !=null) {
                            throw jmsEx;
                        }
                    }
                }
                return response;
            } catch (IOException e) {
                throw JMSExceptionSupport.create(e);
            }
        }
    }

    /**
     * Send a packet through a Connection - for internal use only
     *
     * @param command the command to send
     * @param timeout request timeout; a value of 0 or less means the request
     *                is made without a timeout
     * @return the response received from the transport
     * @throws JMSException a ConnectionClosedException if the connection is
     *                 closed or closing; otherwise as thrown by the send itself
     */
    public Response syncSendPacket(Command command, int timeout) throws JMSException {
        if (isClosed() || closing.get()) {
            throw new ConnectionClosedException();
        } else {
            return doSyncSendPacket(command, timeout);
        }
    }

    /**
     * Sends a command and waits for the response without checking the closed
     * or closing flags, which is why close() can use it while closing is set.
     *
     * @param command the command to send
     * @param timeout request timeout; a value of 0 or less means the request
     *                is made without a timeout
     * @return the response received from the transport, possibly null
     * @throws JMSException the exception carried by an exception response, or
     *                 the converted IOException if the transport fails
     */
    private Response doSyncSendPacket(Command command, int timeout)
            throws JMSException {
        try {
            Response response = (Response) (timeout > 0
                    ? this.transport.request(command, timeout)
                    : this.transport.request(command));
            if (response != null && response.isException()) {
                ExceptionResponse er = (ExceptionResponse)response;
                if (er.getException() instanceof JMSException) {
                    throw (JMSException)er.getException();
                } else {
                    throw JMSExceptionSupport.create(er.getException());
                }
            }
            return response;
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }

    /**
     * @return statistics for this Connection
     */
    @Override
    public StatsImpl getStats() {
        return stats;
    }

    /**
     * simply throws an exception if the Connection is already closed or the
     * Transport has failed
     *
     * @throws JMSException a ConnectionClosedException if closed, or a
     *                 ConnectionFailedException built from firstFailureError
     *                 if the transport has failed
     */
    protected synchronized void checkClosedOrFailed() throws JMSException {
        checkClosed();
        if (transportFailed.get()) {
            throw new ConnectionFailedException(firstFailureError);
        }
    }

    /**
     * simply throws an exception if the Connection is already closed
     *
     * @throws JMSException a ConnectionClosedException if the closed flag is set
     */
    protected synchronized void checkClosed() throws JMSException {
        if (closed.get()) {
            throw new ConnectionClosedException();
        }
    }

    /**
     * Send the ConnectionInfo to the Broker
     *
     * @throws JMSException
1510 */ 1511 protected void ensureConnectionInfoSent() throws JMSException { 1512 synchronized(this.ensureConnectionInfoSentMutex) { 1513 // Can we skip sending the ConnectionInfo packet?? 1514 if (isConnectionInfoSentToBroker || closed.get()) { 1515 return; 1516 } 1517 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID? 1518 if (info.getClientId() == null || info.getClientId().trim().length() == 0) { 1519 info.setClientId(clientIdGenerator.generateId()); 1520 } 1521 syncSendPacket(info.copy()); 1522 1523 this.isConnectionInfoSentToBroker = true; 1524 // Add a temp destination advisory consumer so that 1525 // We know what the valid temporary destinations are on the 1526 // broker without having to do an RPC to the broker. 1527 1528 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); 1529 if (watchTopicAdvisories) { 1530 advisoryConsumer = new AdvisoryConsumer(this, consumerId); 1531 } 1532 } 1533 } 1534 1535 public synchronized boolean isWatchTopicAdvisories() { 1536 return watchTopicAdvisories; 1537 } 1538 1539 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 1540 this.watchTopicAdvisories = watchTopicAdvisories; 1541 } 1542 1543 /** 1544 * @return Returns the useAsyncSend. 1545 */ 1546 public boolean isUseAsyncSend() { 1547 return useAsyncSend; 1548 } 1549 1550 /** 1551 * Forces the use of <a 1552 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 1553 * adds a massive performance boost; but means that the send() method will 1554 * return immediately whether the message has been sent or not which could 1555 * lead to message loss. 
1556 */ 1557 public void setUseAsyncSend(boolean useAsyncSend) { 1558 this.useAsyncSend = useAsyncSend; 1559 } 1560 1561 /** 1562 * @return true if always sync send messages 1563 */ 1564 public boolean isAlwaysSyncSend() { 1565 return this.alwaysSyncSend; 1566 } 1567 1568 /** 1569 * Set true if always require messages to be sync sent 1570 * 1571 * @param alwaysSyncSend 1572 */ 1573 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 1574 this.alwaysSyncSend = alwaysSyncSend; 1575 } 1576 1577 /** 1578 * @return the messagePrioritySupported 1579 */ 1580 public boolean isMessagePrioritySupported() { 1581 return this.messagePrioritySupported; 1582 } 1583 1584 /** 1585 * @param messagePrioritySupported the messagePrioritySupported to set 1586 */ 1587 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 1588 this.messagePrioritySupported = messagePrioritySupported; 1589 } 1590 1591 /** 1592 * Cleans up this connection so that it's state is as if the connection was 1593 * just created. This allows the Resource Adapter to clean up a connection 1594 * so that it can be reused without having to close and recreate the 1595 * connection. 1596 */ 1597 public void cleanup() throws JMSException { 1598 1599 if (advisoryConsumer != null && !isTransportFailed()) { 1600 advisoryConsumer.dispose(); 1601 advisoryConsumer = null; 1602 } 1603 1604 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1605 ActiveMQSession s = i.next(); 1606 s.dispose(); 1607 } 1608 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 1609 ActiveMQConnectionConsumer c = i.next(); 1610 c.dispose(); 1611 } 1612 1613 // Stream are deprecated and will be removed in a later release. 
1614 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { 1615 ActiveMQInputStream c = i.next(); 1616 c.dispose(); 1617 } 1618 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 1619 ActiveMQOutputStream c = i.next(); 1620 c.dispose(); 1621 } 1622 1623 if (isConnectionInfoSentToBroker) { 1624 if (!transportFailed.get() && !closing.get()) { 1625 syncSendPacket(info.createRemoveCommand()); 1626 } 1627 isConnectionInfoSentToBroker = false; 1628 } 1629 if (userSpecifiedClientID) { 1630 info.setClientId(null); 1631 userSpecifiedClientID = false; 1632 } 1633 clientIDSet = false; 1634 1635 started.set(false); 1636 } 1637 1638 /** 1639 * Changes the associated username/password that is associated with this 1640 * connection. If the connection has been used, you must called cleanup() 1641 * before calling this method. 1642 * 1643 * @throws IllegalStateException if the connection is in used. 1644 */ 1645 public void changeUserInfo(String userName, String password) throws JMSException { 1646 if (isConnectionInfoSentToBroker) { 1647 throw new IllegalStateException("changeUserInfo used Connection is not allowed"); 1648 } 1649 this.info.setUserName(userName); 1650 this.info.setPassword(password); 1651 } 1652 1653 /** 1654 * @return Returns the resourceManagerId. 1655 * @throws JMSException 1656 */ 1657 public String getResourceManagerId() throws JMSException { 1658 if (isRmIdFromConnectionId()) { 1659 return info.getConnectionId().getValue(); 1660 } 1661 waitForBrokerInfo(); 1662 if (brokerInfo == null) { 1663 throw new JMSException("Connection failed before Broker info was received."); 1664 } 1665 return brokerInfo.getBrokerId().getValue(); 1666 } 1667 1668 /** 1669 * Returns the broker name if one is available or null if one is not 1670 * available yet. 
1671 */ 1672 public String getBrokerName() { 1673 try { 1674 brokerInfoReceived.await(5, TimeUnit.SECONDS); 1675 if (brokerInfo == null) { 1676 return null; 1677 } 1678 return brokerInfo.getBrokerName(); 1679 } catch (InterruptedException e) { 1680 Thread.currentThread().interrupt(); 1681 return null; 1682 } 1683 } 1684 1685 /** 1686 * Returns the broker information if it is available or null if it is not 1687 * available yet. 1688 */ 1689 public BrokerInfo getBrokerInfo() { 1690 return brokerInfo; 1691 } 1692 1693 /** 1694 * @return Returns the RedeliveryPolicy. 1695 * @throws JMSException 1696 */ 1697 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException { 1698 return redeliveryPolicyMap.getDefaultEntry(); 1699 } 1700 1701 /** 1702 * Sets the redelivery policy to be used when messages are rolled back 1703 */ 1704 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 1705 this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy); 1706 } 1707 1708 public BlobTransferPolicy getBlobTransferPolicy() { 1709 if (blobTransferPolicy == null) { 1710 blobTransferPolicy = createBlobTransferPolicy(); 1711 } 1712 return blobTransferPolicy; 1713 } 1714 1715 /** 1716 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 1717 * OBjects) are transferred from producers to brokers to consumers 1718 */ 1719 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1720 this.blobTransferPolicy = blobTransferPolicy; 1721 } 1722 1723 /** 1724 * @return Returns the alwaysSessionAsync. 1725 */ 1726 public boolean isAlwaysSessionAsync() { 1727 return alwaysSessionAsync; 1728 } 1729 1730 /** 1731 * If this flag is not set then a separate thread is not used for dispatching messages for each Session in 1732 * the Connection. However, a separate thread is always used if there is more than one session, or the session 1733 * isn't in auto acknowledge or duplicates ok mode. 
By default this value is set to true and session dispatch 1734 * happens asynchronously. 1735 */ 1736 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 1737 this.alwaysSessionAsync = alwaysSessionAsync; 1738 } 1739 1740 /** 1741 * @return Returns the optimizeAcknowledge. 1742 */ 1743 public boolean isOptimizeAcknowledge() { 1744 return optimizeAcknowledge; 1745 } 1746 1747 /** 1748 * Enables an optimised acknowledgement mode where messages are acknowledged 1749 * in batches rather than individually 1750 * 1751 * @param optimizeAcknowledge The optimizeAcknowledge to set. 1752 */ 1753 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 1754 this.optimizeAcknowledge = optimizeAcknowledge; 1755 } 1756 1757 /** 1758 * The max time in milliseconds between optimized ack batches 1759 * @param optimizeAcknowledgeTimeOut 1760 */ 1761 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { 1762 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; 1763 } 1764 1765 public long getOptimizeAcknowledgeTimeOut() { 1766 return optimizeAcknowledgeTimeOut; 1767 } 1768 1769 public long getWarnAboutUnstartedConnectionTimeout() { 1770 return warnAboutUnstartedConnectionTimeout; 1771 } 1772 1773 /** 1774 * Enables the timeout from a connection creation to when a warning is 1775 * generated if the connection is not properly started via {@link #start()} 1776 * and a message is received by a consumer. It is a very common gotcha to 1777 * forget to <a 1778 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1779 * the connection</a> so this option makes the default case to create a 1780 * warning if the user forgets. To disable the warning just set the value to < 1781 * 0 (say -1). 
1782 */ 1783 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1784 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1785 } 1786 1787 /** 1788 * @return the sendTimeout (in milliseconds) 1789 */ 1790 public int getSendTimeout() { 1791 return sendTimeout; 1792 } 1793 1794 /** 1795 * @param sendTimeout the sendTimeout to set (in milliseconds) 1796 */ 1797 public void setSendTimeout(int sendTimeout) { 1798 this.sendTimeout = sendTimeout; 1799 } 1800 1801 /** 1802 * @return the sendAcksAsync 1803 */ 1804 public boolean isSendAcksAsync() { 1805 return sendAcksAsync; 1806 } 1807 1808 /** 1809 * @param sendAcksAsync the sendAcksAsync to set 1810 */ 1811 public void setSendAcksAsync(boolean sendAcksAsync) { 1812 this.sendAcksAsync = sendAcksAsync; 1813 } 1814 1815 /** 1816 * Returns the time this connection was created 1817 */ 1818 public long getTimeCreated() { 1819 return timeCreated; 1820 } 1821 1822 private void waitForBrokerInfo() throws JMSException { 1823 try { 1824 brokerInfoReceived.await(); 1825 } catch (InterruptedException e) { 1826 Thread.currentThread().interrupt(); 1827 throw JMSExceptionSupport.create(e); 1828 } 1829 } 1830 1831 // Package protected so that it can be used in unit tests 1832 public Transport getTransport() { 1833 return transport; 1834 } 1835 1836 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) { 1837 producers.put(producerId, producer); 1838 } 1839 1840 public void removeProducer(ProducerId producerId) { 1841 producers.remove(producerId); 1842 } 1843 1844 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) { 1845 dispatchers.put(consumerId, dispatcher); 1846 } 1847 1848 public void removeDispatcher(ConsumerId consumerId) { 1849 dispatchers.remove(consumerId); 1850 } 1851 1852 public boolean hasDispatcher(ConsumerId consumerId) { 1853 return dispatchers.containsKey(consumerId); 1854 } 1855 1856 /** 1857 * 
@param o - the command to consume 1858 */ 1859 @Override 1860 public void onCommand(final Object o) { 1861 final Command command = (Command)o; 1862 if (!closed.get() && command != null) { 1863 try { 1864 command.visit(new CommandVisitorAdapter() { 1865 @Override 1866 public Response processMessageDispatch(MessageDispatch md) throws Exception { 1867 waitForTransportInterruptionProcessingToComplete(); 1868 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 1869 if (dispatcher != null) { 1870 // Copy in case a embedded broker is dispatching via 1871 // vm:// 1872 // md.getMessage() == null to signal end of queue 1873 // browse. 1874 Message msg = md.getMessage(); 1875 if (msg != null) { 1876 msg = msg.copy(); 1877 msg.setReadOnlyBody(true); 1878 msg.setReadOnlyProperties(true); 1879 msg.setRedeliveryCounter(md.getRedeliveryCounter()); 1880 msg.setConnection(ActiveMQConnection.this); 1881 msg.setMemoryUsage(null); 1882 md.setMessage(msg); 1883 } 1884 dispatcher.dispatch(md); 1885 } else { 1886 LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers); 1887 } 1888 return null; 1889 } 1890 1891 @Override 1892 public Response processProducerAck(ProducerAck pa) throws Exception { 1893 if (pa != null && pa.getProducerId() != null) { 1894 ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); 1895 if (producer != null) { 1896 producer.onProducerAck(pa); 1897 } 1898 } 1899 return null; 1900 } 1901 1902 @Override 1903 public Response processBrokerInfo(BrokerInfo info) throws Exception { 1904 brokerInfo = info; 1905 brokerInfoReceived.countDown(); 1906 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration(); 1907 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl()); 1908 return null; 1909 } 1910 1911 @Override 1912 public Response processConnectionError(final ConnectionError error) throws Exception { 1913 executor.execute(new Runnable() { 1914 @Override 1915 public void run() { 1916 
onAsyncException(error.getException()); 1917 } 1918 }); 1919 return null; 1920 } 1921 1922 @Override 1923 public Response processControlCommand(ControlCommand command) throws Exception { 1924 onControlCommand(command); 1925 return null; 1926 } 1927 1928 @Override 1929 public Response processConnectionControl(ConnectionControl control) throws Exception { 1930 onConnectionControl((ConnectionControl)command); 1931 return null; 1932 } 1933 1934 @Override 1935 public Response processConsumerControl(ConsumerControl control) throws Exception { 1936 onConsumerControl((ConsumerControl)command); 1937 return null; 1938 } 1939 1940 @Override 1941 public Response processWireFormat(WireFormatInfo info) throws Exception { 1942 onWireFormatInfo((WireFormatInfo)command); 1943 return null; 1944 } 1945 }); 1946 } catch (Exception e) { 1947 onClientInternalException(e); 1948 } 1949 } 1950 1951 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1952 TransportListener listener = iter.next(); 1953 listener.onCommand(command); 1954 } 1955 } 1956 1957 protected void onWireFormatInfo(WireFormatInfo info) { 1958 protocolVersion.set(info.getVersion()); 1959 } 1960 1961 /** 1962 * Handles async client internal exceptions. 1963 * A client internal exception is usually one that has been thrown 1964 * by a container runtime component during asynchronous processing of a 1965 * message that does not affect the connection itself. 1966 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking 1967 * its <code>onException</code> method, if one has been registered with this connection. 
1968 * 1969 * @param error the exception that the problem 1970 */ 1971 public void onClientInternalException(final Throwable error) { 1972 if ( !closed.get() && !closing.get() ) { 1973 if ( this.clientInternalExceptionListener != null ) { 1974 executor.execute(new Runnable() { 1975 @Override 1976 public void run() { 1977 ActiveMQConnection.this.clientInternalExceptionListener.onException(error); 1978 } 1979 }); 1980 } else { 1981 LOG.debug("Async client internal exception occurred with no exception listener registered: " 1982 + error, error); 1983 } 1984 } 1985 } 1986 1987 /** 1988 * Used for handling async exceptions 1989 * 1990 * @param error 1991 */ 1992 public void onAsyncException(Throwable error) { 1993 if (!closed.get() && !closing.get()) { 1994 if (this.exceptionListener != null) { 1995 1996 if (!(error instanceof JMSException)) { 1997 error = JMSExceptionSupport.create(error); 1998 } 1999 final JMSException e = (JMSException)error; 2000 2001 executor.execute(new Runnable() { 2002 @Override 2003 public void run() { 2004 ActiveMQConnection.this.exceptionListener.onException(e); 2005 } 2006 }); 2007 2008 } else { 2009 LOG.debug("Async exception with no exception listener: " + error, error); 2010 } 2011 } 2012 } 2013 2014 @Override 2015 public void onException(final IOException error) { 2016 onAsyncException(error); 2017 if (!closing.get() && !closed.get()) { 2018 executor.execute(new Runnable() { 2019 @Override 2020 public void run() { 2021 transportFailed(error); 2022 ServiceSupport.dispose(ActiveMQConnection.this.transport); 2023 brokerInfoReceived.countDown(); 2024 try { 2025 cleanup(); 2026 } catch (JMSException e) { 2027 LOG.warn("Exception during connection cleanup, " + e, e); 2028 } 2029 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 2030 TransportListener listener = iter.next(); 2031 listener.onException(error); 2032 } 2033 } 2034 }); 2035 } 2036 } 2037 2038 @Override 2039 public void transportInterupted() { 
2040 transportInterruptionProcessingComplete.set(1); 2041 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2042 ActiveMQSession s = i.next(); 2043 s.clearMessagesInProgress(transportInterruptionProcessingComplete); 2044 } 2045 2046 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) { 2047 connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete); 2048 } 2049 2050 if (transportInterruptionProcessingComplete.decrementAndGet() > 0) { 2051 if (LOG.isDebugEnabled()) { 2052 LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get()); 2053 } 2054 signalInterruptionProcessingNeeded(); 2055 } 2056 2057 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 2058 TransportListener listener = iter.next(); 2059 listener.transportInterupted(); 2060 } 2061 } 2062 2063 @Override 2064 public void transportResumed() { 2065 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 2066 TransportListener listener = iter.next(); 2067 listener.transportResumed(); 2068 } 2069 } 2070 2071 /** 2072 * Create the DestinationInfo object for the temporary destination. 2073 * 2074 * @param topic - if its true topic, else queue. 2075 * @return DestinationInfo 2076 * @throws JMSException 2077 */ 2078 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException { 2079 2080 // Check if Destination info is of temporary type. 
2081 ActiveMQTempDestination dest; 2082 if (topic) { 2083 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 2084 } else { 2085 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 2086 } 2087 2088 DestinationInfo info = new DestinationInfo(); 2089 info.setConnectionId(this.info.getConnectionId()); 2090 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 2091 info.setDestination(dest); 2092 syncSendPacket(info); 2093 2094 dest.setConnection(this); 2095 activeTempDestinations.put(dest, dest); 2096 return dest; 2097 } 2098 2099 /** 2100 * @param destination 2101 * @throws JMSException 2102 */ 2103 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { 2104 2105 checkClosedOrFailed(); 2106 2107 for (ActiveMQSession session : this.sessions) { 2108 if (session.isInUse(destination)) { 2109 throw new JMSException("A consumer is consuming from the temporary destination"); 2110 } 2111 } 2112 2113 activeTempDestinations.remove(destination); 2114 2115 DestinationInfo destInfo = new DestinationInfo(); 2116 destInfo.setConnectionId(this.info.getConnectionId()); 2117 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2118 destInfo.setDestination(destination); 2119 destInfo.setTimeout(0); 2120 syncSendPacket(destInfo); 2121 } 2122 2123 public boolean isDeleted(ActiveMQDestination dest) { 2124 2125 // If we are not watching the advisories.. then 2126 // we will assume that the temp destination does exist. 
2127 if (advisoryConsumer == null) { 2128 return false; 2129 } 2130 2131 return !activeTempDestinations.contains(dest); 2132 } 2133 2134 public boolean isCopyMessageOnSend() { 2135 return copyMessageOnSend; 2136 } 2137 2138 public LongSequenceGenerator getLocalTransactionIdGenerator() { 2139 return localTransactionIdGenerator; 2140 } 2141 2142 public boolean isUseCompression() { 2143 return useCompression; 2144 } 2145 2146 /** 2147 * Enables the use of compression of the message bodies 2148 */ 2149 public void setUseCompression(boolean useCompression) { 2150 this.useCompression = useCompression; 2151 } 2152 2153 public void destroyDestination(ActiveMQDestination destination) throws JMSException { 2154 2155 checkClosedOrFailed(); 2156 ensureConnectionInfoSent(); 2157 2158 DestinationInfo info = new DestinationInfo(); 2159 info.setConnectionId(this.info.getConnectionId()); 2160 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2161 info.setDestination(destination); 2162 info.setTimeout(0); 2163 syncSendPacket(info); 2164 } 2165 2166 public boolean isDispatchAsync() { 2167 return dispatchAsync; 2168 } 2169 2170 /** 2171 * Enables or disables the default setting of whether or not consumers have 2172 * their messages <a 2173 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 2174 * synchronously or asynchronously by the broker</a>. For non-durable 2175 * topics for example we typically dispatch synchronously by default to 2176 * minimize context switches which boost performance. However sometimes its 2177 * better to go slower to ensure that a single blocked consumer socket does 2178 * not block delivery to other consumers. 2179 * 2180 * @param asyncDispatch If true then consumers created on this connection 2181 * will default to having their messages dispatched 2182 * asynchronously. The default value is true. 
2183 */ 2184 public void setDispatchAsync(boolean asyncDispatch) { 2185 this.dispatchAsync = asyncDispatch; 2186 } 2187 2188 public boolean isObjectMessageSerializationDefered() { 2189 return objectMessageSerializationDefered; 2190 } 2191 2192 /** 2193 * When an object is set on an ObjectMessage, the JMS spec requires the 2194 * object to be serialized by that set method. Enabling this flag causes the 2195 * object to not get serialized. The object may subsequently get serialized 2196 * if the message needs to be sent over a socket or stored to disk. 2197 */ 2198 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 2199 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 2200 } 2201 2202 @Override 2203 @Deprecated 2204 public InputStream createInputStream(Destination dest) throws JMSException { 2205 return createInputStream(dest, null); 2206 } 2207 2208 @Override 2209 @Deprecated 2210 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { 2211 return createInputStream(dest, messageSelector, false); 2212 } 2213 2214 @Override 2215 @Deprecated 2216 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { 2217 return createInputStream(dest, messageSelector, noLocal, -1); 2218 } 2219 2220 @Override 2221 @Deprecated 2222 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { 2223 return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); 2224 } 2225 2226 @Override 2227 @Deprecated 2228 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { 2229 return createInputStream(dest, null, false); 2230 } 2231 2232 @Override 2233 @Deprecated 2234 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { 2235 return 
createDurableInputStream(dest, name, messageSelector, false); 2236 } 2237 2238 @Override 2239 @Deprecated 2240 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { 2241 return createDurableInputStream(dest, name, messageSelector, noLocal, -1); 2242 } 2243 2244 @Override 2245 @Deprecated 2246 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { 2247 return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); 2248 } 2249 2250 @Deprecated 2251 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException { 2252 checkClosedOrFailed(); 2253 ensureConnectionInfoSent(); 2254 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout); 2255 } 2256 2257 /** 2258 * Creates a persistent output stream; individual messages will be written 2259 * to disk/database by the broker 2260 */ 2261 @Override 2262 @Deprecated 2263 public OutputStream createOutputStream(Destination dest) throws JMSException { 2264 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2265 } 2266 2267 /** 2268 * Creates a non persistent output stream; messages will not be written to 2269 * disk 2270 */ 2271 @Deprecated 2272 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException { 2273 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2274 } 2275 2276 /** 2277 * Creates an output stream allowing full control over the delivery mode, 2278 * the priority and time to live of the messages and the properties 
added to 2279 * messages on the stream. 2280 * 2281 * @param streamProperties defines a map of key-value pairs where the keys 2282 * are strings and the values are primitive values (numbers 2283 * and strings) which are appended to the messages similarly 2284 * to using the 2285 * {@link javax.jms.Message#setObjectProperty(String, Object)} 2286 * method 2287 */ 2288 @Override 2289 @Deprecated 2290 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { 2291 checkClosedOrFailed(); 2292 ensureConnectionInfoSent(); 2293 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive); 2294 } 2295 2296 /** 2297 * Unsubscribes a durable subscription that has been created by a client. 2298 * <P> 2299 * This method deletes the state being maintained on behalf of the 2300 * subscriber by its provider. 2301 * <P> 2302 * It is erroneous for a client to delete a durable subscription while there 2303 * is an active <CODE>MessageConsumer </CODE> or 2304 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 2305 * message is part of a pending transaction or has not been acknowledged in 2306 * the session. 2307 * 2308 * @param name the name used to identify this subscription 2309 * @throws JMSException if the session fails to unsubscribe to the durable 2310 * subscription due to some internal error. 2311 * @throws InvalidDestinationException if an invalid subscription name is 2312 * specified. 
2313 * @since 1.1 2314 */ 2315 @Override 2316 public void unsubscribe(String name) throws InvalidDestinationException, JMSException { 2317 checkClosedOrFailed(); 2318 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 2319 rsi.setConnectionId(getConnectionInfo().getConnectionId()); 2320 rsi.setSubscriptionName(name); 2321 rsi.setClientId(getConnectionInfo().getClientId()); 2322 syncSendPacket(rsi); 2323 } 2324 2325 /** 2326 * Internal send method optimized: - It does not copy the message - It can 2327 * only handle ActiveMQ messages. - You can specify if the send is async or 2328 * sync - Does not allow you to send /w a transaction. 2329 */ 2330 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { 2331 checkClosedOrFailed(); 2332 2333 if (destination.isTemporary() && isDeleted(destination)) { 2334 throw new JMSException("Cannot publish to a deleted Destination: " + destination); 2335 } 2336 2337 msg.setJMSDestination(destination); 2338 msg.setJMSDeliveryMode(deliveryMode); 2339 long expiration = 0L; 2340 2341 if (!isDisableTimeStampsByDefault()) { 2342 long timeStamp = System.currentTimeMillis(); 2343 msg.setJMSTimestamp(timeStamp); 2344 if (timeToLive > 0) { 2345 expiration = timeToLive + timeStamp; 2346 } 2347 } 2348 2349 msg.setJMSExpiration(expiration); 2350 msg.setJMSPriority(priority); 2351 msg.setJMSRedelivered(false); 2352 msg.setMessageId(messageId); 2353 msg.onSend(); 2354 msg.setProducerId(msg.getMessageId().getProducerId()); 2355 2356 if (LOG.isDebugEnabled()) { 2357 LOG.debug("Sending message: " + msg); 2358 } 2359 2360 if (async) { 2361 asyncSendPacket(msg); 2362 } else { 2363 syncSendPacket(msg); 2364 } 2365 } 2366 2367 @Deprecated 2368 public void addOutputStream(ActiveMQOutputStream stream) { 2369 outputStreams.add(stream); 2370 } 2371 2372 @Deprecated 2373 public void removeOutputStream(ActiveMQOutputStream stream) { 2374 
outputStreams.remove(stream); 2375 } 2376 2377 @Deprecated 2378 public void addInputStream(ActiveMQInputStream stream) { 2379 inputStreams.add(stream); 2380 } 2381 2382 @Deprecated 2383 public void removeInputStream(ActiveMQInputStream stream) { 2384 inputStreams.remove(stream); 2385 } 2386 2387 protected void onControlCommand(ControlCommand command) { 2388 String text = command.getCommand(); 2389 if (text != null) { 2390 if ("shutdown".equals(text)) { 2391 LOG.info("JVM told to shutdown"); 2392 System.exit(0); 2393 } 2394 2395 // TODO Should we handle the "close" case? 2396 // if (false && "close".equals(text)){ 2397 // LOG.error("Broker " + getBrokerInfo() + "shutdown connection"); 2398 // try { 2399 // close(); 2400 // } catch (JMSException e) { 2401 // } 2402 // } 2403 } 2404 } 2405 2406 protected void onConnectionControl(ConnectionControl command) { 2407 if (command.isFaultTolerant()) { 2408 this.optimizeAcknowledge = false; 2409 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2410 ActiveMQSession s = i.next(); 2411 s.setOptimizeAcknowledge(false); 2412 } 2413 } 2414 } 2415 2416 protected void onConsumerControl(ConsumerControl command) { 2417 if (command.isClose()) { 2418 for (ActiveMQSession session : this.sessions) { 2419 session.close(command.getConsumerId()); 2420 } 2421 } else { 2422 for (ActiveMQSession session : this.sessions) { 2423 session.setPrefetchSize(command.getConsumerId(), command.getPrefetch()); 2424 } 2425 for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) { 2426 ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo(); 2427 if (consumerInfo.getConsumerId().equals(command.getConsumerId())) { 2428 consumerInfo.setPrefetchSize(command.getPrefetch()); 2429 } 2430 } 2431 } 2432 } 2433 2434 protected void transportFailed(IOException error) { 2435 transportFailed.set(true); 2436 if (firstFailureError == null) { 2437 firstFailureError = error; 2438 } 2439 } 2440 2441 /** 2442 * Should a JMS 
message be copied to a new JMS Message object as part of the 2443 * send() method in JMS. This is enabled by default to be compliant with the 2444 * JMS specification. You can disable it if you do not mutate JMS messages 2445 * after they are sent for a performance boost 2446 */ 2447 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 2448 this.copyMessageOnSend = copyMessageOnSend; 2449 } 2450 2451 @Override 2452 public String toString() { 2453 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}"; 2454 } 2455 2456 protected BlobTransferPolicy createBlobTransferPolicy() { 2457 return new BlobTransferPolicy(); 2458 } 2459 2460 public int getProtocolVersion() { 2461 return protocolVersion.get(); 2462 } 2463 2464 public int getProducerWindowSize() { 2465 return producerWindowSize; 2466 } 2467 2468 public void setProducerWindowSize(int producerWindowSize) { 2469 this.producerWindowSize = producerWindowSize; 2470 } 2471 2472 public void setAuditDepth(int auditDepth) { 2473 connectionAudit.setAuditDepth(auditDepth); 2474 } 2475 2476 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 2477 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber); 2478 } 2479 2480 protected void removeDispatcher(ActiveMQDispatcher dispatcher) { 2481 connectionAudit.removeDispatcher(dispatcher); 2482 } 2483 2484 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2485 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message); 2486 } 2487 2488 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2489 connectionAudit.rollbackDuplicate(dispatcher, message); 2490 } 2491 2492 public IOException getFirstFailureError() { 2493 return firstFailureError; 2494 } 2495 2496 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { 2497 if (!closed.get() && 
!transportFailed.get() && transportInterruptionProcessingComplete.get()>0) { 2498 LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get()); 2499 signalInterruptionProcessingComplete(); 2500 } 2501 } 2502 2503 protected void transportInterruptionProcessingComplete() { 2504 if (transportInterruptionProcessingComplete.decrementAndGet() == 0) { 2505 signalInterruptionProcessingComplete(); 2506 } 2507 } 2508 2509 private void signalInterruptionProcessingComplete() { 2510 if (LOG.isDebugEnabled()) { 2511 LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get() 2512 + " for:" + this.getConnectionInfo().getConnectionId()); 2513 } 2514 2515 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2516 if (failoverTransport != null) { 2517 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId()); 2518 if (LOG.isDebugEnabled()) { 2519 LOG.debug("notified failover transport (" + failoverTransport 2520 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId()); 2521 } 2522 } 2523 transportInterruptionProcessingComplete.set(0); 2524 } 2525 2526 private void signalInterruptionProcessingNeeded() { 2527 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2528 if (failoverTransport != null) { 2529 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId()); 2530 if (LOG.isDebugEnabled()) { 2531 LOG.debug("notified failover transport (" + failoverTransport 2532 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId()); 2533 } 2534 } 2535 } 2536 2537 /* 2538 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery 2539 * will wait to receive re dispatched messages. 2540 * default value is 0 so there is no wait by default. 
2541 */ 2542 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 2543 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 2544 } 2545 2546 public long getConsumerFailoverRedeliveryWaitPeriod() { 2547 return consumerFailoverRedeliveryWaitPeriod; 2548 } 2549 2550 protected Scheduler getScheduler() throws JMSException { 2551 Scheduler result = scheduler; 2552 if (result == null) { 2553 if (isClosing() || isClosed()) { 2554 // without lock contention report the closing state 2555 throw new ConnectionClosedException(); 2556 } 2557 synchronized (this) { 2558 result = scheduler; 2559 if (result == null) { 2560 checkClosed(); 2561 try { 2562 result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler"); 2563 result.start(); 2564 scheduler = result; 2565 } catch(Exception e) { 2566 throw JMSExceptionSupport.create(e); 2567 } 2568 } 2569 } 2570 } 2571 return result; 2572 } 2573 2574 protected ThreadPoolExecutor getExecutor() { 2575 return this.executor; 2576 } 2577 2578 protected CopyOnWriteArrayList<ActiveMQSession> getSessions() { 2579 return sessions; 2580 } 2581 2582 /** 2583 * @return the checkForDuplicates 2584 */ 2585 public boolean isCheckForDuplicates() { 2586 return this.checkForDuplicates; 2587 } 2588 2589 /** 2590 * @param checkForDuplicates the checkForDuplicates to set 2591 */ 2592 public void setCheckForDuplicates(boolean checkForDuplicates) { 2593 this.checkForDuplicates = checkForDuplicates; 2594 } 2595 2596 public boolean isTransactedIndividualAck() { 2597 return transactedIndividualAck; 2598 } 2599 2600 public void setTransactedIndividualAck(boolean transactedIndividualAck) { 2601 this.transactedIndividualAck = transactedIndividualAck; 2602 } 2603 2604 public boolean isNonBlockingRedelivery() { 2605 return nonBlockingRedelivery; 2606 } 2607 2608 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { 2609 this.nonBlockingRedelivery = 
nonBlockingRedelivery; 2610 } 2611 2612 public boolean isRmIdFromConnectionId() { 2613 return rmIdFromConnectionId; 2614 } 2615 2616 public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) { 2617 this.rmIdFromConnectionId = rmIdFromConnectionId; 2618 } 2619 2620 /** 2621 * Removes any TempDestinations that this connection has cached, ignoring 2622 * any exceptions generated because the destination is in use as they should 2623 * not be removed. 2624 * Used from a pooled connection, b/c it will not be explicitly closed. 2625 */ 2626 public void cleanUpTempDestinations() { 2627 2628 if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) { 2629 return; 2630 } 2631 2632 Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries 2633 = this.activeTempDestinations.entrySet().iterator(); 2634 while(entries.hasNext()) { 2635 ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next(); 2636 try { 2637 // Only delete this temp destination if it was created from this connection. The connection used 2638 // for the advisory consumer may also have a reference to this temp destination. 2639 ActiveMQTempDestination dest = entry.getValue(); 2640 String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString(); 2641 if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) { 2642 this.deleteTempDestination(entry.getValue()); 2643 } 2644 } catch (Exception ex) { 2645 // the temp dest is in use so it can not be deleted. 2646 // it is ok to leave it to connection tear down phase 2647 } 2648 } 2649 } 2650 2651 /** 2652 * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back. 
2653 * @param redeliveryPolicyMap the redeliveryPolicyMap to set 2654 */ 2655 public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) { 2656 this.redeliveryPolicyMap = redeliveryPolicyMap; 2657 } 2658 2659 /** 2660 * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the 2661 * Consumers when dealing with transaction messages that have been rolled back. 2662 * 2663 * @return the redeliveryPolicyMap 2664 */ 2665 public RedeliveryPolicyMap getRedeliveryPolicyMap() { 2666 return redeliveryPolicyMap; 2667 } 2668 2669 public int getMaxThreadPoolSize() { 2670 return maxThreadPoolSize; 2671 } 2672 2673 public void setMaxThreadPoolSize(int maxThreadPoolSize) { 2674 this.maxThreadPoolSize = maxThreadPoolSize; 2675 } 2676 2677 /** 2678 * Enable enforcement of QueueConnection semantics. 2679 * 2680 * @return this object, useful for chaining 2681 */ 2682 ActiveMQConnection enforceQueueOnlyConnection() { 2683 this.queueOnlyConnection = true; 2684 return this; 2685 } 2686 2687 public RejectedExecutionHandler getRejectedTaskHandler() { 2688 return rejectedTaskHandler; 2689 } 2690 2691 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { 2692 this.rejectedTaskHandler = rejectedTaskHandler; 2693 } 2694 2695 /** 2696 * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled 2697 * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers 2698 * will not do any background Message acknowledgment. 2699 * 2700 * @return the scheduledOptimizedAckInterval 2701 */ 2702 public long getOptimizedAckScheduledAckInterval() { 2703 return optimizedAckScheduledAckInterval; 2704 } 2705 2706 /** 2707 * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that 2708 * have been configured with optimizeAcknowledge enabled. 
*
     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
     */
    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
    }

    /**
     * @return true if each MessageConsumer instance checks for expired messages before dispatch.
     */
    public boolean isConsumerExpiryCheckEnabled() {
        return this.consumerExpiryCheckEnabled;
    }

    /**
     * Turns the per-MessageConsumer expiration check, performed before a message is
     * dispatched, on or off. With the check disabled, expired messages may be consumed.
     *
     * @param consumerExpiryCheckEnabled
     *        whether expiration checking is done prior to dispatch.
     */
    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
    }
}