View Javadoc

1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.broker.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.broker.BrokerClient;
23  import org.codehaus.activemq.broker.BrokerConnector;
24  import org.codehaus.activemq.broker.BrokerContainer;
25  import org.codehaus.activemq.message.*;
26  import org.codehaus.activemq.transport.TransportChannel;
27  import org.codehaus.activemq.transport.TransportChannelListener;
28  import org.codehaus.activemq.transport.TransportServerChannel;
29  import org.codehaus.activemq.transport.TransportServerChannelProvider;
30  
31  import javax.jms.JMSException;
32  import javax.jms.JMSSecurityException;
33  import javax.transaction.xa.XAException;
34  import java.net.URI;
35  import java.net.URISyntaxException;
36  import java.util.Collections;
37  import java.util.HashMap;
38  import java.util.Map;
39  
40  /***
41   * An implementation of the broker (the JMS server)
42   *
43   * @version $Revision: 1.13 $
44   */
45  public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener {
46      private BrokerInfo brokerInfo;
47  
48      private TransportServerChannel serverChannel;
49      private Log log;
50      private BrokerContainer container;
51      private Map clients = Collections.synchronizedMap(new HashMap());
52  
53      /***
54       * Helper constructor for TCP protocol with the given bind address
55       *
56       * @param container
57       * @param bindAddress
58       * @throws JMSException
59       */
60      public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat) throws JMSException {
61          this(container, createTransportServerChannel(wireFormat, bindAddress));
62      }
63  
64      /***
65       * @param container
66       * @param serverChannel
67       */
68      public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) {
69          assert container != null;
70          this.brokerInfo = new BrokerInfo();
71          this.brokerInfo.setBrokerName(container.getBroker().getBrokerName());
72          this.brokerInfo.setClusterName(container.getBroker().getBrokerClusterName());
73          this.log = LogFactory.getLog(getClass().getName());
74          this.serverChannel = serverChannel;
75          this.container = container;
76          this.container.addConnector(this);
77          serverChannel.setTransportChannelListener(this);
78      }
79  
80      /***
81       * @return infomation about the Broker
82       */
83      public BrokerInfo getBrokerInfo() {
84          return brokerInfo;
85      }
86  
87      /***
88       * Get a hint about the broker capacity for more messages
89       *
90       * @return percentage value (0-100) about how much capacity the
91       *         broker has
92       */
93      public int getBrokerCapacity() {
94          return container.getBroker().getRoundedCapacity();
95      }
96  
97      /***
98       * @return Get the server channel
99       */
100     public TransportServerChannel getServerChannel() {
101         return serverChannel;
102     }
103 
104     /***
105      * start the Broker
106      *
107      * @throws JMSException
108      */
109     public void start() throws JMSException {
110         this.serverChannel.start();
111         log.info("ActiveMQ connector started: " + serverChannel);
112     }
113 
114     /***
115      * Stop the Broker
116      *
117      * @throws JMSException
118      */
119     public void stop() throws JMSException {
120         this.container.removeConnector(this);
121         this.serverChannel.stop();
122         log.info("ActiveMQ connector stopped: " + serverChannel);
123     }
124 
125     /***
126      * Register a Broker Client
127      *
128      * @param client
129      * @param info   contains infomation about the Connection this Client represents
130      * @throws JMSException
131      * @throws javax.jms.InvalidClientIDException
132      *                              if the JMS client specifies an invalid or duplicate client ID.
133      * @throws JMSSecurityException if client authentication fails due to an invalid user name or password.
134      */
135     public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException {
136         this.container.registerConnection(client, info);
137     }
138 
139     /***
140      * Deregister a Broker Client
141      *
142      * @param client
143      * @param info
144      * @throws JMSException if some internal error occurs
145      */
146     public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException {
147         this.container.deregisterConnection(client, info);
148     }
149 
150     /***
151      * Registers a MessageConsumer
152      *
153      * @param client
154      * @param info
155      * @throws JMSException
156      * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
157      */
158     public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
159         if (info.getDestination() == null) {
160             throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info);
161         }
162         this.container.registerMessageConsumer(client, info);
163 
164     }
165 
166     /***
167      * De-register a MessageConsumer from the Broker
168      *
169      * @param client
170      * @param info
171      * @throws JMSException
172      */
173     public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
174         this.container.deregisterMessageConsumer(client, info);
175     }
176 
177     /***
178      * Registers a MessageProducer
179      *
180      * @param client
181      * @param info
182      * @throws JMSException
183      * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
184      */
185     public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
186         this.container.registerMessageProducer(client, info);
187     }
188 
189     /***
190      * De-register a MessageProducer from the Broker
191      *
192      * @param client
193      * @param info
194      * @throws JMSException
195      */
196     public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
197         this.container.deregisterMessageProducer(client, info);
198     }
199 
200     /***
201      * Register a client-side Session (used for Monitoring)
202      *
203      * @param client
204      * @param info
205      * @throws JMSException
206      */
207     public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
208         this.container.registerSession(client, info);
209     }
210 
211     /***
212      * De-register a client-side Session from the Broker (used for monitoring)
213      *
214      * @param client
215      * @param info
216      * @throws JMSException
217      */
218     public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
219         this.container.deregisterSession(client, info);
220     }
221 
222     /***
223      * Start a transaction from the Client session
224      *
225      * @param client
226      * @param transactionId
227      * @throws JMSException
228      */
229     public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
230         this.container.startTransaction(client, transactionId);
231     }
232 
233     /***
234      * Rollback a transacton
235      *
236      * @param client
237      * @param transactionId
238      * @throws JMSException
239      */
240     public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
241         this.container.rollbackTransaction(client, transactionId);
242     }
243 
244     /***
245      * Commit a transaction
246      *
247      * @param client
248      * @param transactionId
249      * @throws JMSException
250      */
251     public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
252         this.container.commitTransaction(client, transactionId);
253     }
254 
255     /***
256      * send message with a transaction context
257      *
258      * @param client
259      * @param transactionId
260      * @param message
261      * @throws JMSException
262      */
263     public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
264             throws JMSException {
265         this.container.sendTransactedMessage(client, transactionId, message);
266     }
267 
268     /***
269      * Acknowledge receipt of a message within a transaction context
270      *
271      * @param client
272      * @param transactionId
273      * @param ack
274      * @throws JMSException
275      */
276     public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
277             throws JMSException {
278         this.container.acknowledgeTransactedMessage(client, transactionId, ack);
279     }
280 
281     /***
282      * Send a non-transacted message to the Broker
283      *
284      * @param client
285      * @param message
286      * @throws JMSException
287      */
288     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
289         this.container.sendMessage(client, message);
290     }
291 
292     /***
293      * Acknowledge reciept of a message
294      *
295      * @param client
296      * @param ack
297      * @throws JMSException
298      */
299     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
300         this.container.acknowledgeMessage(client, ack);
301     }
302 
303     /***
304      * Command to delete a durable topic subscription
305      *
306      * @param client
307      * @param ds
308      * @throws JMSException
309      */
310     public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
311         this.container.durableUnsubscribe(client, ds);
312     }
313 
314 
315     /***
316      * @param channel - client to add
317      */
318     public void addClient(TransportChannel channel) {
319         try {
320             BrokerClient client = new BrokerClientImpl();
321             client.initialize(this, channel);
322             if (log.isDebugEnabled()) {
323                 log.debug("Starting new client: " + client);
324             }
325             channel.setServerSide(true);
326             channel.start();
327             clients.put(channel, client);
328         }
329         catch (JMSException e) {
330             log.error("Failed to add client due to: " + e, e);
331         }
332     }
333 
334     /***
335      * @param channel - client to remove
336      */
337     public void removeClient(TransportChannel channel) {
338         BrokerClient client = (BrokerClient) clients.remove(channel);
339         if (client != null) {
340             if (log.isDebugEnabled()) {
341                 log.debug("Client leaving client: " + client);
342             }
343 
344             // we may have already been closed, if not then lets simulate a normal shutdown
345             client.cleanUp();
346         }
347         else {
348             // might have got a duplicate callback
349             log.warn("No such client for channel: " + channel);
350         }
351     }
352 
353     /***
354      * @return the BrokerContainer for this Connector
355      */
356     public BrokerContainer getBrokerContainer() {
357         return this.container;
358     }
359 
360     /***
361      * Start an XA transaction.
362      *
363      * @see org.codehaus.activemq.broker.BrokerConnector#startTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
364      */
365     public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
366         this.container.startTransaction(client, xid);
367     }
368 
369     /***
370      * Gets the prepared XA transactions.
371      *
372      * @see org.codehaus.activemq.broker.BrokerConnector#getPreparedTransactions(org.codehaus.activemq.broker.BrokerClient)
373      */
374     public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
375         return this.container.getPreparedTransactions(client);
376     }
377 
378     /***
379      * Prepare an XA transaction.
380      *
381      * @see org.codehaus.activemq.broker.BrokerConnector#prepareTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
382      */
383     public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
384         return this.container.prepareTransaction(client, xid);
385     }
386 
387     /***
388      * Rollback an XA transaction.
389      *
390      * @see org.codehaus.activemq.broker.BrokerConnector#rollbackTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
391      */
392     public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
393         this.container.rollbackTransaction(client, xid);
394     }
395 
396     /***
397      * Commit an XA transaction.
398      *
399      * @see org.codehaus.activemq.broker.BrokerConnector#commitTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid, boolean)
400      */
401     public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
402         this.container.commitTransaction(client, xid, onePhase);
403     }
404 
405     /***
406      * @see org.codehaus.activemq.broker.BrokerConnector#getResourceManagerId(org.codehaus.activemq.broker.BrokerClient)
407      */
408     public String getResourceManagerId(BrokerClient client) {
409         // TODO: I think we need to return a better (more unique) RM id.
410         return getBrokerInfo().getBrokerName();
411     }
412 
413 
414     // Implementation methods
415     //-------------------------------------------------------------------------
416     /***
417      * Factory method ot create a transport channel
418      *
419      * @param bindAddress
420      * @return @throws JMSException
421      * @throws JMSException
422      */
423     protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress) throws JMSException {
424         URI url;
425         try {
426             url = new URI(bindAddress);
427         }
428         catch (URISyntaxException e) {
429             JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage());
430             jmsEx.setLinkedException(e);
431             throw jmsEx;
432         }
433         return TransportServerChannelProvider.create(wireFormat, url);
434     }
435 
436 }