View Javadoc

1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.codehaus.activemq.broker.impl;
20  
21  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.codehaus.activemq.broker.Broker;
26  import org.codehaus.activemq.broker.BrokerClient;
27  import org.codehaus.activemq.broker.BrokerConnector;
28  import org.codehaus.activemq.broker.BrokerContainer;
29  import org.codehaus.activemq.broker.BrokerContext;
30  import org.codehaus.activemq.capacity.CapacityMonitorEvent;
31  import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
32  import org.codehaus.activemq.message.*;
33  import org.codehaus.activemq.security.SecurityAdapter;
34  import org.codehaus.activemq.service.Service;
35  import org.codehaus.activemq.service.RedeliveryPolicy;
36  import org.codehaus.activemq.store.PersistenceAdapter;
37  import org.codehaus.activemq.transport.DiscoveryAgent;
38  import org.codehaus.activemq.transport.NetworkConnector;
39  import org.codehaus.activemq.transport.TransportServerChannel;
40  import org.codehaus.activemq.util.IdGenerator;
41  
42  import javax.jms.InvalidClientIDException;
43  import javax.jms.InvalidDestinationException;
44  import javax.jms.JMSException;
45  import javax.jms.JMSSecurityException;
46  import javax.transaction.xa.XAException;
47  import java.util.ArrayList;
48  import java.util.HashMap;
49  import java.util.Iterator;
50  import java.util.List;
51  import java.util.Map;
52  
53  /***
54   * Represents the ActiveMQ JMS Broker which typically has one or many connectors
55   *
56   * @version $Revision: 1.34 $
57   */
58  public class BrokerContainerImpl implements BrokerContainer, CapacityMonitorEventListener {
59      private static final Log log = LogFactory.getLog(BrokerContainerImpl.class);
60  
61      private BrokerContext context;
62      private Broker broker;
63      private Map clientIds;
64      private Map consumerInfos;
65      private Map producerInfos;
66      private List transportConnectors;
67      private Thread shutdownHook;
68      private boolean stopped;
69      private List networkConnectors;
70      private DiscoveryAgent discoveryAgent;
71      private Map localDiscoveryDetails;
72  
73  
74      public BrokerContainerImpl() {
75          this(new IdGenerator().generateId());
76      }
77  
78      public BrokerContainerImpl(String brokerName) {
79          this(brokerName, BrokerContext.getInstance());
80      }
81  
82      public BrokerContainerImpl(String brokerName, PersistenceAdapter persistenceAdapter) {
83          this(brokerName, persistenceAdapter, BrokerContext.getInstance());
84      }
85  
86      public BrokerContainerImpl(String brokerName, BrokerContext context) {
87          this(new DefaultBroker(brokerName), context);
88      }
89  
90      public BrokerContainerImpl(String brokerName, PersistenceAdapter persistenceAdapter, BrokerContext context) {
91          this(new DefaultBroker(brokerName, persistenceAdapter), context);
92      }
93  
94      /***
95       * @param broker
96       */
97      public BrokerContainerImpl(Broker broker, BrokerContext context) {
98          this.broker = broker;
99          this.context = context;
100         this.clientIds = new ConcurrentHashMap();
101         this.consumerInfos = new ConcurrentHashMap();
102         this.producerInfos = new ConcurrentHashMap();
103         this.transportConnectors = new CopyOnWriteArrayList();
104         this.networkConnectors = new CopyOnWriteArrayList();
105         this.broker.addCapacityEventListener(this);
106 
107         // lets register ourselves with the context
108         context.registerContainer(broker.getBrokerName(), this);
109     }
110 
111     /***
112      * start the Container
113      *
114      * @throws JMSException
115      */
116     public void start() throws JMSException {
117         log.info("ActiveMQ JMS Message Broker (" + broker.getBrokerName() + ") is starting");
118         log.info("For help or more information please see: www.protique.com");
119         broker.start();
120         addShutdownHook();
121 
122         // TODO we might not need to copy the collections, as maybe the List might not
123         // throw concurrent modification exception? Couldn't tell from the docs
124         // but I don't think it does
125 
126         for (Iterator iter = new ArrayList(networkConnectors).iterator(); iter.hasNext();) {
127             Service connector = (Service) iter.next();
128             connector.start();
129         }
130 
131         for (Iterator iter = new ArrayList(transportConnectors).iterator(); iter.hasNext();) {
132             Service connector = (Service) iter.next();
133             connector.start();
134         }
135 
136         if (discoveryAgent != null) {
137             discoveryAgent.start();
138 
139             localDiscoveryDetails = createLocalDiscoveryDetails();
140             discoveryAgent.registerService(getLocalBrokerName(), localDiscoveryDetails);
141         }
142 
143         log.info("ActiveMQ JMS Message Broker (" + broker.getBrokerName() + ") has started");
144     }
145 
146     /***
147      * Stop the Container
148      *
149      * @throws JMSException
150      */
151     public synchronized void stop() throws JMSException {
152         if (!stopped) {
153             stopped = true;
154 
155             log.info("ActiveMQ Message Broker (" + broker.getBrokerName() + ") is shutting down");
156 
157             context.deregisterContainer(broker.getBrokerName(), this);
158 
159             try {
160                 Runtime.getRuntime().removeShutdownHook(shutdownHook);
161             }
162             catch (Exception e) {
163                 log.debug("Caught exception, must be shutting down: " + e);
164             }
165 
166             JMSException firstException = null;
167 
168             for (Iterator iter = new ArrayList(transportConnectors).iterator(); iter.hasNext();) {
169                 Service connector = (Service) iter.next();
170                 try {
171                     connector.stop();
172                 }
173                 catch (JMSException e) {
174                     if (firstException == null) {
175                         firstException = e;
176                     }
177                     log.warn("Could not close transport connector: " + connector + " due to: " + e, e);
178                 }
179             }
180             transportConnectors.clear();
181 
182             for (Iterator iter = new ArrayList(networkConnectors).iterator(); iter.hasNext();) {
183                 Service connector = (Service) iter.next();
184                 try {
185                     connector.stop();
186                 }
187                 catch (JMSException e) {
188                     if (firstException == null) {
189                         firstException = e;
190                     }
191                     log.warn("Could not close network connector: " + connector + " due to: " + e, e);
192                 }
193             }
194             networkConnectors.clear();
195 
196             
197 
198             // lets close all the channels
199             // note that this Map implementation does not throw concurrent modification exception
200             for (Iterator iter = clientIds.values().iterator(); iter.hasNext();) {
201                 // should remove clients from parent container?
202                 BrokerClient client = (BrokerClient) iter.next();
203                 try {
204                     client.stop();
205                 }
206                 catch (JMSException e) {
207                     if (firstException == null) {
208                         firstException = e;
209                     }
210                     log.warn("Could not close client: " + client + " due to: " + e, e);
211                 }
212             }
213             clientIds.clear();
214 
215             broker.removeCapacityEventListener(this);
216             broker.stop();
217 
218             log.info("ActiveMQ JMS Message Broker (" + broker.getBrokerName() + ") stopped");
219 
220             if (firstException != null) {
221                 throw firstException;
222             }
223         }
224     }
225 
226     /***
227      * registers a new Connection
228      *
229      * @param client
230      * @param info   infomation about the client-side Connection
231      * @throws InvalidClientIDException if the ClientID of the Connection is a duplicate
232      */
233     public void registerConnection(BrokerClient client, ConnectionInfo info) throws JMSException {
234         String clientId = info.getClientId();
235         if (clientIds.containsKey(clientId)) {
236             throw new InvalidClientIDException("Duplicate clientId: " + info);
237         }
238         getBroker().addClient(client, info);
239         log.info("Adding new client: " + clientId + " on transport: " + client.getChannel());
240         clientIds.put(clientId, client);
241     }
242 
243     /***
244      * un-registers a Connection
245      *
246      * @param client
247      * @param info   infomation about the client-side Connection
248      * @throws JMSException
249      */
250     public void deregisterConnection(BrokerClient client, ConnectionInfo info) throws JMSException {
251         String clientId = client.getClientID();
252         if (clientId != null) {
253             Object answer = clientIds.remove(clientId);
254             if (answer != null) {
255                 log.info("Removing client: " + clientId + " on transport: " + client.getChannel());
256                 getBroker().removeClient(client, info);
257             }
258             else {
259                 log.warn("Got duplicate deregisterConnection for client: " + clientId);
260             }
261         }
262         else {
263             log.warn("No clientID available for client: " + client);
264         }
265     }
266 
267     /***
268      * Registers a MessageConsumer
269      *
270      * @param client
271      * @param info
272      * @throws JMSException
273      * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
274      */
275     public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
276         consumerInfos.put(info, client);
277         getBroker().addMessageConsumer(client, info);
278     }
279 
280     /***
281      * De-register a MessageConsumer from the Broker
282      *
283      * @param client
284      * @param info
285      * @throws JMSException
286      */
287     public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
288         consumerInfos.remove(info);
289         getBroker().removeMessageConsumer(client, info);
290     }
291 
292     /***
293      * Registers a MessageProducer
294      *
295      * @param client
296      * @param info
297      * @throws JMSException
298      * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
299      */
300     public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
301         ActiveMQDestination dest = info.getDestination();
302         if (dest != null && dest.isTemporary()) {
303             //check to see if the client that is the target for the temporary destination still exists
304             String clientId = ActiveMQDestination.getClientId(dest);
305             if (clientId == null) {
306                 throw new InvalidDestinationException("Destination " + dest.getPhysicalName()
307                         + " is a temporary destination with null clientId");
308             }
309             if (!clientIds.containsKey(clientId)) {
310                 throw new InvalidDestinationException("Destination " + dest.getPhysicalName()
311                         + " is no longer valid because the client " + clientId + " no longer exists");
312             }
313         }
314         getBroker().addMessageProducer(client, info);
315 
316         producerInfos.put(info, client);
317     }
318 
319     /***
320      * De-register a MessageProducer from the Broker
321      *
322      * @param client
323      * @param info
324      * @throws JMSException
325      */
326     public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
327         getBroker().removeMessageProducer(client, info);
328 
329         producerInfos.remove(info);
330     }
331 
332     /***
333      * Register a client-side Session (used for Monitoring)
334      *
335      * @param client
336      * @param info
337      * @throws JMSException
338      */
339     public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
340     }
341 
342     /***
343      * De-register a client-side Session from the Broker (used for monitoring)
344      *
345      * @param client
346      * @param info
347      * @throws JMSException
348      */
349     public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
350     }
351 
352     /***
353      * Start a transaction from the Client session
354      *
355      * @param client
356      * @param transactionId
357      * @throws JMSException
358      */
359     public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
360         getBroker().startTransaction(client, transactionId);
361     }
362 
363     /***
364      * Rollback a transacton
365      *
366      * @param client
367      * @param transactionId
368      * @throws JMSException
369      */
370     public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
371         getBroker().rollbackTransaction(client, transactionId);
372     }
373 
374     /***
375      * Commit a transaction
376      *
377      * @param client
378      * @param transactionId
379      * @throws JMSException
380      */
381     public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
382         getBroker().commitTransaction(client, transactionId);
383     }
384 
385     /***
386      * send message with a transaction context
387      *
388      * @param client
389      * @param transactionId
390      * @param message
391      * @throws JMSException
392      */
393     public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
394             throws JMSException {
395         getBroker().sendTransactedMessage(client, transactionId, message);
396     }
397 
398     /***
399      * Acknowledge receipt of a message within a transaction context
400      *
401      * @param client
402      * @param transactionId
403      * @param ack
404      * @throws JMSException
405      */
406     public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
407             throws JMSException {
408         getBroker().acknowledgeTransactedMessage(client, transactionId, ack);
409     }
410 
411     /***
412      * Send a non-transacted message to the Broker
413      *
414      * @param client
415      * @param message
416      * @throws JMSException
417      */
418     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
419         getBroker().sendMessage(client, message);
420     }
421 
422     /***
423      * Acknowledge reciept of a message
424      *
425      * @param client
426      * @param ack
427      * @throws JMSException
428      */
429     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
430         getBroker().acknowledgeMessage(client, ack);
431     }
432 
433     /***
434      * Command to delete a durable topic subscription
435      *
436      * @param client
437      * @param ds
438      * @throws JMSException
439      */
440     public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
441         getBroker().deleteSubscription(ds.getClientId(), ds.getSubscriberName());
442     }
443 
444     /***
445      * Start an XA transaction.
446      *
447      * @param client
448      * @param xid
449      */
450     public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
451         getBroker().startTransaction(client, xid);
452     }
453 
454     /***
455      * Gets the prepared XA transactions.
456      *
457      * @param client
458      * @return
459      */
460     public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
461         return getBroker().getPreparedTransactions(client);
462     }
463 
464     /***
465      * Prepare an XA transaction.
466      *
467      * @param client
468      * @param xid
469      */
470     public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
471         return getBroker().prepareTransaction(client, xid);
472     }
473 
474     /***
475      * Rollback an XA transaction.
476      *
477      * @param client
478      * @param xid
479      */
480     public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
481         getBroker().rollbackTransaction(client, xid);
482     }
483 
484     /***
485      * Commit an XA transaction.
486      *
487      * @param client
488      * @param xid
489      * @param onePhase
490      */
491     public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
492         getBroker().commitTransaction(client, xid, onePhase);
493     }
494 
495 
496     /***
497      * Update any message producers about our capacity to handle messages
498      *
499      * @param event
500      */
501     public void capacityChanged(CapacityMonitorEvent event) {
502         //only send to producers
503         for (Iterator i = producerInfos.values().iterator(); i.hasNext();) {
504             BrokerClient client = (BrokerClient) i.next();
505             client.updateBrokerCapacity(event.getCapacity());
506         }
507     }
508 
509 
510     // Properties
511     //-------------------------------------------------------------------------
512 
513     public List getTransportConnectors() {
514         return transportConnectors;
515     }
516 
517     public void setTransportConnectors(List transportConnectors) {
518         this.transportConnectors = transportConnectors;
519     }
520 
521     public void addConnector(BrokerConnector connector) {
522         transportConnectors.add(connector);
523         context.registerConnector(connector.getServerChannel().getUrl(), connector);
524     }
525 
526     public void removeConnector(BrokerConnector connector) {
527         transportConnectors.remove(connector);
528         context.deregisterConnector(connector.getServerChannel().getUrl());
529     }
530 
531 
532     public void addConnector(String bindAddress) throws JMSException {
533         addConnector(bindAddress, new DefaultWireFormat());
534     }
535 
536     public void addConnector(String bindAddress, WireFormat wireFormat) throws JMSException {
537         addConnector(new BrokerConnectorImpl(this, bindAddress, wireFormat));
538     }
539 
540     public void addConnector(TransportServerChannel transportConnector) {
541         addConnector(new BrokerConnectorImpl(this, transportConnector));
542     }
543 
544     public List getNetworkConnectors() {
545         return networkConnectors;
546     }
547 
548     public void setNetworkConnectors(List networkConnectors) {
549         this.networkConnectors = networkConnectors;
550     }
551 
552     public NetworkConnector addNetworkConnector(String uri) {
553         NetworkConnector connector = addNetworkConnector();
554         connector.addNetworkChannel(uri);
555         return connector;
556     }
557 
558     public NetworkConnector addNetworkConnector() {
559         NetworkConnector connector = new NetworkConnector(this);
560         addNetworkConnector(connector);
561         return connector;
562     }
563 
564     public void addNetworkConnector(NetworkConnector connector) {
565         networkConnectors.add(connector);
566     }
567 
568     public void removeNetworkConnector(NetworkConnector connector) {
569         networkConnectors.remove(connector);
570     }
571 
572 
573     public Broker getBroker() {
574         return broker;
575     }
576 
577     public PersistenceAdapter getPersistenceAdapter() {
578         return broker != null ? broker.getPersistenceAdapter() : null;
579     }
580 
581     public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
582         checkBrokerSet();
583         broker.setPersistenceAdapter(persistenceAdapter);
584     }
585 
586     public DiscoveryAgent getDiscoveryAgent() {
587         return discoveryAgent;
588     }
589 
590     public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
591         this.discoveryAgent = discoveryAgent;
592     }
593 
594     public SecurityAdapter getSecurityAdapter() {
595         return broker != null ? broker.getSecurityAdapter() : null;
596     }
597 
598     public void setSecurityAdapter(SecurityAdapter securityAdapter) {
599         checkBrokerSet();
600         broker.setSecurityAdapter(securityAdapter);
601     }
602 
603     public RedeliveryPolicy getRedeliveryPolicy() {
604         return broker != null ? broker.getRedeliveryPolicy() : null;
605     }
606 
607     public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
608         checkBrokerSet();
609         broker.setRedeliveryPolicy(redeliveryPolicy);
610     }
611 
612     // Implementation methods
613     //-------------------------------------------------------------------------
614 
615     protected void checkBrokerSet() {
616         if (broker == null) {
617             throw new IllegalStateException("Cannot set this property as we don't have a broker yet");
618         }
619     }
620 
621     protected Map createLocalDiscoveryDetails() {
622         Map map = new HashMap();
623         map.put("brokerName", getLocalBrokerName());
624         map.put("connectURL", getLocalConnectionURL());
625         return map;
626     }
627 
628     protected String getLocalBrokerName() {
629         return getBroker().getBrokerName();
630     }
631 
632     protected String getLocalConnectionURL() {
633         StringBuffer buffer = new StringBuffer("reliable:");
634         List list = getTransportConnectors();
635         boolean first = true;
636         for (Iterator iter = list.iterator(); iter.hasNext();) {
637             BrokerConnector brokerConnector = (BrokerConnector) iter.next();
638             TransportServerChannel connector = brokerConnector.getServerChannel();
639             String url = connector.getUrl();
640             if (first) {
641                 first = false;
642             }
643             else {
644                 buffer.append(",");
645             }
646             buffer.append(url);
647         }
648         return buffer.toString();
649     }
650 
651     protected void addShutdownHook() {
652         shutdownHook = new Thread("ActiveMQ ShutdownHook") {
653             public void run() {
654                 containerShutdown();
655             }
656         };
657         Runtime.getRuntime().addShutdownHook(shutdownHook);
658     }
659 
660     /***
661      * Causes a clean shutdown of the container when the VM is being shut down
662      */
663     protected void containerShutdown() {
664         try {
665             stop();
666         }
667         catch (JMSException e) {
668             Exception linkedException = e.getLinkedException();
669             if (linkedException != null) {
670                 log.error("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
671             }
672             else {
673                 log.error("Failed to shut down: " + e, e);
674             }
675         }
676         catch (Exception e) {
677             log.error("Failed to shut down: " + e, e);
678         }
679     }
680 }