View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.service.impl;
20  
21  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.codehaus.activemq.broker.BrokerClient;
25  import org.codehaus.activemq.broker.BrokerConnector;
26  import org.codehaus.activemq.filter.Filter;
27  import org.codehaus.activemq.message.ActiveMQDestination;
28  import org.codehaus.activemq.message.ActiveMQMessage;
29  import org.codehaus.activemq.message.BrokerInfo;
30  import org.codehaus.activemq.message.ConsumerInfo;
31  import org.codehaus.activemq.message.MessageAck;
32  import org.codehaus.activemq.service.Dispatcher;
33  import org.codehaus.activemq.service.MessageContainer;
34  import org.codehaus.activemq.service.MessageIdentity;
35  import org.codehaus.activemq.service.QueueList;
36  import org.codehaus.activemq.service.QueueListEntry;
37  import org.codehaus.activemq.service.SubscriberEntry;
38  import org.codehaus.activemq.service.Subscription;
39  import org.codehaus.activemq.service.RedeliveryPolicy;
40  
41  import javax.jms.JMSException;
42  import java.util.ArrayList;
43  import java.util.List;
44  
45  /***
46   * A Subscription holds messages to be dispatched to a a Client Consumer
47   *
48   * @version $Revision: 1.25 $
49   */
50  public class SubscriptionImpl implements Subscription {
51      private static final Log log = LogFactory.getLog(SubscriptionImpl.class);
52      private String clientId;
53      private String subscriberName;
54      private ActiveMQDestination destination;
55      private String selector;
56      private int prefetchLimit;
57      private boolean noLocal;
58      private boolean active;
59      private int consumerNumber;
60      private String consumerId;
61      private boolean browser;
62      protected Dispatcher dispatch;
63      protected String brokerName;
64      protected String clusterName;
65      private MessageIdentity lastMessageIdentity;
66      Filter filter;
67      protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0);
68      QueueList messagePtrs = new DefaultQueueList();
69      private boolean usePrefetch = false;
70      private SubscriberEntry subscriberEntry;
71      private ConsumerInfo activeConsumer;
72      private BrokerClient activeClient;
73      private RedeliveryPolicy redeliveryPolicy;
74  
75      /***
76       * Create a Subscription object that holds messages to be dispatched to a Consumer
77       *
78       * @param dispatcher
79       * @param client
80       * @param info
81       * @param filter
82       */
83      public SubscriptionImpl(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter, RedeliveryPolicy redeliveryPolicy) {
84          this.dispatch = dispatcher;
85          this.filter = filter;
86          this.redeliveryPolicy = redeliveryPolicy;
87          setActiveConsumer(client, info);
88      }
89  
90      /***
91       * Set the active consumer info
92       *
93       * @param client
94       * @param info
95       */
96      public void setActiveConsumer(BrokerClient client, ConsumerInfo info) {
97          if (info != null) {
98              this.clientId = info.getClientId();
99              this.subscriberName = info.getConsumerName();
100             this.noLocal = info.isNoLocal();
101             this.destination = info.getDestination();
102             this.selector = info.getSelector();
103             this.prefetchLimit = info.getPrefetchNumber();
104             this.consumerNumber = info.getConsumerNo();
105             this.consumerId = info.getConsumerId();
106             this.browser = info.isBrowser();
107         }
108         this.activeClient = client;
109         this.activeConsumer = info;
110         if (client != null) {
111             BrokerConnector brokerConnector = client.getBrokerConnector();
112             if (brokerConnector != null) {
113                 BrokerInfo brokerInfo = brokerConnector.getBrokerInfo();
114                 if (brokerInfo != null) {
115                     brokerName = brokerInfo.getBrokerName();
116                     clusterName = brokerInfo.getClusterName();
117                 }
118             }
119         }
120     }
121 
122     /***
123      * @return pretty print of the Subscription
124      */
125     public String toString() {
126         String str = "SubscriptionImpl(" + super.hashCode() + ")[" + consumerId + "]" + clientId + ": "
127                 + subscriberName + " : " + destination;
128         return str;
129     }
130 
131     /***
132      * Called when the Subscription is discarded
133      *
134      * @throws JMSException
135      */
136     public synchronized void clear() throws JMSException {
137         QueueListEntry entry = messagePtrs.getFirstEntry();
138         while (entry != null) {
139             MessagePointer pointer = (MessagePointer) entry.getElement();
140             pointer.clear();
141             entry = messagePtrs.getNextEntry(entry);
142         }
143         messagePtrs.clear();
144     }
145 
146     /***
147      * Called when an active subscriber has closed. This resets all MessagePtrs
148      */
149     public synchronized void reset() throws JMSException {
150         QueueListEntry entry = messagePtrs.getFirstEntry();
151         while (entry != null) {
152             MessagePointer pointer = (MessagePointer) entry.getElement();
153             if (pointer.isDispatched()) {
154                 pointer.reset();
155                 pointer.setRedelivered(true);
156             }
157             else {
158                 break;
159             }
160             entry = messagePtrs.getNextEntry(entry);
161         }
162     }
163 
164     /***
165      * @return Returns the clientId.
166      */
167     public String getClientId() {
168         return clientId;
169     }
170 
171     /***
172      * @param clientId The clientId to set.
173      */
174     public void setClientId(String clientId) {
175         this.clientId = clientId;
176     }
177 
178     /***
179      * @return Returns the filter.
180      */
181     public Filter getFilter() {
182         return filter;
183     }
184 
185     /***
186      * @param filter The filter to set.
187      */
188     public void setFilter(Filter filter) {
189         this.filter = filter;
190     }
191 
192     public boolean isWildcard() {
193         return filter.isWildcard();
194     }
195 
196     public String getPersistentKey() {
197         // not required other than for persistent topic subscriptions
198         return null;
199     }
200 
201     public boolean isSameDurableSubscription(ConsumerInfo info) throws JMSException {
202         if (isDurableTopic()) {
203             return equal(clientId, info.getClientId()) && equal(subscriberName, info.getConsumerName());
204         }
205         return false;
206     }
207 
208     /***
209      * @return Returns the noLocal.
210      */
211     public boolean isNoLocal() {
212         return noLocal;
213     }
214 
215     /***
216      * @param noLocal The noLocal to set.
217      */
218     public void setNoLocal(boolean noLocal) {
219         this.noLocal = noLocal;
220     }
221 
222     /***
223      * @return Returns the subscriberName.
224      */
225     public String getSubscriberName() {
226         return subscriberName;
227     }
228 
229     /***
230      * @param subscriberName The subscriberName to set.
231      */
232     public void setSubscriberName(String subscriberName) {
233         this.subscriberName = subscriberName;
234     }
235 
236     public RedeliveryPolicy getRedeliveryPolicy() {
237         return redeliveryPolicy;
238     }
239 
240     public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
241         this.redeliveryPolicy = redeliveryPolicy;
242     }
243 
244     /***
245      * determines if the Subscription is interested in the message
246      *
247      * @param message
248      * @return true if this Subscription will accept the message
249      * @throws JMSException
250      */
251     public boolean isTarget(ActiveMQMessage message) throws JMSException {
252         boolean result = false;
253         if (message != null) {
254             if (activeClient == null || brokerName == null || clusterName == null
255                     || !activeClient.isClusteredConnection() || !message.isEntryCluster(clusterName)
256                     || message.isEntryBroker(brokerName)) {
257                 result = filter.matches(message);
258                 // lets check that we don't have no-local enabled
259                 if (noLocal && result) {
260                     if (clientIDsEqual(message)) {
261                         result = false;
262                     }
263                 }
264             }
265         }
266         return result;
267     }
268 
269     /***
270      * If the Subscription is a target for the message, the subscription will add a reference to the message and
271      * register an interest in the message to the container
272      *
273      * @param container
274      * @param message
275      * @throws JMSException
276      */
277     public synchronized void addMessage(MessageContainer container, ActiveMQMessage message) throws JMSException {
278         //log.info("###### Adding to subscription: " + this + " message: " + message);
279         if (log.isDebugEnabled()) {
280             log.debug("Adding to subscription: " + this + " message: " + message);
281         }
282         MessagePointer pointer = new MessagePointer(container, message.getJMSMessageIdentity());
283         messagePtrs.add(pointer);
284         dispatch.wakeup(this);
285         lastMessageIdentity = message.getJMSMessageIdentity();
286     }
287 
288     /***
289      * Indicates a message has been delivered to a MessageConsumer
290      *
291      * @param ack
292      * @throws JMSException
293      */
294     public synchronized void messageConsumed(MessageAck ack) throws JMSException {
295         doMessageConsume(ack, true);
296     }
297 
298     public synchronized void onAcknowledgeTransactedMessageBeforeCommit(MessageAck ack) throws JMSException {
299         doMessageConsume(ack, false);
300     }
301 
302     public synchronized void redeliverMessage(MessageContainer container, MessageAck ack) throws JMSException {
303         QueueListEntry entry = messagePtrs.getFirstEntry();
304         while (entry != null) {
305             MessagePointer pointer = (MessagePointer) entry.getElement();
306             if (pointer.getMessageIdentity().getMessageID().equals(ack.getMessageID())) {
307                 break;
308             }
309             entry = messagePtrs.getNextEntry(entry);
310         }
311         if (entry != null) {
312             MessagePointer pointer = (MessagePointer) entry.getElement();
313             if (pointer != null) {
314                 unconsumedMessagesDispatched.increment();
315                 //System.out.println("Incremented unconsumed count to: " + unconsumedMessagesDispatched.get());
316                 pointer.reset();
317                 pointer.setRedelivered(true);
318                 dispatch.wakeup(this);
319             }
320         }
321     }
322 
323     /***
324      * Retrieve messages to dispatch
325      *
326      * @return the messages to dispatch
327      * @throws JMSException
328      */
329     public synchronized ActiveMQMessage[] getMessagesToDispatch() throws JMSException {
330         if (usePrefetch) {
331             return getMessagesWithPrefetch();
332         }
333         List tmpList = new ArrayList();
334         QueueListEntry entry = messagePtrs.getFirstEntry();
335         while (entry != null) {
336             MessagePointer pointer = (MessagePointer) entry.getElement();
337             if (!pointer.isDispatched()) {
338                 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
339                 if (msg != null) {
340                     if (pointer.isDispatched() || pointer.isRedelivered()) {
341                         //already dispatched - so mark as redelivered
342                         msg.setJMSRedelivered(true);
343                     }
344                     pointer.setDispatched(true);
345                     tmpList.add(msg);
346                 }
347                 else {
348                     //the message is probably expired
349                     log.info("Message probably expired: " + msg);
350                     QueueListEntry discarded = entry;
351                     entry = messagePtrs.getPrevEntry(discarded);
352                     messagePtrs.remove(discarded);
353                 }
354             }
355             entry = messagePtrs.getNextEntry(entry);
356         }
357         ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
358         return (ActiveMQMessage[]) tmpList.toArray(messages);
359     }
360 
361     public synchronized SubscriberEntry getSubscriptionEntry() {
362         if (subscriberEntry == null) {
363             subscriberEntry = createSubscriptionEntry();
364         }
365         return subscriberEntry;
366     }
367 
368     // Implementation methods
369     //-------------------------------------------------------------------------
370     protected SubscriberEntry createSubscriptionEntry() {
371         SubscriberEntry answer = new SubscriberEntry();
372         answer.setClientID(clientId);
373         answer.setConsumerName(subscriberName);
374         answer.setDestination(destination.getPhysicalName());
375         answer.setSelector(selector);
376         return answer;
377     }
378 
379     protected synchronized ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException {
380         List tmpList = new ArrayList();
381         QueueListEntry entry = messagePtrs.getFirstEntry();
382         int count = 0;
383         int maxNumberToDispatch = prefetchLimit - unconsumedMessagesDispatched.get();
384         while (entry != null && count < maxNumberToDispatch) {
385             MessagePointer pointer = (MessagePointer) entry.getElement();
386             if (!pointer.isDispatched()) {
387                 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
388                 if (msg != null) {
389                     if (pointer.isDispatched() || pointer.isRedelivered()) {
390                         //already dispatched - so mark as redelivered
391                         msg.setJMSRedelivered(true);
392                     }
393                     pointer.setDispatched(true);
394                     tmpList.add(msg);
395                     unconsumedMessagesDispatched.increment();
396                     count++;
397                 }
398                 else {
399                     //the message is probably expired
400                     log.info("Message probably expired: " + msg);
401                     QueueListEntry discarded = entry;
402                     entry = messagePtrs.getPrevEntry(discarded);
403                     messagePtrs.remove(discarded);
404                 }
405             }
406             entry = messagePtrs.getNextEntry(entry);
407         }
408         /***
409          * if (tmpList.isEmpty() && ! messagePtrs.isEmpty()) { System.out.println("### Nothing to dispatch but
410          * messagePtrs still has: " + messagePtrs.size() + " to dispatch, prefetchLimit: " + prefetchLimit + "
411          * unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get() + " maxNumberToDispatch: " +
412          * maxNumberToDispatch); MessagePointer first = (MessagePointer) messagePtrs.getFirst(); System.out.println("###
413          * First: " + first + " dispatched: " + first.isDispatched() + " id: " + first.getMessageIdentity()); } else {
414          * if (! tmpList.isEmpty()) { System.out.println("### dispatching: " + tmpList.size() + " items = " + tmpList); } }
415          */
416         ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
417         return (ActiveMQMessage[]) tmpList.toArray(messages);
418     }
419 
420     /***
421      * Indicates the Subscription it's reached it's pre-fetch limit
422      *
423      * @return true/false
424      * @throws JMSException
425      */
426     public synchronized boolean isAtPrefetchLimit() throws JMSException {
427         if (usePrefetch) {
428             int underlivedMessageCount = messagePtrs.size() - unconsumedMessagesDispatched.get();
429             return underlivedMessageCount >= prefetchLimit;
430         }
431         else {
432             return false;
433         }
434     }
435 
436     /***
437      * Indicates if this Subscription has more messages to send to the Consumer
438      *
439      * @return true if more messages available to dispatch
440      */
441     public synchronized boolean isReadyToDispatch() throws JMSException {
442         /*** TODO we may have dispatched messags inside messagePtrs */
443         boolean answer = active && messagePtrs.size() > 0;
444         return answer;
445     }
446 
447     /***
448      * @return Returns the destination.
449      */
450     public ActiveMQDestination getDestination() {
451         return destination;
452     }
453 
454     /***
455      * @return Returns the selector.
456      */
457     public String getSelector() {
458         return selector;
459     }
460 
461     /***
462      * @return Returns the active.
463      */
464     public synchronized boolean isActive() {
465         return active;
466     }
467 
468     /***
469      * @param active The active to set.
470      */
471     public synchronized void setActive(boolean active) throws JMSException {
472         this.active = active;
473         if (!active) {
474             reset();
475         }
476     }
477 
478     /***
479      * @return Returns the consumerNumber.
480      */
481     public int getConsumerNumber() {
482         return consumerNumber;
483     }
484 
485     /***
486      * @return the consumer Id for the active consumer
487      */
488     public String getConsumerId() {
489         return consumerId;
490     }
491 
492     /***
493      * Indicates the Subscriber is a Durable Subscriber
494      *
495      * @return true if the subscriber is a durable topic
496      * @throws JMSException
497      */
498     public boolean isDurableTopic() throws JMSException {
499         return destination.isTopic() && subscriberName != null && subscriberName.length() > 0;
500     }
501 
502     /***
503      * Indicates the consumer is a browser only
504      *
505      * @return true if a Browser
506      * @throws JMSException
507      */
508     public boolean isBrowser() throws JMSException {
509         return browser;
510     }
511 
512     public MessageIdentity getLastMessageIdentity() throws JMSException {
513         return lastMessageIdentity;
514     }
515 
516     public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException {
517         this.lastMessageIdentity = messageIdentity;
518     }
519 
520     /***
521      * Consume a message. If we are inside a transaction then we just update the consumed messages dispatched counter
522      * and we don't actually remove the message until a future call.
523      *
524      * @param ack    the ack command
525      * @param remove whether we should actually remove the message (i.e. really consume the message) or should we just
526      *               update the counters for the dispatcher / prefetch logic to work
527      */
528     protected synchronized void doMessageConsume(MessageAck ack, boolean remove) throws JMSException {
529         //remove up to this message
530         int count = 0;
531         boolean found = false;
532         QueueListEntry entry = messagePtrs.getFirstEntry();
533         while (entry != null) {
534             MessagePointer pointer = (MessagePointer) entry.getElement();
535             if (remove) {
536                 messagePtrs.remove(entry);
537                 if (ack.isMessageRead() && !browser) {
538                     pointer.delete(ack);//delete message from the container (if possible)
539                 }
540             }
541             count++;
542             // in transactions, we decrement on the first call and then don't decrement
543             // the second call when we really remove the pointer
544             if (remove && !ack.isPartOfTransaction()) {
545                 unconsumedMessagesDispatched.decrement();
546             }
547             if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
548                 if (!remove && ack.isPartOfTransaction()) {
549                     // only decrement by one on the last message
550                     // as we will be keeping around all the unconsumedMessages
551                     unconsumedMessagesDispatched.decrement();
552                 }
553                 found = true;
554                 break;
555             }
556             entry = messagePtrs.getNextEntry(entry);
557         }
558         if (!found && log.isDebugEnabled()) {
559             log.debug("Did not find a matching message for identity: " + ack.getMessageIdentity());
560         }
561         dispatch.wakeup(this);
562     }
563 
564     protected boolean clientIDsEqual(ActiveMQMessage message) {
565         String msgClientID = message.getJMSClientID();
566         String producerClientID = message.getProducerID();
567         String subClientID = clientId;
568         if (producerClientID != null && producerClientID.equals(subClientID)) {
569             return true;
570         }
571         else if (msgClientID == null || subClientID == null) {
572             return false;
573         }
574         else {
575             return msgClientID.equals(subClientID);
576         }
577     }
578 
579     protected static final boolean equal(Object left, Object right) {
580         return left == right || (left != null && right != null && left.equals(right));
581     }
582 }