1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.impl;
20
21 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.codehaus.activemq.broker.BrokerClient;
25 import org.codehaus.activemq.broker.BrokerConnector;
26 import org.codehaus.activemq.filter.Filter;
27 import org.codehaus.activemq.message.ActiveMQDestination;
28 import org.codehaus.activemq.message.ActiveMQMessage;
29 import org.codehaus.activemq.message.BrokerInfo;
30 import org.codehaus.activemq.message.ConsumerInfo;
31 import org.codehaus.activemq.message.MessageAck;
32 import org.codehaus.activemq.service.Dispatcher;
33 import org.codehaus.activemq.service.MessageContainer;
34 import org.codehaus.activemq.service.MessageIdentity;
35 import org.codehaus.activemq.service.QueueList;
36 import org.codehaus.activemq.service.QueueListEntry;
37 import org.codehaus.activemq.service.SubscriberEntry;
38 import org.codehaus.activemq.service.Subscription;
39 import org.codehaus.activemq.service.RedeliveryPolicy;
40
41 import javax.jms.JMSException;
42 import java.util.ArrayList;
43 import java.util.List;
44
45 /***
46 * A Subscription holds messages to be dispatched to a a Client Consumer
47 *
48 * @version $Revision: 1.25 $
49 */
50 public class SubscriptionImpl implements Subscription {
51 private static final Log log = LogFactory.getLog(SubscriptionImpl.class);
52 private String clientId;
53 private String subscriberName;
54 private ActiveMQDestination destination;
55 private String selector;
56 private int prefetchLimit;
57 private boolean noLocal;
58 private boolean active;
59 private int consumerNumber;
60 private String consumerId;
61 private boolean browser;
62 protected Dispatcher dispatch;
63 protected String brokerName;
64 protected String clusterName;
65 private MessageIdentity lastMessageIdentity;
66 Filter filter;
67 protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0);
68 QueueList messagePtrs = new DefaultQueueList();
69 private boolean usePrefetch = false;
70 private SubscriberEntry subscriberEntry;
71 private ConsumerInfo activeConsumer;
72 private BrokerClient activeClient;
73 private RedeliveryPolicy redeliveryPolicy;
74
75 /***
76 * Create a Subscription object that holds messages to be dispatched to a Consumer
77 *
78 * @param dispatcher
79 * @param client
80 * @param info
81 * @param filter
82 */
83 public SubscriptionImpl(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter, RedeliveryPolicy redeliveryPolicy) {
84 this.dispatch = dispatcher;
85 this.filter = filter;
86 this.redeliveryPolicy = redeliveryPolicy;
87 setActiveConsumer(client, info);
88 }
89
90 /***
91 * Set the active consumer info
92 *
93 * @param client
94 * @param info
95 */
96 public void setActiveConsumer(BrokerClient client, ConsumerInfo info) {
97 if (info != null) {
98 this.clientId = info.getClientId();
99 this.subscriberName = info.getConsumerName();
100 this.noLocal = info.isNoLocal();
101 this.destination = info.getDestination();
102 this.selector = info.getSelector();
103 this.prefetchLimit = info.getPrefetchNumber();
104 this.consumerNumber = info.getConsumerNo();
105 this.consumerId = info.getConsumerId();
106 this.browser = info.isBrowser();
107 }
108 this.activeClient = client;
109 this.activeConsumer = info;
110 if (client != null) {
111 BrokerConnector brokerConnector = client.getBrokerConnector();
112 if (brokerConnector != null) {
113 BrokerInfo brokerInfo = brokerConnector.getBrokerInfo();
114 if (brokerInfo != null) {
115 brokerName = brokerInfo.getBrokerName();
116 clusterName = brokerInfo.getClusterName();
117 }
118 }
119 }
120 }
121
122 /***
123 * @return pretty print of the Subscription
124 */
125 public String toString() {
126 String str = "SubscriptionImpl(" + super.hashCode() + ")[" + consumerId + "]" + clientId + ": "
127 + subscriberName + " : " + destination;
128 return str;
129 }
130
131 /***
132 * Called when the Subscription is discarded
133 *
134 * @throws JMSException
135 */
136 public synchronized void clear() throws JMSException {
137 QueueListEntry entry = messagePtrs.getFirstEntry();
138 while (entry != null) {
139 MessagePointer pointer = (MessagePointer) entry.getElement();
140 pointer.clear();
141 entry = messagePtrs.getNextEntry(entry);
142 }
143 messagePtrs.clear();
144 }
145
146 /***
147 * Called when an active subscriber has closed. This resets all MessagePtrs
148 */
149 public synchronized void reset() throws JMSException {
150 QueueListEntry entry = messagePtrs.getFirstEntry();
151 while (entry != null) {
152 MessagePointer pointer = (MessagePointer) entry.getElement();
153 if (pointer.isDispatched()) {
154 pointer.reset();
155 pointer.setRedelivered(true);
156 }
157 else {
158 break;
159 }
160 entry = messagePtrs.getNextEntry(entry);
161 }
162 }
163
164 /***
165 * @return Returns the clientId.
166 */
167 public String getClientId() {
168 return clientId;
169 }
170
171 /***
172 * @param clientId The clientId to set.
173 */
174 public void setClientId(String clientId) {
175 this.clientId = clientId;
176 }
177
178 /***
179 * @return Returns the filter.
180 */
181 public Filter getFilter() {
182 return filter;
183 }
184
185 /***
186 * @param filter The filter to set.
187 */
188 public void setFilter(Filter filter) {
189 this.filter = filter;
190 }
191
192 public boolean isWildcard() {
193 return filter.isWildcard();
194 }
195
196 public String getPersistentKey() {
197
198 return null;
199 }
200
201 public boolean isSameDurableSubscription(ConsumerInfo info) throws JMSException {
202 if (isDurableTopic()) {
203 return equal(clientId, info.getClientId()) && equal(subscriberName, info.getConsumerName());
204 }
205 return false;
206 }
207
208 /***
209 * @return Returns the noLocal.
210 */
211 public boolean isNoLocal() {
212 return noLocal;
213 }
214
215 /***
216 * @param noLocal The noLocal to set.
217 */
218 public void setNoLocal(boolean noLocal) {
219 this.noLocal = noLocal;
220 }
221
222 /***
223 * @return Returns the subscriberName.
224 */
225 public String getSubscriberName() {
226 return subscriberName;
227 }
228
229 /***
230 * @param subscriberName The subscriberName to set.
231 */
232 public void setSubscriberName(String subscriberName) {
233 this.subscriberName = subscriberName;
234 }
235
236 public RedeliveryPolicy getRedeliveryPolicy() {
237 return redeliveryPolicy;
238 }
239
240 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
241 this.redeliveryPolicy = redeliveryPolicy;
242 }
243
244 /***
245 * determines if the Subscription is interested in the message
246 *
247 * @param message
248 * @return true if this Subscription will accept the message
249 * @throws JMSException
250 */
251 public boolean isTarget(ActiveMQMessage message) throws JMSException {
252 boolean result = false;
253 if (message != null) {
254 if (activeClient == null || brokerName == null || clusterName == null
255 || !activeClient.isClusteredConnection() || !message.isEntryCluster(clusterName)
256 || message.isEntryBroker(brokerName)) {
257 result = filter.matches(message);
258
259 if (noLocal && result) {
260 if (clientIDsEqual(message)) {
261 result = false;
262 }
263 }
264 }
265 }
266 return result;
267 }
268
269 /***
270 * If the Subscription is a target for the message, the subscription will add a reference to the message and
271 * register an interest in the message to the container
272 *
273 * @param container
274 * @param message
275 * @throws JMSException
276 */
277 public synchronized void addMessage(MessageContainer container, ActiveMQMessage message) throws JMSException {
278
279 if (log.isDebugEnabled()) {
280 log.debug("Adding to subscription: " + this + " message: " + message);
281 }
282 MessagePointer pointer = new MessagePointer(container, message.getJMSMessageIdentity());
283 messagePtrs.add(pointer);
284 dispatch.wakeup(this);
285 lastMessageIdentity = message.getJMSMessageIdentity();
286 }
287
288 /***
289 * Indicates a message has been delivered to a MessageConsumer
290 *
291 * @param ack
292 * @throws JMSException
293 */
294 public synchronized void messageConsumed(MessageAck ack) throws JMSException {
295 doMessageConsume(ack, true);
296 }
297
298 public synchronized void onAcknowledgeTransactedMessageBeforeCommit(MessageAck ack) throws JMSException {
299 doMessageConsume(ack, false);
300 }
301
302 public synchronized void redeliverMessage(MessageContainer container, MessageAck ack) throws JMSException {
303 QueueListEntry entry = messagePtrs.getFirstEntry();
304 while (entry != null) {
305 MessagePointer pointer = (MessagePointer) entry.getElement();
306 if (pointer.getMessageIdentity().getMessageID().equals(ack.getMessageID())) {
307 break;
308 }
309 entry = messagePtrs.getNextEntry(entry);
310 }
311 if (entry != null) {
312 MessagePointer pointer = (MessagePointer) entry.getElement();
313 if (pointer != null) {
314 unconsumedMessagesDispatched.increment();
315
316 pointer.reset();
317 pointer.setRedelivered(true);
318 dispatch.wakeup(this);
319 }
320 }
321 }
322
323 /***
324 * Retrieve messages to dispatch
325 *
326 * @return the messages to dispatch
327 * @throws JMSException
328 */
329 public synchronized ActiveMQMessage[] getMessagesToDispatch() throws JMSException {
330 if (usePrefetch) {
331 return getMessagesWithPrefetch();
332 }
333 List tmpList = new ArrayList();
334 QueueListEntry entry = messagePtrs.getFirstEntry();
335 while (entry != null) {
336 MessagePointer pointer = (MessagePointer) entry.getElement();
337 if (!pointer.isDispatched()) {
338 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
339 if (msg != null) {
340 if (pointer.isDispatched() || pointer.isRedelivered()) {
341
342 msg.setJMSRedelivered(true);
343 }
344 pointer.setDispatched(true);
345 tmpList.add(msg);
346 }
347 else {
348
349 log.info("Message probably expired: " + msg);
350 QueueListEntry discarded = entry;
351 entry = messagePtrs.getPrevEntry(discarded);
352 messagePtrs.remove(discarded);
353 }
354 }
355 entry = messagePtrs.getNextEntry(entry);
356 }
357 ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
358 return (ActiveMQMessage[]) tmpList.toArray(messages);
359 }
360
361 public synchronized SubscriberEntry getSubscriptionEntry() {
362 if (subscriberEntry == null) {
363 subscriberEntry = createSubscriptionEntry();
364 }
365 return subscriberEntry;
366 }
367
368
369
370 protected SubscriberEntry createSubscriptionEntry() {
371 SubscriberEntry answer = new SubscriberEntry();
372 answer.setClientID(clientId);
373 answer.setConsumerName(subscriberName);
374 answer.setDestination(destination.getPhysicalName());
375 answer.setSelector(selector);
376 return answer;
377 }
378
379 protected synchronized ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException {
380 List tmpList = new ArrayList();
381 QueueListEntry entry = messagePtrs.getFirstEntry();
382 int count = 0;
383 int maxNumberToDispatch = prefetchLimit - unconsumedMessagesDispatched.get();
384 while (entry != null && count < maxNumberToDispatch) {
385 MessagePointer pointer = (MessagePointer) entry.getElement();
386 if (!pointer.isDispatched()) {
387 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
388 if (msg != null) {
389 if (pointer.isDispatched() || pointer.isRedelivered()) {
390
391 msg.setJMSRedelivered(true);
392 }
393 pointer.setDispatched(true);
394 tmpList.add(msg);
395 unconsumedMessagesDispatched.increment();
396 count++;
397 }
398 else {
399
400 log.info("Message probably expired: " + msg);
401 QueueListEntry discarded = entry;
402 entry = messagePtrs.getPrevEntry(discarded);
403 messagePtrs.remove(discarded);
404 }
405 }
406 entry = messagePtrs.getNextEntry(entry);
407 }
408 /***
409 * if (tmpList.isEmpty() && ! messagePtrs.isEmpty()) { System.out.println("### Nothing to dispatch but
410 * messagePtrs still has: " + messagePtrs.size() + " to dispatch, prefetchLimit: " + prefetchLimit + "
411 * unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get() + " maxNumberToDispatch: " +
412 * maxNumberToDispatch); MessagePointer first = (MessagePointer) messagePtrs.getFirst(); System.out.println("###
413 * First: " + first + " dispatched: " + first.isDispatched() + " id: " + first.getMessageIdentity()); } else {
414 * if (! tmpList.isEmpty()) { System.out.println("### dispatching: " + tmpList.size() + " items = " + tmpList); } }
415 */
416 ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
417 return (ActiveMQMessage[]) tmpList.toArray(messages);
418 }
419
420 /***
421 * Indicates the Subscription it's reached it's pre-fetch limit
422 *
423 * @return true/false
424 * @throws JMSException
425 */
426 public synchronized boolean isAtPrefetchLimit() throws JMSException {
427 if (usePrefetch) {
428 int underlivedMessageCount = messagePtrs.size() - unconsumedMessagesDispatched.get();
429 return underlivedMessageCount >= prefetchLimit;
430 }
431 else {
432 return false;
433 }
434 }
435
436 /***
437 * Indicates if this Subscription has more messages to send to the Consumer
438 *
439 * @return true if more messages available to dispatch
440 */
441 public synchronized boolean isReadyToDispatch() throws JMSException {
442 /*** TODO we may have dispatched messags inside messagePtrs */
443 boolean answer = active && messagePtrs.size() > 0;
444 return answer;
445 }
446
447 /***
448 * @return Returns the destination.
449 */
450 public ActiveMQDestination getDestination() {
451 return destination;
452 }
453
454 /***
455 * @return Returns the selector.
456 */
457 public String getSelector() {
458 return selector;
459 }
460
461 /***
462 * @return Returns the active.
463 */
464 public synchronized boolean isActive() {
465 return active;
466 }
467
468 /***
469 * @param active The active to set.
470 */
471 public synchronized void setActive(boolean active) throws JMSException {
472 this.active = active;
473 if (!active) {
474 reset();
475 }
476 }
477
478 /***
479 * @return Returns the consumerNumber.
480 */
481 public int getConsumerNumber() {
482 return consumerNumber;
483 }
484
485 /***
486 * @return the consumer Id for the active consumer
487 */
488 public String getConsumerId() {
489 return consumerId;
490 }
491
492 /***
493 * Indicates the Subscriber is a Durable Subscriber
494 *
495 * @return true if the subscriber is a durable topic
496 * @throws JMSException
497 */
498 public boolean isDurableTopic() throws JMSException {
499 return destination.isTopic() && subscriberName != null && subscriberName.length() > 0;
500 }
501
502 /***
503 * Indicates the consumer is a browser only
504 *
505 * @return true if a Browser
506 * @throws JMSException
507 */
508 public boolean isBrowser() throws JMSException {
509 return browser;
510 }
511
512 public MessageIdentity getLastMessageIdentity() throws JMSException {
513 return lastMessageIdentity;
514 }
515
516 public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException {
517 this.lastMessageIdentity = messageIdentity;
518 }
519
520 /***
521 * Consume a message. If we are inside a transaction then we just update the consumed messages dispatched counter
522 * and we don't actually remove the message until a future call.
523 *
524 * @param ack the ack command
525 * @param remove whether we should actually remove the message (i.e. really consume the message) or should we just
526 * update the counters for the dispatcher / prefetch logic to work
527 */
528 protected synchronized void doMessageConsume(MessageAck ack, boolean remove) throws JMSException {
529
530 int count = 0;
531 boolean found = false;
532 QueueListEntry entry = messagePtrs.getFirstEntry();
533 while (entry != null) {
534 MessagePointer pointer = (MessagePointer) entry.getElement();
535 if (remove) {
536 messagePtrs.remove(entry);
537 if (ack.isMessageRead() && !browser) {
538 pointer.delete(ack);
539 }
540 }
541 count++;
542
543
544 if (remove && !ack.isPartOfTransaction()) {
545 unconsumedMessagesDispatched.decrement();
546 }
547 if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
548 if (!remove && ack.isPartOfTransaction()) {
549
550
551 unconsumedMessagesDispatched.decrement();
552 }
553 found = true;
554 break;
555 }
556 entry = messagePtrs.getNextEntry(entry);
557 }
558 if (!found && log.isDebugEnabled()) {
559 log.debug("Did not find a matching message for identity: " + ack.getMessageIdentity());
560 }
561 dispatch.wakeup(this);
562 }
563
564 protected boolean clientIDsEqual(ActiveMQMessage message) {
565 String msgClientID = message.getJMSClientID();
566 String producerClientID = message.getProducerID();
567 String subClientID = clientId;
568 if (producerClientID != null && producerClientID.equals(subClientID)) {
569 return true;
570 }
571 else if (msgClientID == null || subClientID == null) {
572 return false;
573 }
574 else {
575 return msgClientID.equals(subClientID);
576 }
577 }
578
579 protected static final boolean equal(Object left, Object right) {
580 return left == right || (left != null && right != null && left.equals(right));
581 }
582 }