View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.broker.BrokerClient;
24  import org.codehaus.activemq.filter.AndFilter;
25  import org.codehaus.activemq.filter.DestinationMap;
26  import org.codehaus.activemq.filter.Filter;
27  import org.codehaus.activemq.filter.FilterFactory;
28  import org.codehaus.activemq.filter.FilterFactoryImpl;
29  import org.codehaus.activemq.filter.NoLocalFilter;
30  import org.codehaus.activemq.message.ActiveMQDestination;
31  import org.codehaus.activemq.message.ActiveMQMessage;
32  import org.codehaus.activemq.message.ActiveMQQueue;
33  import org.codehaus.activemq.message.ConsumerInfo;
34  import org.codehaus.activemq.message.MessageAck;
35  import org.codehaus.activemq.service.Dispatcher;
36  import org.codehaus.activemq.service.MessageContainer;
37  import org.codehaus.activemq.service.QueueList;
38  import org.codehaus.activemq.service.QueueListEntry;
39  import org.codehaus.activemq.service.QueueMessageContainer;
40  import org.codehaus.activemq.service.Subscription;
41  import org.codehaus.activemq.service.SubscriptionContainer;
42  import org.codehaus.activemq.service.RedeliveryPolicy;
43  import org.codehaus.activemq.service.boundedvm.TransientQueueBoundedMessageContainer;
44  import org.codehaus.activemq.store.PersistenceAdapter;
45  
46  import javax.jms.Destination;
47  import javax.jms.JMSException;
48  import java.util.Iterator;
49  import java.util.Map;
50  import java.util.Set;
51  
52  /***
53   * A default Broker used for Queue messages
54   *
55   * @version $Revision: 1.1 $
56   */
57  public class DurableQueueMessageContainerManager extends MessageContainerManagerSupport {
58      private static final Log log = LogFactory.getLog(DurableQueueMessageContainerManager.class);
59      private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;
60  
61      private PersistenceAdapter persistenceAdapter;
62      protected SubscriptionContainer subscriptionContainer;
63      protected FilterFactory filterFactory;
64      protected Map activeSubscriptions = new ConcurrentHashMap();
65      protected Map browsers = new ConcurrentHashMap();
66      protected DestinationMap destinationMap = new DestinationMap();
67      private Object subscriptionMutex = new Object();
68     
69     
70  
71      public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy) {
72          this(persistenceAdapter, new SubscriptionContainerImpl(redeliveryPolicy), new FilterFactoryImpl(), new DispatcherImpl());
73      }
74  
75      public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
76          super(dispatcher);
77          this.persistenceAdapter = persistenceAdapter;
78          this.subscriptionContainer = subscriptionContainer;
79          this.filterFactory = filterFactory;
80      }
81  
82      /***
83       * @param client
84       * @param info
85       * @throws javax.jms.JMSException
86       */
87      public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
88          if (log.isDebugEnabled()) {
89              log.debug("Adding consumer: " + info);
90          }
91          if (info.getDestination().isQueue() && !info.getDestination().isTemporary()) {
92              //ensure a matching container exists for the destination
93              getContainer(info.getDestination().getPhysicalName());
94              
95              Subscription sub = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
96              dispatcher.addActiveSubscription(client, sub);
97              updateActiveSubscriptions(sub);
98  
99              // set active last in case we end up dispatching some messages
100             // while recovering
101             sub.setActive(true);
102         }
103     }
104 
105     /***
106      * @param client
107      * @param info
108      * @throws javax.jms.JMSException
109      */
110     public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
111         if (log.isDebugEnabled()) {
112             log.debug("Removing consumer: " + info);
113         }
114         if (info.getDestination() != null && info.getDestination().isQueue()) {
115             synchronized (subscriptionMutex) {
116                 Subscription sub = (Subscription) subscriptionContainer.removeSubscription(info.getConsumerId());
117                 if (sub != null) {
118                     sub.setActive(false);
119                     sub.clear();//resets entries in the QueueMessageContainer
120                     dispatcher.removeActiveSubscription(client, sub);
121                     //need to do wildcards for this - but for now use exact matches
122                     for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
123                         QueueMessageContainer container = (QueueMessageContainer) iter.next();
124                         //should change this for wild cards ...
125                         if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
126                             QueueList list = getSubscriptionList(container);
127                             list.remove(sub);
128                             if (list.isEmpty()) {
129                                 activeSubscriptions.remove(sub.getDestination().getPhysicalName());
130                             }
131                             list = getBrowserList(container);
132                             list.remove(sub);
133                             if (list.isEmpty()) {
134                                 browsers.remove(sub.getDestination().getPhysicalName());
135                             }
136                         }
137                     }
138                 }
139             }
140         }
141     }
142 
143     /***
144      * Delete a durable subscriber
145      *
146      * @param clientId
147      * @param subscriberName
148      * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
149      */
150     public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
151     }
152 
153     /***
154      * @param client
155      * @param message
156      * @throws javax.jms.JMSException
157      */
158     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
159         ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
160         if (dest != null && dest.isQueue() && !message.isTemporary()) {
161             if (log.isDebugEnabled()) {
162                 log.debug("Dispaching message: " + message);
163             }
164             //ensure a matching container exists for the destination
165             getContainer(((ActiveMQDestination) message.getJMSDestination()).getPhysicalName());
166             Set set = destinationMap.get(message.getJMSActiveMQDestination());
167             for (Iterator i = set.iterator();i.hasNext();) {
168                 QueueMessageContainer container = (QueueMessageContainer) i.next();
169                 container.addMessage(message);
170                 dispatcher.wakeup();
171                 updateSendStats(client, message);
172             }
173         }
174     }
175 
176     /***
177      * Acknowledge a message as being read and consumed by the Consumer
178      *
179      * @param client
180      * @param ack
181      * @throws javax.jms.JMSException
182      */
183     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
184         if (!ack.isTemporary() && ack.getDestination().isQueue()){
185             Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
186             if (sub != null) {
187                 sub.messageConsumed(ack);
188                 if (ack.isMessageRead()) {
189                     updateAcknowledgeStats(client, sub);
190                 }
191             }
192         }
193     }
194 
195     public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
196         if (!ack.isTemporary() && ack.getDestination().isQueue()){
197             Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
198             if (sub != null) {
199                 sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
200             }
201         }
202     }
203 
204     public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
205         if (!ack.isTemporary() && ack.getDestination().isQueue()){
206             Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
207             if (sub != null) {
208                 sub.redeliverMessage(null, ack);
209             }
210         }
211     }
212 
213     /***
214      * Poll for messages
215      *
216      * @throws javax.jms.JMSException
217      */
218     public void poll() throws JMSException {
219         synchronized (subscriptionMutex) {
220             for (Iterator iter = activeSubscriptions.keySet().iterator(); iter.hasNext();) {
221                 QueueMessageContainer container = (QueueMessageContainer) iter.next();
222 
223                 QueueList browserList = (QueueList) browsers.get(container);
224                 doPeek(container, browserList);
225                 QueueList list = (QueueList) activeSubscriptions.get(container);
226                 doPoll(container, list);
227             }
228         }
229     }
230 
231     public void commitTransaction(BrokerClient client, String transactionId) {
232     }
233 
234     public void rollbackTransaction(BrokerClient client, String transactionId) {
235     }
236 
237     public MessageContainer getContainer(String destinationName) throws JMSException {
238         synchronized (subscriptionMutex) {
239             return super.getContainer(destinationName);
240         }
241     }
242 
243     // Implementation methods
244     //-------------------------------------------------------------------------
245 
246     protected MessageContainer createContainer(String destinationName) throws JMSException {
247         QueueMessageContainer container = persistenceAdapter.createQueueMessageContainer(destinationName);
248 
249         //Add any interested Subscriptions to the new Container
250         for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
251             Subscription sub = (Subscription) iter.next();
252             if (sub.isBrowser()) {
253                 updateBrowsers(container, sub);
254             }
255             else {
256                 updateActiveSubscriptions(container, sub);
257             }
258         }
259         
260         ActiveMQDestination key = new ActiveMQQueue(destinationName);
261         destinationMap.put(key, container);
262         return container;
263     }
264 
265     protected Destination createDestination(String destinationName) {
266         return new ActiveMQQueue(destinationName);
267     }
268 
269     private void doPeek(QueueMessageContainer container, QueueList browsers) throws JMSException {
270         if (browsers != null && browsers.size() > 0) {
271             for (int i = 0; i < browsers.size(); i++) {
272                 SubscriptionImpl sub = (SubscriptionImpl) browsers.get(i);
273                 int count = 0;
274                 ActiveMQMessage msg = null;
275                 do {
276                     msg = container.peekNext(sub.getLastMessageIdentity());
277                     if (msg != null) {
278                         if (sub.isTarget(msg)) {
279                             sub.addMessage(container, msg);
280                             dispatcher.wakeup(sub);
281                         }
282                         else {
283                             sub.setLastMessageIdentifier(msg.getJMSMessageIdentity());
284                         }
285                     }
286                 }
287                 while (msg != null && !sub.isAtPrefetchLimit() && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
288             }
289         }
290     }
291 
292     private void doPoll(QueueMessageContainer container, QueueList subList) throws JMSException {
293         int count = 0;
294         ActiveMQMessage msg = null;
295         if (subList != null && subList.size() > 0) {
296             do {
297                 boolean dispatched = false;
298                 msg = container.poll();
299                 if (msg != null) {
300                     QueueListEntry entry = subList.getFirstEntry();
301                     boolean targeted = false;
302                     while (entry != null) {
303                         SubscriptionImpl sub = (SubscriptionImpl) entry.getElement();
304                         if (sub.isTarget(msg)) {
305                             targeted = true;
306                             if (!sub.isAtPrefetchLimit()) {
307                                 sub.addMessage(container, msg);
308                                 dispatched = true;
309                                 dispatcher.wakeup(sub);
310                                 subList.rotate(); //round-robin the list
311                                 break;
312                             }
313                         }
314                         entry = subList.getNextEntry(entry);
315                     }
316                     if (!dispatched) {
317                         if (targeted) { //ie. it can be selected by current active consumers - but they are at
318                             // pre-fectch
319                             // limit
320                             container.returnMessage(msg.getJMSMessageIdentity());
321                         }
322                         break;
323                     }
324                 }
325             }
326             while (msg != null && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
327         }
328     }
329 
330     private void updateActiveSubscriptions(Subscription subscription) throws JMSException {
331         //need to do wildcards for this - but for now use exact matches
332         synchronized (subscriptionMutex) {
333             boolean processedSubscriptionContainer = false;
334 
335             String subscriptionPhysicalName = subscription.getDestination().getPhysicalName();
336             for (Iterator iter = messageContainers.entrySet().iterator(); iter.hasNext();) {
337                 Map.Entry entry = (Map.Entry) iter.next();
338                 String destinationName = (String) entry.getKey();
339                 QueueMessageContainer container = (QueueMessageContainer) entry.getValue();
340 
341                 if (destinationName.equals(subscriptionPhysicalName)) {
342                     processedSubscriptionContainer = true;
343                 }
344                 processSubscription(subscription, container);
345             }
346             if (!processedSubscriptionContainer) {
347                 processSubscription(subscription, (QueueMessageContainer) getContainer(subscriptionPhysicalName));
348             }
349         }
350     }
351 
352     protected void processSubscription(Subscription subscription, QueueMessageContainer container) throws JMSException {
353         // TODO should change this for wild cards ...
354         if (subscription.isBrowser()) {
355             updateBrowsers(container, subscription);
356         }
357         else {
358             updateActiveSubscriptions(container, subscription);
359         }
360     }
361 
362     private void updateActiveSubscriptions(QueueMessageContainer container, Subscription sub) throws JMSException {
363         //need to do wildcards for this - but for now use exact matches
364         //should change this for wild cards ...
365         if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
366             container.reset();//reset container - flushing all filter out messages to new consumer
367             QueueList list = getSubscriptionList(container);
368             if (!list.contains(sub)) {
369                 list.add(sub);
370             }
371         }
372     }
373 
374     private QueueList getSubscriptionList(QueueMessageContainer container) {
375         QueueList list = (QueueList) activeSubscriptions.get(container);
376         if (list == null) {
377             list = new DefaultQueueList();
378             activeSubscriptions.put(container, list);
379         }
380         return list;
381     }
382 
383     private void updateBrowsers(QueueMessageContainer container, Subscription sub) throws JMSException {
384         //need to do wildcards for this - but for now use exact matches
385         //should change this for wild cards ...
386         if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
387             container.reset();//reset container - flushing all filter out messages to new consumer
388             QueueList list = getBrowserList(container);
389             if (!list.contains(sub)) {
390                 list.add(sub);
391             }
392         }
393     }
394 
395     private QueueList getBrowserList(QueueMessageContainer container) {
396         QueueList list = (QueueList) browsers.get(container);
397         if (list == null) {
398             list = new DefaultQueueList();
399             browsers.put(container, list);
400         }
401         return list;
402     }
403 
404     /***
405      * Create filter for a Consumer
406      *
407      * @param info
408      * @return the Fitler
409      * @throws javax.jms.JMSException
410      */
411     protected Filter createFilter(ConsumerInfo info) throws JMSException {
412         Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
413         if (info.isNoLocal()) {
414             filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
415         }
416         return filter;
417     }
418 
419 }