1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.boundedvm;
20
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.Map;
24 import java.util.Set;
25 import javax.jms.JMSException;
26 import org.codehaus.activemq.broker.BrokerClient;
27 import org.codehaus.activemq.filter.AndFilter;
28 import org.codehaus.activemq.filter.DestinationMap;
29 import org.codehaus.activemq.filter.Filter;
30 import org.codehaus.activemq.filter.FilterFactory;
31 import org.codehaus.activemq.filter.FilterFactoryImpl;
32 import org.codehaus.activemq.filter.NoLocalFilter;
33 import org.codehaus.activemq.message.ActiveMQDestination;
34 import org.codehaus.activemq.message.ActiveMQMessage;
35 import org.codehaus.activemq.message.ConsumerInfo;
36 import org.codehaus.activemq.message.MessageAck;
37 import org.codehaus.activemq.message.util.MemoryBoundedQueue;
38 import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
39 import org.codehaus.activemq.service.MessageContainer;
40 import org.codehaus.activemq.service.MessageContainerManager;
41 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
42 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
43
44 /***
45 * A MessageContainerManager for transient topics
46 *
47 * @version $Revision: 1.11 $
48 */
49
50 /***
51 * A manager of MessageContainer instances
52 */
53 public class TransientTopicBoundedMessageManager implements MessageContainerManager {
54 private MemoryBoundedQueueManager queueManager;
55 private ConcurrentHashMap containers;
56 private DestinationMap destinationMap;
57 private FilterFactory filterFactory;
58 private SynchronizedBoolean started;
59 private Map destinations;
60
61 /***
62 * Constructor for TransientTopicBoundedMessageManager
63 *
64 * @param mgr
65 */
66 public TransientTopicBoundedMessageManager(MemoryBoundedQueueManager mgr) {
67 this.queueManager = mgr;
68 this.containers = new ConcurrentHashMap();
69 this.destinationMap = new DestinationMap();
70 this.destinations = new ConcurrentHashMap();
71 this.filterFactory = new FilterFactoryImpl();
72 this.started = new SynchronizedBoolean(false);
73 }
74
75 /***
76 * start the manager
77 *
78 * @throws JMSException
79 */
80 public void start() throws JMSException {
81 if (started.commit(false, true)) {
82 for (Iterator i = containers.values().iterator(); i.hasNext();) {
83 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
84 container.start();
85 }
86 }
87 }
88
89 /***
90 * stop the manager
91 *
92 * @throws JMSException
93 */
94 public void stop() throws JMSException {
95 if (started.commit(true, false)) {
96 for (Iterator i = containers.values().iterator(); i.hasNext();) {
97 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
98 container.stop();
99 }
100 }
101 }
102
103 /***
104 * Add a consumer if appropiate
105 *
106 * @param client
107 * @param info
108 * @throws JMSException
109 */
110 public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
111 ActiveMQDestination destination = info.getDestination();
112 if (destination.isTopic()) {
113 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
114 .get(client);
115 if (container == null) {
116 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
117 container = new TransientTopicBoundedMessageContainer(client, queue);
118 containers.put(client, container);
119 if (started.get()) {
120 container.start();
121 }
122 }
123 container.addConsumer(createFilter(info), info);
124 destinationMap.put(destination,container);
125 String name = destination.getPhysicalName();
126 if (!destinations.containsKey(name)) {
127 destinations.put(name, destination);
128 }
129 }
130 }
131
132 /***
133 * @param client
134 * @param info
135 * @throws JMSException
136 */
137 public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
138 ActiveMQDestination destination = info.getDestination();
139 if (destination.isTopic()) {
140 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
141 .get(client);
142 if (container != null) {
143 container.removeConsumer(info);
144 if (container.isInactive()) {
145 containers.remove(client);
146 container.close();
147 destinationMap.remove(destination, container);
148 }
149
150
151 if (!hasConsumerFor(destination)) {
152 destinations.remove(destination.getPhysicalName());
153 }
154 }
155 }
156 }
157
158 /***
159 * Delete a durable subscriber
160 *
161 * @param clientId
162 * @param subscriberName
163 * @throws JMSException if the subscriber doesn't exist or is still active
164 */
165 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
166 }
167
168 /***
169 * @param client
170 * @param message
171 * @throws JMSException
172 */
173 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
174 if (message != null && message.getJMSActiveMQDestination().isTopic()) {
175
176 Set set = destinationMap.get(message.getJMSActiveMQDestination());
177 for (Iterator i = set.iterator(); i.hasNext();) {
178 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
179 container.targetAndDispatch(client,message);
180 }
181 }
182 }
183
184 /***
185 * @param client
186 * @param ack
187 * @throws JMSException
188 *
189 */
190 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
191 }
192
193 /***
194 * @param client
195 * @param transactionId
196 * @param ack
197 * @throws JMSException
198 *
199 */
200 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
201 }
202
203 /***
204 * @param client
205 * @param ack
206 * @throws JMSException
207 *
208 */
209 public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
210 }
211
212 /***
213 * @throws JMSException
214 *
215 */
216
217 public void poll() throws JMSException {
218 }
219
220 /***
221 * A hook when the transaction is about to be commited; so apply all outstanding commands to the Journal if using a
222 * Journal (transaction log)
223 *
224 * @param client
225 * @param transactionId
226 * @throws JMSException
227 */
228 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
229 }
230
231 /***
232 * A hook when the transaction is about to be rolled back; so discard all outstanding commands that are pending to
233 * be written to the Journal
234 *
235 * @param client
236 * @param transactionId
237 */
238 public void rollbackTransaction(BrokerClient client, String transactionId) {
239 }
240
241 /***
242 * For Transient topics - a MessageContainer maps on to the messages
243 * to be dispatched through a BrokerClient, not a destination
244 * @param physicalName
245 * @return the MessageContainer used for dispatching - always returns null
246 * @throws JMSException
247 */
248 public MessageContainer getContainer(String physicalName) throws JMSException {
249 return null;
250 }
251
252 /***
253 * @return a map of all the destinations
254 */
255 public Map getDestinations() {
256 return Collections.unmodifiableMap(destinations);
257 }
258
259 /***
260 * Create filter for a Consumer
261 *
262 * @param info
263 * @return the Fitler
264 * @throws javax.jms.JMSException
265 */
266 protected Filter createFilter(ConsumerInfo info) throws JMSException {
267 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
268 if (info.isNoLocal()) {
269 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
270 }
271 return filter;
272 }
273
274 protected boolean hasConsumerFor(ActiveMQDestination destination) {
275 for (Iterator i = containers.values().iterator(); i.hasNext();) {
276 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
277 if (container.hasConsumerFor(destination)) {
278 return true;
279 }
280 }
281 return false;
282 }
283
284 }