View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.broker.Broker;
23  import org.codehaus.activemq.broker.BrokerConnector;
24  import org.codehaus.activemq.broker.BrokerContainer;
25  import org.codehaus.activemq.broker.BrokerContainerFactory;
26  import org.codehaus.activemq.broker.BrokerContext;
27  import org.codehaus.activemq.broker.impl.BrokerClientImpl;
28  import org.codehaus.activemq.broker.impl.BrokerConnectorImpl;
29  import org.codehaus.activemq.broker.impl.BrokerContainerFactoryImpl;
30  import org.codehaus.activemq.jndi.JNDIBaseStorable;
31  import org.codehaus.activemq.management.JMSStatsImpl;
32  import org.codehaus.activemq.management.StatsCapable;
33  import org.codehaus.activemq.message.ActiveMQQueue;
34  import org.codehaus.activemq.message.ActiveMQTopic;
35  import org.codehaus.activemq.message.ConnectionInfo;
36  import org.codehaus.activemq.message.ConsumerInfo;
37  import org.codehaus.activemq.message.DefaultWireFormat;
38  import org.codehaus.activemq.message.WireFormat;
39  import org.codehaus.activemq.service.Service;
40  import org.codehaus.activemq.transport.TransportChannel;
41  import org.codehaus.activemq.transport.TransportChannelFactory;
42  import org.codehaus.activemq.transport.TransportChannelListener;
43  import org.codehaus.activemq.transport.TransportChannelProvider;
44  import org.codehaus.activemq.transport.vm.VmTransportChannel;
45  import org.codehaus.activemq.util.IdGenerator;
46  
47  import javax.jms.Connection;
48  import javax.jms.ConnectionFactory;
49  import javax.jms.JMSException;
50  import javax.jms.QueueConnection;
51  import javax.jms.QueueConnectionFactory;
52  import javax.jms.TopicConnection;
53  import javax.jms.TopicConnectionFactory;
54  import javax.management.j2ee.statistics.Stats;
55  import java.net.URI;
56  import java.net.URISyntaxException;
57  import java.util.ArrayList;
58  import java.util.Iterator;
59  import java.util.List;
60  import java.util.Properties;
61  
62  /***
63   * A ConnectionFactory is an an Administed object, and is used for creating
64   * Connections.
65   * <p/>
66   * This class also implements QueueConnectionFactory and TopicConnectionFactory and is an Administered object.
67   * You can use this connection to create both QueueConnections and TopicConnections.
68   *
69   * @version $Revision: 1.37 $
70   * @see javax.jms.ConnectionFactory
71   */
72  public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, Service, StatsCapable {
73  
74      private static final Log log = LogFactory.getLog(ActiveMQConnectionFactory.class);
75  
76      private BrokerContext brokerContext = BrokerContext.getInstance();
77      private BrokerContainerFactory brokerContainerFactory;
78      protected BrokerContainer brokerContainer;
79  
80      protected String userName;
81      protected String password;
82      protected String brokerURL;
83      protected String clientID;
84      protected String brokerName;
85      private boolean useEmbeddedBroker;
86      /***
87       * Should we use an async send for persistent non transacted messages ?
88       */
89      protected boolean useAsyncSend = true;
90      /* The list of emebeded brokers that this object started */
91      private List startedEmbeddedBrokers = new ArrayList();
92  
93      private JMSStatsImpl stats = new JMSStatsImpl();
94      private WireFormat wireFormat = new DefaultWireFormat();
95      private IdGenerator idGenerator = new IdGenerator();
96      private int connectionCount;
97      private String brokerXmlConfig;
98  
99      /***
100      * Default Constructor for ActiveMQConnectionFactory
101      */
102     public ActiveMQConnectionFactory() {
103         this.userName = ActiveMQConnection.DEFAULT_USER;
104         this.password = ActiveMQConnection.DEFAULT_PASSWORD;
105         this.brokerURL = ActiveMQConnection.DEFAULT_URL;
106     }
107 
108 
109     public ActiveMQConnectionFactory(String brokerURL) {
110         this();
111         this.brokerURL = brokerURL;
112     }
113 
114     public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
115         this.userName = userName;
116         this.password = password;
117         this.brokerURL = brokerURL;
118     }
119 
120     /***
121      * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer}
122      * ready for use in embedded mode.
123      *
124      * @param container
125      */
126     public ActiveMQConnectionFactory(BrokerContainer container) {
127         this(container, "vm://" + container.getBroker().getName());
128     }
129 
130     /***
131      * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer}
132      * ready for use in embedded mode and the brokerURL connection.
133      *
134      * @param container
135      */
136     public ActiveMQConnectionFactory(BrokerContainer container, String brokerURL) {
137         this();
138         this.brokerContainer = container;
139         this.useEmbeddedBroker = true;
140         this.brokerURL = brokerURL;
141     }
142 
143 
144     public Stats getStats() {
145         return stats;
146     }
147 
148     public JMSStatsImpl getFactoryStats() {
149         return stats;
150     }
151 
152     /***
153      * @return Returns the brokerURL.
154      */
155     public String getBrokerURL() {
156         return brokerURL;
157     }
158 
159     /***
160      * @param brokerURL The brokerURL to set.
161      */
162     public void setBrokerURL(String brokerURL) {
163         this.brokerURL = brokerURL;
164     }
165 
166     /***
167      * @return Returns the clientID.
168      */
169     public String getClientID() {
170         return clientID;
171     }
172 
173     /***
174      * @param clientID The clientID to set.
175      */
176     public void setClientID(String clientID) {
177         this.clientID = clientID;
178     }
179 
180     /***
181      * @return Returns the password.
182      */
183     public String getPassword() {
184         return password;
185     }
186 
187     /***
188      * @param password The password to set.
189      */
190     public void setPassword(String password) {
191         this.password = password;
192     }
193 
194     /***
195      * @return Returns the userName.
196      */
197     public String getUserName() {
198         return userName;
199     }
200 
201     /***
202      * @param userName The userName to set.
203      */
204     public void setUserName(String userName) {
205         this.userName = userName;
206     }
207 
208     /***
209      * Is an embedded broker used by this connection factory
210      *
211      * @return true if an embedded broker will be used by this connection factory
212      */
213     public boolean isUseEmbeddedBroker() {
214         return useEmbeddedBroker;
215     }
216 
217     /***
218      * Allows embedded brokers to be associated with a connection factory
219      *
220      * @param useEmbeddedBroker
221      */
222     public void setUseEmbeddedBroker(boolean useEmbeddedBroker) {
223         this.useEmbeddedBroker = useEmbeddedBroker;
224     }
225 
226     /***
227      * The name of the broker to use if creating an embedded broker
228      *
229      * @return
230      */
231     public String getBrokerName() {
232         if (brokerName == null) {
233             // lets auto-create a broker name
234             brokerName = idGenerator.generateId();
235         }
236         return brokerName;
237     }
238 
239     public void setBrokerName(String brokerName) {
240         this.brokerName = brokerName;
241     }
242 
243     /***
244      * @return Returns the useAsyncSend.
245      */
246     public boolean isUseAsyncSend() {
247         return useAsyncSend;
248     }
249 
250     /***
251      * @param useAsyncSend The useAsyncSend to set.
252      */
253     public void setUseAsyncSend(boolean useAsyncSend) {
254         this.useAsyncSend = useAsyncSend;
255     }
256 
257     public WireFormat getWireFormat() {
258         return wireFormat;
259     }
260 
261     /***
262      * Allows a custom wire format to be used; otherwise the default Java wire format is used
263      * which is designed for minimum size and maximum speed on the Java platform
264      *
265      * @param wireFormat
266      */
267     public void setWireFormat(WireFormat wireFormat) {
268         this.wireFormat = wireFormat;
269     }
270 
271     public String getBrokerXmlConfig() {
272         return brokerXmlConfig;
273     }
274 
275     public BrokerContainer getBrokerContainer() {
276         return brokerContainer;
277     }
278 
279     /***
280      * Sets the <a href="http://activemq.codehaus.org/Xml+Configuration">XML configuration file</a>
281      * used to configure the ActiveMQ broker via Spring if using embedded mode.
282      *
283      * @param brokerXmlConfig is the filename which is assumed to be on the classpath unless a URL
284      *                        is specified. So a value of <code>foo/bar.xml</code> would be assumed to be on the classpath
285      *                        whereas <code>file:dir/file.xml</code> would use the file system.
286      *                        Any valid URL string is supported.
287      * @see #setUseEmbeddedBroker(boolean)
288      */
289     public void setBrokerXmlConfig(String brokerXmlConfig) {
290         this.brokerXmlConfig = brokerXmlConfig;
291     }
292 
293     public BrokerContainerFactory getBrokerContainerFactory() throws JMSException {
294         if (brokerContainerFactory == null) {
295             brokerContainerFactory = createBrokerContainerFactory();
296         }
297         return brokerContainerFactory;
298     }
299 
300     public void setBrokerContainerFactory(BrokerContainerFactory brokerContainerFactory) {
301         this.brokerContainerFactory = brokerContainerFactory;
302     }
303 
304     /***
305      * Returns the context used to store broker containers and connectors which defaults
306      * to using the singleton
307      */
308     public BrokerContext getBrokerContext() {
309         return brokerContext;
310     }
311 
312     public void setBrokerContext(BrokerContext brokerContext) {
313         this.brokerContext = brokerContext;
314     }
315 
316     /***
317      * Create a JMS Connection
318      *
319      * @return the JMS Connection
320      * @throws JMSException if an error occurs creating the Connection
321      */
322     public Connection createConnection() throws JMSException {
323         return this.createConnection(this.userName, this.password);
324     }
325 
326     /***
327      * @param userName
328      * @param password
329      * @return the Connection
330      * @throws JMSException if an error occurs creating the Connection
331      */
332     public Connection createConnection(String userName, String password) throws JMSException {
333         ActiveMQConnection connection = new ActiveMQConnection(this, userName, password, createTransportChannel(this.brokerURL));
334         connection.setUseAsyncSend(isUseAsyncSend());
335         if (this.clientID != null && this.clientID.length() > 0) {
336             connection.setClientID(this.clientID);
337         }
338         return connection;
339     }
340 
341     /***
342      * Create a JMS QueueConnection
343      *
344      * @return the JMS QueueConnection
345      * @throws JMSException if an error occurs creating the Connection
346      */
347     public QueueConnection createQueueConnection() throws JMSException {
348         return this.createQueueConnection(this.userName, this.password);
349     }
350 
351     /***
352      * @param userName
353      * @param password
354      * @return the QueueConnection
355      * @throws JMSException if an error occurs creating the Connection
356      */
357     public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
358         return (QueueConnection) createConnection(userName, password);
359     }
360 
361     /***
362      * Create a JMS TopicConnection
363      *
364      * @return the JMS TopicConnection
365      * @throws JMSException if an error occurs creating the Connection
366      */
367     public TopicConnection createTopicConnection() throws JMSException {
368         return this.createTopicConnection(this.userName, this.password);
369     }
370 
371     /***
372      * @param userName
373      * @param password
374      * @return the TopicConnection
375      * @throws JMSException if an error occurs creating the Connection
376      */
377     public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
378         return (TopicConnection) createConnection(userName, password);
379     }
380 
381 
382     public void start() throws JMSException {
383     }
384 
385     /***
386      * A hook to allow any embedded JMS Broker's to be closed down
387      *
388      * @throws JMSException
389      */
390     public synchronized void stop() throws JMSException {
391         // Stop all embded brokers that we started.
392         for (Iterator iter = startedEmbeddedBrokers.iterator(); iter.hasNext();) {
393             String uri = (String) iter.next();
394             brokerContext.deregisterConnector(uri);
395         }
396         if (brokerContainer != null) {
397             brokerContainer.stop();
398             brokerContainer = null;
399         }
400     }
401 
402 
403     public Broker getEmbeddedBroker() throws JMSException {
404         if (isUseEmbeddedBroker()) {
405             return getContainer(getBrokerName()).getBroker();
406         }
407         return null;
408     }
409 
410     public static synchronized void registerBroker(String theURLString, BrokerConnector brokerConnector) {
411         BrokerContext.getInstance().registerConnector(theURLString, brokerConnector);
412     }
413 
414     public static synchronized void unregisterBroker(String theURLString) {
415         BrokerContext.getInstance().deregisterConnector(theURLString);
416     }
417 
418 
419     // Implementation methods
420     //-------------------------------------------------------------------------
421 
422 
423     /***
424      * Set the properties that will represent the instance in JNDI
425      *
426      * @param props
427      */
428     protected void buildFromProperties(Properties props) {
429         this.userName = props.getProperty("userName", this.userName);
430         this.password = props.getProperty("password", this.password);
431         this.brokerURL = props.getProperty("brokerURL", this.brokerURL);
432         this.brokerName = props.getProperty("brokerName", this.brokerName);
433         this.clientID = props.getProperty("clientID");
434         this.useAsyncSend = getBoolean(props, "useAsyncSend", true);
435         this.useEmbeddedBroker = getBoolean(props, "useEmbeddedBroker");
436         this.brokerXmlConfig = props.getProperty("brokerXmlConfig", this.brokerXmlConfig);
437     }
438 
439     /***
440      * Initialize the instance from properties stored in JNDI
441      *
442      * @param props
443      */
444     protected void populateProperties(Properties props) {
445         props.put("userName", this.userName);
446         props.put("password", this.password);
447         props.put("brokerURL", this.brokerURL);
448         props.put("brokerName", this.brokerName);
449         if (this.clientID != null) {
450             props.put("clientID", this.clientID);
451         }
452         props.put("useAsyncSend", (useAsyncSend) ? "true" : "false");
453         props.put("useEmbeddedBroker", (useEmbeddedBroker) ? "true" : "false");
454         if (this.brokerXmlConfig != null) {
455             props.put("brokerXmlConfig", this.brokerXmlConfig);
456         }
457     }
458 
459     /***
460      * Helper method to return the property value as a boolean flag
461      *
462      * @param props
463      * @param key
464      * @return
465      */
466     protected boolean getBoolean(Properties props, String key) {
467         return getBoolean(props, key, false);
468     }
469 
470     /***
471      * Helper method to return the property value as a boolean flag
472      *
473      * @param props
474      * @param key
475      * @param defaultValue
476      * @return
477      */
478     protected boolean getBoolean(Properties props, String key, boolean defaultValue) {
479         String value = props.getProperty(key);
480         return value != null ? value.equalsIgnoreCase("true") : defaultValue;
481     }
482 
483     protected BrokerContainerFactory createBrokerContainerFactory() throws JMSException {
484         if (brokerXmlConfig != null) {
485             return XmlConfigHelper.createBrokerContainerFactory(brokerXmlConfig);
486         }
487         return new BrokerContainerFactoryImpl();
488     }
489 
490     /***
491      * Factory method to create a TransportChannel from a URL
492      */
493     protected TransportChannel createTransportChannel(String theURLString) throws JMSException {
494         URI uri = createURI(theURLString);
495 
496         TransportChannelFactory factory =
497                 TransportChannelProvider.getFactory(uri);
498 
499         BrokerConnector brokerConnector = null;
500         boolean created = false;
501         boolean embedServer = isUseEmbeddedBroker() || factory.requiresEmbeddedBroker();
502         if (embedServer) {
503             synchronized (this) {
504                 brokerConnector = brokerContext.getConnectorByURL(theURLString);
505                 if (brokerConnector == null) {
506                     brokerConnector = createBrokerConnector(theURLString);
507                     brokerContext.registerConnector(theURLString, brokerConnector);
508                     startedEmbeddedBrokers.add(theURLString);
509                     created = true;
510                 }
511             }
512         }
513         TransportChannel transportChannel = factory.create(getWireFormat(), uri);
514         if (embedServer) {
515             return ensureServerIsAvailable(uri, transportChannel, brokerConnector, created);
516         }
517         return transportChannel;
518     }
519 
520     protected synchronized BrokerContainer getContainer(String brokerName) throws JMSException {
521         if (brokerContainer == null) {
522             brokerContainer = brokerContext.getBrokerContainerByName(brokerName, getBrokerContainerFactory());
523         }
524         return brokerContainer;
525     }
526 
527     protected BrokerConnector createBrokerConnector(String url) throws JMSException {
528         BrokerConnector brokerConnector;
529         brokerConnector = new BrokerConnectorImpl(getContainer(getBrokerName()), url, getWireFormat());
530         brokerConnector.start();
531 
532         // lets wait a little for the server to startup
533         log.info("Embedded JMS Broker has started");
534         try {
535             Thread.sleep(1000);
536         }
537         catch (InterruptedException e) {
538             System.out.println("Caught: " + e);
539             e.printStackTrace();
540         }
541         return brokerConnector;
542     }
543 
544 
545     protected TransportChannel ensureServerIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
546         ensureVmServerIsAvailable(channel, brokerConnector);
547         if (channel.isMulticast()) {
548             return ensureMulticastChannelIsAvailable(remoteLocation, channel, brokerConnector, created);
549         }
550         return channel;
551     }
552 
553     private void ensureVmServerIsAvailable(TransportChannel channel, BrokerConnector brokerConnector) throws JMSException {
554         if (channel instanceof VmTransportChannel && brokerConnector instanceof TransportChannelListener) {
555             VmTransportChannel answer = (VmTransportChannel) channel;
556             answer.connect(brokerConnector);
557         }
558     }
559 
560     protected TransportChannel ensureMulticastChannelIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
561         if (created) {
562             BrokerConnectorImpl brokerImpl = (BrokerConnectorImpl) brokerConnector;
563 
564             BrokerClientImpl client = new BrokerClientImpl();
565             client.initialize(brokerImpl, channel);
566             channel.start();
567             String brokerClientID = createMulticastClientID();
568             channel.setClientID(brokerClientID);
569 
570             // lets spoof a consumer for topics which will replicate messages
571             // over the multicast transport
572             ConnectionInfo info = new ConnectionInfo();
573             info.setHostName(IdGenerator.getHostName());
574             info.setClientId(brokerClientID);
575             info.setStarted(true);
576             client.consumeConnectionInfo(info);
577 
578             ConsumerInfo consumerInfo = new ConsumerInfo();
579             consumerInfo.setDestination(new ActiveMQTopic(">"));
580             consumerInfo.setNoLocal(true);
581             consumerInfo.setClientId(brokerClientID);
582             consumerInfo.setConsumerId(idGenerator.generateId());
583             consumerInfo.setId(consumerInfo.getConsumerId());
584             consumerInfo.setStarted(true);
585             client.consumeConsumerInfo(consumerInfo);
586 
587             consumerInfo = new ConsumerInfo();
588             consumerInfo.setDestination(new ActiveMQQueue(">"));
589             consumerInfo.setNoLocal(true);
590             consumerInfo.setClientId(brokerClientID);
591             consumerInfo.setConsumerId(idGenerator.generateId());
592             consumerInfo.setId(consumerInfo.getConsumerId());
593             consumerInfo.setStarted(true);
594             client.consumeConsumerInfo(consumerInfo);
595         }
596 
597         // now lets create a VM channel that the JMS client will use
598         // to connect to the embedded brokerConnector
599         URI localURI = createURI("vm", remoteLocation);
600         TransportChannel localChannel = TransportChannelProvider.create(getWireFormat(), localURI);
601         ensureVmServerIsAvailable(localChannel, brokerConnector);
602         return localChannel;
603     }
604 
605     /***
606      * Creates the clientID for the multicast client (used to dispatch local
607      * messages over a multicast bus)
608      */
609     protected String createMulticastClientID() {
610         return idGenerator.generateId();
611     }
612 
613     protected URI createURI(String protocol, URI uri) throws JMSException {
614         try {
615             return new URI(protocol, uri.getRawSchemeSpecificPart(), uri.getFragment());
616         }
617         catch (URISyntaxException e) {
618             JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
619             jmsEx.setLinkedException(e);
620             throw jmsEx;
621 
622         }
623     }
624 
625     protected URI createURI(String uri) throws JMSException {
626         try {
627             if (uri == null) {
628                 throw new JMSException("The connection URI must be specified!");
629             }
630             return new URI(uri);
631         }
632         catch (URISyntaxException e) {
633             JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
634             jmsEx.setLinkedException(e);
635             throw jmsEx;
636 
637         }
638     }
639 
640     /***
641      * Called when a connection is closed so that we can shut down any embedded brokers cleanly
642      *
643      * @param connection
644      */
645     synchronized void onConnectionClose(ActiveMQConnection connection) throws JMSException {
646         if (--connectionCount <= 0) {
647             // close any broker if we've got one
648             stop();
649         }
650 
651     }
652 
653     synchronized void onConnectionCreate(ActiveMQConnection connection) {
654         ++connectionCount;
655     }
656 }