1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.boundedvm;
20
21 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.codehaus.activemq.broker.BrokerClient;
25 import org.codehaus.activemq.filter.Filter;
26 import org.codehaus.activemq.message.ActiveMQDestination;
27 import org.codehaus.activemq.message.ActiveMQMessage;
28 import org.codehaus.activemq.message.ConsumerInfo;
29 import org.codehaus.activemq.message.MessageAck;
30 import org.codehaus.activemq.message.util.MemoryBoundedQueue;
31 import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
32 import org.codehaus.activemq.service.MessageContainer;
33 import org.codehaus.activemq.service.MessageIdentity;
34 import org.codehaus.activemq.service.QueueListEntry;
35 import org.codehaus.activemq.service.Service;
36 import org.codehaus.activemq.service.impl.DefaultQueueList;
37
38 import javax.jms.JMSException;
39 import java.util.List;
40 import java.util.ListIterator;
41
42 /***
43 * A MessageContainer for transient queues
44 *
45 * @version $Revision: 1.6 $
46 */
47 public class TransientQueueBoundedMessageContainer implements MessageContainer, Service, Runnable {
48 private MemoryBoundedQueueManager queueManager;
49 private ActiveMQDestination destination;
50 private SynchronizedBoolean started;
51 private MemoryBoundedQueue queue;
52 private Thread worker;
53 private DefaultQueueList subscriptions;
54 private Log log;
55
56 /***
57 * Construct this beast
58 *
59 * @param queueManager
60 * @param destination
61 */
62 public TransientQueueBoundedMessageContainer(MemoryBoundedQueueManager queueManager, ActiveMQDestination destination) {
63 this.queueManager = queueManager;
64 this.destination = destination;
65 this.queue = queueManager.getMemoryBoundedQueue("TRANSIENT_QUEUE:-" + destination.getPhysicalName());
66 this.started = new SynchronizedBoolean(false);
67 this.subscriptions = new DefaultQueueList();
68 this.log = LogFactory.getLog("TransientQueueBoundedMessageContainer:- " + destination);
69 }
70
71 /***
72 * @return true if this Container has no active subscriptions and there are no messages to dispatch
73 */
74 public boolean isInactive() {
75 return subscriptions.isEmpty() && queue.isEmpty();
76 }
77
78 /***
79 * Add a consumer to dispatch messages to
80 *
81 * @param filter
82 * @param info
83 * @param client
84 * @return
85 * @throws JMSException
86 */
87 public TransientQueueSubscription addConsumer(Filter filter, ConsumerInfo info, BrokerClient client)
88 throws JMSException {
89 TransientQueueSubscription ts = findMatch(info);
90 if (ts == null) {
91 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString() + info.getConsumerId());
92 ts = new TransientQueueSubscription(client, queue, filter, info);
93 subscriptions.add(ts);
94 }
95 return ts;
96 }
97
98 /***
99 * Remove a consumer
100 *
101 * @param info
102 * @throws JMSException
103 */
104 public void removeConsumer(ConsumerInfo info) throws JMSException {
105 TransientQueueSubscription ts = findMatch(info);
106 if (ts != null) {
107 subscriptions.remove(ts);
108
109 List list = ts.getUndeliveredMessages();
110 for (ListIterator i = list.listIterator(list.size()); i.hasPrevious();) {
111 ActiveMQMessage message = (ActiveMQMessage) i.previous();
112 message.setJMSRedelivered(true);
113 queue.enqueueFirstNoBlock(message);
114 }
115 list.clear();
116 ts.close();
117 }
118 }
119
120 /***
121 * start working
122 */
123 public void start() {
124 if (started.commit(false, true)) {
125 worker = new Thread(this, "TransientQueueDispatcher");
126 worker.setPriority(Thread.NORM_PRIORITY + 1);
127 worker.start();
128 }
129 }
130
131 /***
132 * enqueue a message for dispatching
133 *
134 * @param message
135 */
136 public void enqueue(ActiveMQMessage message) {
137 queue.enqueue(message);
138 }
139
140 /***
141 * enqueue a message for dispatching
142 *
143 * @param message
144 */
145 public void enqueueFirst(ActiveMQMessage message) {
146 queue.enqueueFirstNoBlock(message);
147 }
148
149
150
151 /***
152 * stop working
153 */
154 public void stop() {
155 started.set(false);
156 queue.clear();
157 }
158
159 /***
160 * close down this container
161 *
162 * @throws JMSException
163 */
164 public void close() throws JMSException {
165 if (started.get()) {
166 stop();
167 }
168 queue.close();
169 QueueListEntry entry = subscriptions.getFirstEntry();
170 while (entry != null) {
171 TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement();
172 ts.close();
173 entry = subscriptions.getNextEntry(entry);
174 }
175 subscriptions.clear();
176 }
177
178 /***
179 * do some dispatching
180 */
181 public void run() {
182 boolean dispatched = false;
183 boolean targeted = false;
184 ActiveMQMessage message = null;
185 try {
186 while (started.get()) {
187 dispatched = false;
188 targeted = false;
189 if (!subscriptions.isEmpty()) {
190 message = (ActiveMQMessage) queue.dequeue(2000);
191 if (message != null) {
192 if (!message.isExpired()) {
193 QueueListEntry entry = subscriptions.getFirstEntry();
194 while (entry != null) {
195 TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement();
196 if (ts.isTarget(message)) {
197 targeted = true;
198 if (ts.canAcceptMessages()) {
199 ts.doDispatch(message);
200 message = null;
201 dispatched = true;
202 subscriptions.rotate();
203 break;
204 }
205 }
206 entry = subscriptions.getNextEntry(entry);
207 }
208 }
209 else {
210
211 if (log.isDebugEnabled()) {
212 log.debug("expired message: " + message);
213 }
214 message = null;
215 }
216 }
217 }
218 if (!dispatched) {
219 if (message != null) {
220 if (targeted) {
221 queue.enqueueFirstNoBlock(message);
222 }
223 else {
224
225 queue.enqueueNoBlock(message);
226 }
227 }
228 Thread.sleep(250);
229 }
230 }
231 }
232 catch (Exception e) {
233 stop();
234 log.warn("stop dispatching", e);
235 }
236 }
237
238
239 private TransientQueueSubscription findMatch(ConsumerInfo info) throws JMSException {
240 TransientQueueSubscription result = null;
241 QueueListEntry entry = subscriptions.getFirstEntry();
242 while (entry != null) {
243 TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement();
244 if (ts.getConsumerInfo().equals(info)) {
245 result = ts;
246 break;
247 }
248 entry = subscriptions.getNextEntry(entry);
249 }
250 return result;
251 }
252
253 /***
254 * @return the destination associated with this container
255 */
256 public ActiveMQDestination getDestination() {
257 return destination;
258 }
259
260 /***
261 * @return the destination name
262 */
263 public String getDestinationName() {
264 return destination.getPhysicalName();
265 }
266
267
268 /***
269 * @param msg
270 * @return
271 * @throws JMSException
272 */
273 public MessageIdentity addMessage(ActiveMQMessage msg) throws JMSException {
274 return null;
275 }
276
277 /***
278 * @param messageIdentity
279 * @param ack
280 * @throws JMSException
281 */
282 public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
283 }
284
285 /***
286 * @param messageIdentity
287 * @return
288 * @throws JMSException
289 */
290 public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
291 return null;
292 }
293
294 /***
295 * @param messageIdentity
296 * @throws JMSException
297 */
298 public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
299 }
300
301 /***
302 * @param messageIdentity
303 * @param ack
304 * @throws JMSException
305 */
306 public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
307 }
308
309 /***
310 * @param messageIdentity
311 * @return
312 * @throws JMSException
313 */
314 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
315 return false;
316 }
317
318 protected boolean hasActiveSubscribers() {
319 return !subscriptions.isEmpty();
320 }
321
322 protected void clear() {
323 queue.clear();
324 }
325
326 protected void removeExpiredMessages() {
327 long currentTime = System.currentTimeMillis();
328 List list = queue.getContents();
329 for (int i = 0; i < list.size(); i++) {
330 ActiveMQMessage msg = (ActiveMQMessage) list.get(i);
331 if (msg.isExpired(currentTime)) {
332 queue.remove(msg);
333 if (log.isDebugEnabled()) {
334 log.debug("expired message: " + msg);
335 }
336 }
337 }
338 }
339 }