1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.broker.BrokerClient;
23 import org.codehaus.activemq.filter.FilterFactory;
24 import org.codehaus.activemq.filter.FilterFactoryImpl;
25 import org.codehaus.activemq.message.ActiveMQDestination;
26 import org.codehaus.activemq.message.ActiveMQMessage;
27 import org.codehaus.activemq.message.ConsumerInfo;
28 import org.codehaus.activemq.service.Dispatcher;
29 import org.codehaus.activemq.service.MessageContainer;
30 import org.codehaus.activemq.service.Subscription;
31 import org.codehaus.activemq.service.SubscriptionContainer;
32 import org.codehaus.activemq.service.RedeliveryPolicy;
33 import org.codehaus.activemq.store.PersistenceAdapter;
34
35 import javax.jms.DeliveryMode;
36 import javax.jms.JMSException;
37 import java.util.Iterator;
38 import java.util.Set;
39
40 /***
41 * A default implementation of a Broker of Topic messages for transient consumers
42 *
43 * @version $Revision: 1.6 $
44 */
45 public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager {
46 private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class);
47
48 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
49 this(persistenceAdapter, new SubscriptionContainerImpl(new RedeliveryPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
50 }
51
52 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
53 super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher);
54 }
55
56 /***
57 * @param client
58 * @param info
59 * @throws javax.jms.JMSException
60 */
61 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
62 if (info.getDestination().isTopic()) {
63 doAddMessageConsumer(client, info);
64 }
65 }
66
67
68 /***
69 * @param client
70 * @param info
71 * @throws javax.jms.JMSException
72 */
73 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
74 Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
75 if (sub != null) {
76 sub.setActive(false);
77 dispatcher.removeActiveSubscription(client, sub);
78 subscriptionContainer.removeSubscription(info.getConsumerId());
79 sub.clear();
80 }
81 }
82
83
84 /***
85 * @param client
86 * @param message
87 * @throws javax.jms.JMSException
88 */
89 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
90 ActiveMQDestination destination = message.getJMSActiveMQDestination();
91 if (destination != null && destination.isTopic()) {
92 MessageContainer container = null;
93 if (log.isDebugEnabled()) {
94 log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message);
95 }
96 Set subscriptions = subscriptionContainer.getSubscriptions(destination);
97 for (Iterator i = subscriptions.iterator(); i.hasNext();) {
98 Subscription sub = (Subscription) i.next();
99 if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) {
100 if (container == null) {
101 container = getContainer(message.getJMSDestination().toString());
102 container.addMessage(message);
103 }
104 sub.addMessage(container, message);
105 }
106 }
107 updateSendStats(client, message);
108 }
109 }
110
111 /***
112 * Delete a durable subscriber
113 *
114 * @param clientId
115 * @param subscriberName
116 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
117 */
118 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
119 }
120 }