1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.boundedvm;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.Map;
23 import java.util.Set;
24 import javax.jms.Destination;
25 import javax.jms.JMSException;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.codehaus.activemq.broker.BrokerClient;
29 import org.codehaus.activemq.filter.AndFilter;
30 import org.codehaus.activemq.filter.DestinationMap;
31 import org.codehaus.activemq.filter.Filter;
32 import org.codehaus.activemq.filter.FilterFactory;
33 import org.codehaus.activemq.filter.FilterFactoryImpl;
34 import org.codehaus.activemq.filter.NoLocalFilter;
35 import org.codehaus.activemq.message.ActiveMQDestination;
36 import org.codehaus.activemq.message.ActiveMQMessage;
37 import org.codehaus.activemq.message.ConsumerInfo;
38 import org.codehaus.activemq.message.MessageAck;
39 import org.codehaus.activemq.message.util.MemoryBoundedQueue;
40 import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
41 import org.codehaus.activemq.service.MessageContainer;
42 import org.codehaus.activemq.service.MessageContainerManager;
43 import org.codehaus.activemq.service.impl.DispatcherImpl;
44 import org.codehaus.activemq.service.impl.MessageContainerManagerSupport;
45 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
46 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
47
48 /***
49 * A MessageContainerManager for transient queues
50 *
51 * @version $Revision: 1.4 $
52 */
53 /***
54 * A manager of MessageContainer instances
55 */
56 public class TransientQueueBoundedMessageManager implements MessageContainerManager, Runnable {
57 private static final int GARBAGE_COLLECTION_CAPACITY_LIMIT = 20;
58 private static final Log log = LogFactory.getLog(TransientQueueBoundedMessageManager.class);
59 private MemoryBoundedQueueManager queueManager;
60 private ConcurrentHashMap containers;
61 private ConcurrentHashMap subscriptions;
62 private FilterFactory filterFactory;
63 private SynchronizedBoolean started;
64 private SynchronizedBoolean doingGarbageCollection;
65 private Map destinations;
66 private DestinationMap destinationMap;
67 private Thread garbageCollectionThread;
68
69 /***
70 * Constructor for TransientQueueBoundedMessageManager
71 *
72 * @param mgr
73 */
74 public TransientQueueBoundedMessageManager(MemoryBoundedQueueManager mgr) {
75 this.queueManager = mgr;
76 this.containers = new ConcurrentHashMap();
77 this.destinationMap = new DestinationMap();
78 this.destinations = new ConcurrentHashMap();
79 this.subscriptions = new ConcurrentHashMap();
80 this.filterFactory = new FilterFactoryImpl();
81 this.started = new SynchronizedBoolean(false);
82 this.doingGarbageCollection = new SynchronizedBoolean(false);
83 }
84
85 /***
86 * start the manager
87 *
88 * @throws JMSException
89 */
90 public void start() throws JMSException {
91 if (started.commit(false, true)) {
92 for (Iterator i = containers.values().iterator();i.hasNext();) {
93 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
94 container.start();
95 }
96 garbageCollectionThread = new Thread(this);
97 garbageCollectionThread.setName("TQMCMGarbageCollector");
98 garbageCollectionThread.start();
99 }
100 }
101
102 /***
103 * stop the manager
104 *
105 * @throws JMSException
106 */
107 public void stop() throws JMSException {
108 if (started.commit(true, false)) {
109 for (Iterator i = containers.values().iterator();i.hasNext();) {
110 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
111 container.stop();
112 }
113 if (garbageCollectionThread != null) {
114 garbageCollectionThread.interrupt();
115 }
116 }
117 }
118
119 /***
120 * collect expired messages
121 */
122 public void run() {
123 while (started.get()) {
124 doGarbageCollection();
125 try {
126 Thread.sleep(2000);
127 }
128 catch (InterruptedException e) {
129 }
130 }
131 }
132
133 /***
134 * Add a consumer if appropiate
135 *
136 * @param client
137 * @param info
138 * @throws JMSException
139 */
140 public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
141 ActiveMQDestination destination = info.getDestination();
142 if (destination.isQueue()) {
143 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) containers
144 .get(destination);
145 if (container == null) {
146 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
147 container = new TransientQueueBoundedMessageContainer(queueManager, destination);
148 addContainer(container);
149 if (started.get()) {
150 container.start();
151 }
152 }
153 TransientQueueSubscription ts = container.addConsumer(createFilter(info), info, client);
154 if (ts != null) {
155 subscriptions.put(info.getConsumerId(), ts);
156 }
157 String name = destination.getPhysicalName();
158 if (!destinations.containsKey(name)) {
159 destinations.put(name, destination);
160 }
161 }
162 }
163
164 /***
165 * @param client
166 * @param info
167 * @throws JMSException
168 */
169 public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
170 ActiveMQDestination destination = info.getDestination();
171 if (destination.isQueue()) {
172 for (Iterator i = containers.values().iterator();i.hasNext();) {
173 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
174 if (container != null) {
175 container.removeConsumer(info);
176 }
177 }
178 subscriptions.remove(info.getConsumerId());
179 }
180 }
181
182 /***
183 * Delete a durable subscriber
184 *
185 * @param clientId
186 * @param subscriberName
187 * @throws JMSException if the subscriber doesn't exist or is still active
188 */
189 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
190 }
191
192 /***
193 * @param client
194 * @param message
195 * @throws JMSException
196 */
197 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
198 if (message != null && message.getJMSActiveMQDestination().isQueue() && (message.isTemporary())) {
199 if (queueManager.getCurrentCapacity() <= GARBAGE_COLLECTION_CAPACITY_LIMIT) {
200 doGarbageCollection();
201 }
202 Set set = destinationMap.get(message.getJMSActiveMQDestination());
203 for (Iterator i = set.iterator();i.hasNext();) {
204 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
205 container.enqueue(message);
206 }
207 }
208 }
209
210 /***
211 * @param client
212 * @param ack
213 * @throws JMSException
214 */
215 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
216 TransientQueueSubscription ts = (TransientQueueSubscription) subscriptions.get(ack.getConsumerId());
217 if (ts != null) {
218 ActiveMQMessage message = ts.acknowledgeMessage(ack.getMessageID());
219 if (message != null && !ack.isMessageRead()){
220 message.setJMSRedelivered(true);
221 Set set = destinationMap.get(message.getJMSActiveMQDestination());
222 for (Iterator i = set.iterator();i.hasNext();) {
223 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
224 container.enqueueFirst(message);
225 break;
226 }
227 }
228 }
229 }
230
231 /***
232 * @param client
233 * @param transactionId
234 * @param ack
235 * @throws JMSException
236 */
237 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
238 throws JMSException {
239 }
240
241 /***
242 * @param client
243 * @param ack
244 * @throws JMSException
245 */
246 public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
247 TransientQueueSubscription ts = (TransientQueueSubscription) subscriptions.get(ack.getConsumerId());
248 if (ts != null) {
249 ActiveMQMessage message = ts.acknowledgeMessage(ack.getMessageID());
250 if (message != null){
251 message.setJMSRedelivered(true);
252 Set set = destinationMap.get(message.getJMSActiveMQDestination());
253 for (Iterator i = set.iterator();i.hasNext();) {
254 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
255 container.enqueueFirst(message);
256 break;
257 }
258 }
259 }
260 }
261
262 /***
263 * @throws JMSException
264 */
265 public void poll() throws JMSException {
266 }
267
268 /***
269 * A hook when the transaction is about to be commited; so apply all outstanding commands to the Journal if using a
270 * Journal (transaction log)
271 *
272 * @param client
273 * @param transactionId
274 * @throws JMSException
275 */
276 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
277 }
278
279 /***
280 * A hook when the transaction is about to be rolled back; so discard all outstanding commands that are pending to
281 * be written to the Journal
282 *
283 * @param client
284 * @param transactionId
285 */
286 public void rollbackTransaction(BrokerClient client, String transactionId) {
287 }
288
289 /***
290 * @param physicalName
291 * @return @throws JMSException
292 */
293 public MessageContainer getContainer(String physicalName) throws JMSException {
294 Object key = destinations.get(physicalName);
295 if (key != null) {
296 return (MessageContainer) containers.get(key);
297 }
298 return null;
299 }
300
301 /***
302 * @return a map of destinations
303 */
304 public Map getDestinations() {
305 return Collections.unmodifiableMap(destinations);
306 }
307
308 /***
309 * Create filter for a Consumer
310 *
311 * @param info
312 * @return the Fitler
313 * @throws javax.jms.JMSException
314 */
315 protected Filter createFilter(ConsumerInfo info) throws JMSException {
316 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
317 if (info.isNoLocal()) {
318 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
319 }
320 return filter;
321 }
322
323 private void doGarbageCollection() {
324 if (doingGarbageCollection.commit(true, false)) {
325 if (queueManager.getCurrentCapacity() <= GARBAGE_COLLECTION_CAPACITY_LIMIT) {
326 for (Iterator i = containers.values().iterator();i.hasNext();) {
327 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
328 container.removeExpiredMessages();
329 }
330 }
331
332 if (queueManager.getCurrentCapacity() <= GARBAGE_COLLECTION_CAPACITY_LIMIT) {
333 for (Iterator i = containers.values().iterator();i.hasNext();) {
334 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
335 if (!container.hasActiveSubscribers()) {
336 container.clear();
337 }
338 }
339 }
340 for (Iterator i = containers.values().iterator();i.hasNext();) {
341 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
342 if (container.isInactive()) {
343 try {
344 container.close();
345 log.info("closed inactive transient queue container: " + container.getDestinationName());
346 }
347 catch (JMSException e) {
348 log.warn("failure closing container", e);
349 }
350 removeContainer(container);
351 }
352 }
353
354 doingGarbageCollection.set(false);
355 }
356 }
357
358 private synchronized void addContainer(TransientQueueBoundedMessageContainer container) {
359 containers.put(container.getDestination(), container);
360 destinationMap.put(container.getDestination(), container);
361 }
362
363 private synchronized void removeContainer(TransientQueueBoundedMessageContainer container) {
364 containers.remove(container.getDestination());
365 destinationMap.remove(container.getDestination(), container);
366 }
367
368
369 protected Destination createDestination(String destinationName) {
370 return null;
371 }
372
373 protected MessageContainer createContainer(String destinationName) throws JMSException {
374 return null;
375 }
376 }