View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.service.boundedvm;
20  
21  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.codehaus.activemq.broker.BrokerClient;
25  import org.codehaus.activemq.filter.Filter;
26  import org.codehaus.activemq.message.ActiveMQDestination;
27  import org.codehaus.activemq.message.ActiveMQMessage;
28  import org.codehaus.activemq.message.ConsumerInfo;
29  import org.codehaus.activemq.message.MessageAck;
30  import org.codehaus.activemq.message.util.MemoryBoundedQueue;
31  import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
32  import org.codehaus.activemq.service.MessageContainer;
33  import org.codehaus.activemq.service.MessageIdentity;
34  import org.codehaus.activemq.service.QueueListEntry;
35  import org.codehaus.activemq.service.Service;
36  import org.codehaus.activemq.service.impl.DefaultQueueList;
37  
38  import javax.jms.JMSException;
39  import java.util.List;
40  import java.util.ListIterator;
41  
42  /***
43   * A MessageContainer for transient queues
44   *
45   * @version $Revision: 1.6 $
46   */
47  public class TransientQueueBoundedMessageContainer implements MessageContainer, Service, Runnable {
48      private MemoryBoundedQueueManager queueManager;
49      private ActiveMQDestination destination;
50      private SynchronizedBoolean started;
51      private MemoryBoundedQueue queue;
52      private Thread worker;
53      private DefaultQueueList subscriptions;
54      private Log log;
55  
56      /***
57       * Construct this beast
58       *
59       * @param queueManager
60       * @param destination
61       */
62      public TransientQueueBoundedMessageContainer(MemoryBoundedQueueManager queueManager, ActiveMQDestination destination) {
63          this.queueManager = queueManager;
64          this.destination = destination;
65          this.queue = queueManager.getMemoryBoundedQueue("TRANSIENT_QUEUE:-" + destination.getPhysicalName());
66          this.started = new SynchronizedBoolean(false);
67          this.subscriptions = new DefaultQueueList();
68          this.log = LogFactory.getLog("TransientQueueBoundedMessageContainer:- " + destination);
69      }
70  
71      /***
72       * @return true if this Container has no active subscriptions and there are no messages to dispatch
73       */
74      public boolean isInactive() {
75          return subscriptions.isEmpty() && queue.isEmpty();
76      }
77  
78      /***
79       * Add a consumer to dispatch messages to
80       *
81       * @param filter
82       * @param info
83       * @param client
84       * @return
85       * @throws JMSException
86       */
87      public TransientQueueSubscription addConsumer(Filter filter, ConsumerInfo info, BrokerClient client)
88              throws JMSException {
89          TransientQueueSubscription ts = findMatch(info);
90          if (ts == null) {
91              MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString() + info.getConsumerId());
92              ts = new TransientQueueSubscription(client, queue, filter, info);
93              subscriptions.add(ts);
94          }
95          return ts;
96      }
97  
98      /***
99       * Remove a consumer
100      *
101      * @param info
102      * @throws JMSException
103      */
104     public void removeConsumer(ConsumerInfo info) throws JMSException {
105         TransientQueueSubscription ts = findMatch(info);
106         if (ts != null) {
107             subscriptions.remove(ts);
108             //get unacknowledged messages and re-enqueue them
109             List list = ts.getUndeliveredMessages();
110             for (ListIterator i = list.listIterator(list.size()); i.hasPrevious();) {
111                 ActiveMQMessage message = (ActiveMQMessage) i.previous();
112                 message.setJMSRedelivered(true);
113                 queue.enqueueFirstNoBlock(message);
114             }
115             list.clear();
116             ts.close();
117         }
118     }
119 
120     /***
121      * start working
122      */
123     public void start() {
124         if (started.commit(false, true)) {
125             worker = new Thread(this, "TransientQueueDispatcher");
126             worker.setPriority(Thread.NORM_PRIORITY + 1);
127             worker.start();
128         }
129     }
130 
131     /***
132      * enqueue a message for dispatching
133      *
134      * @param message
135      */
136     public void enqueue(ActiveMQMessage message) {
137         queue.enqueue(message);
138     }
139     
140     /***
141      * enqueue a message for dispatching
142      *
143      * @param message
144      */
145     public void enqueueFirst(ActiveMQMessage message) {
146         queue.enqueueFirstNoBlock(message);
147     }
148     
149     
150 
151     /***
152      * stop working
153      */
154     public void stop() {
155         started.set(false);
156         queue.clear();
157     }
158 
159     /***
160      * close down this container
161      *
162      * @throws JMSException
163      */
164     public void close() throws JMSException {
165         if (started.get()) {
166             stop();
167         }
168         queue.close();
169         QueueListEntry entry = subscriptions.getFirstEntry();
170         while (entry != null) {
171             TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement();
172             ts.close();
173             entry = subscriptions.getNextEntry(entry);
174         }
175         subscriptions.clear();
176     }
177 
178     /***
179      * do some dispatching
180      */
181     public void run() {
182         boolean dispatched = false;
183         boolean targeted = false;
184         ActiveMQMessage message = null;
185         try {
186             while (started.get()) {
187                 dispatched = false;
188                 targeted = false;
189                 if (!subscriptions.isEmpty()) {
190                     message = (ActiveMQMessage) queue.dequeue(2000);
191                     if (message != null) {
192                         if (!message.isExpired()) {
193                             QueueListEntry entry = subscriptions.getFirstEntry();
194                             while (entry != null) {
195                                 TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement();
196                                 if (ts.isTarget(message)) {
197                                     targeted = true;
198                                     if (ts.canAcceptMessages()) {
199                                         ts.doDispatch(message);
200                                         message = null;
201                                         dispatched = true;
202                                         subscriptions.rotate();
203                                         break;
204                                     }
205                                 }
206                                 entry = subscriptions.getNextEntry(entry);
207                             }
208                         }
209                         else {
210                             //expire message
211                             if (log.isDebugEnabled()) {
212                                 log.debug("expired message: " + message);
213                             }
214                             message = null;
215                         }
216                     }
217                 }
218                 if (!dispatched) {
219                     if (message != null) {
220                         if (targeted) {
221                             queue.enqueueFirstNoBlock(message);
222                         }
223                         else {
224                             //no matching subscribers - dump to end and hope one shows up ...
225                             queue.enqueueNoBlock(message);
226                         }
227                     }
228                     Thread.sleep(250);
229                 }
230             }
231         }
232         catch (Exception e) {
233             stop();
234             log.warn("stop dispatching", e);
235         }
236     }
237 
238 
239     private TransientQueueSubscription findMatch(ConsumerInfo info) throws JMSException {
240         TransientQueueSubscription result = null;
241         QueueListEntry entry = subscriptions.getFirstEntry();
242         while (entry != null) {
243             TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement();
244             if (ts.getConsumerInfo().equals(info)) {
245                 result = ts;
246                 break;
247             }
248             entry = subscriptions.getNextEntry(entry);
249         }
250         return result;
251     }
252 
253     /***
254      * @return the destination associated with this container
255      */
256     public ActiveMQDestination getDestination() {
257         return destination;
258     }
259 
260     /***
261      * @return the destination name
262      */
263     public String getDestinationName() {
264         return destination.getPhysicalName();
265     }
266 
267 
268     /***
269      * @param msg
270      * @return
271      * @throws JMSException
272      */
273     public MessageIdentity addMessage(ActiveMQMessage msg) throws JMSException {
274         return null;
275     }
276 
277     /***
278      * @param messageIdentity
279      * @param ack
280      * @throws JMSException
281      */
282     public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
283     }
284 
285     /***
286      * @param messageIdentity
287      * @return
288      * @throws JMSException
289      */
290     public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
291         return null;
292     }
293 
294     /***
295      * @param messageIdentity
296      * @throws JMSException
297      */
298     public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
299     }
300 
301     /***
302      * @param messageIdentity
303      * @param ack
304      * @throws JMSException
305      */
306     public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
307     }
308 
309     /***
310      * @param messageIdentity
311      * @return
312      * @throws JMSException
313      */
314     public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
315         return false;
316     }
317 
318     protected boolean hasActiveSubscribers() {
319         return !subscriptions.isEmpty();
320     }
321 
322     protected void clear() {
323         queue.clear();
324     }
325 
326     protected void removeExpiredMessages() {
327         long currentTime = System.currentTimeMillis();
328         List list = queue.getContents();
329         for (int i = 0; i < list.size(); i++) {
330             ActiveMQMessage msg = (ActiveMQMessage) list.get(i);
331             if (msg.isExpired(currentTime)) {
332                 queue.remove(msg);
333                 if (log.isDebugEnabled()) {
334                     log.debug("expired message: " + msg);
335                 }
336             }
337         }
338     }
339 }