View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.service.boundedvm;
20  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
21  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.codehaus.activemq.broker.BrokerClient;
25  import org.codehaus.activemq.filter.Filter;
26  import org.codehaus.activemq.message.ActiveMQDestination;
27  import org.codehaus.activemq.message.ActiveMQMessage;
28  import org.codehaus.activemq.message.ConsumerInfo;
29  import org.codehaus.activemq.message.MessageAck;
30  import org.codehaus.activemq.message.util.BoundedPacketQueue;
31  import org.codehaus.activemq.service.MessageContainer;
32  import org.codehaus.activemq.service.MessageIdentity;
33  import org.codehaus.activemq.service.Service;
34  import javax.jms.JMSException;
35  import java.util.ArrayList;
36  import java.util.Iterator;
37  import java.util.List;
38  
39  /***
40   * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic
41   * messages
42   * 
43   * @version $Revision: 1.7 $
44   */
45  public class TransientTopicBoundedMessageContainer implements MessageContainer, Service, Runnable {
46      private SynchronizedBoolean started;
47      private BrokerClient client;
48      private BoundedPacketQueue queue;
49      private Thread worker;
50      private CopyOnWriteArrayList subscriptions;
51      private Log log;
52  
53      /***
54       * Construct this beast
55       * 
56       * @param client
57       * @param queue
58       */
59      public TransientTopicBoundedMessageContainer(BrokerClient client, BoundedPacketQueue queue) {
60          this.client = client;
61          this.queue = queue;
62          this.started = new SynchronizedBoolean(false);
63          this.subscriptions = new CopyOnWriteArrayList();
64          this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client);
65      }
66  
67      /***
68       * @return true if this Container has no active subscriptions
69       */
70      public boolean isInactive() {
71          return subscriptions.isEmpty();
72      }
73  
74      /***
75       * @return the BrokerClient this Container is dispatching to
76       */
77      public BrokerClient getBrokerClient() {
78          return client;
79      }
80  
81      /***
82       * Add a consumer to dispatch messages to
83       * 
84       * @param filter
85       * @param info
86       */
87      public void addConsumer(Filter filter, ConsumerInfo info) {
88          TransientTopicSubscription ts = findMatch(info);
89          if (ts == null) {
90              ts = new TransientTopicSubscription(filter, info);
91              subscriptions.add(ts);
92          }
93      }
94  
95      /***
96       * Remove a consumer
97       * 
98       * @param info
99       */
100     public void removeConsumer(ConsumerInfo info) {
101         TransientTopicSubscription ts = findMatch(info);
102         if (ts != null) {
103             subscriptions.remove(ts);
104         }
105     }
106 
107     /***
108      * start working
109      */
110     public void start() {
111         if (started.commit(false, true)) {
112             worker = new Thread(this, "TransientTopicDispatcher");
113             worker.setPriority(Thread.NORM_PRIORITY + 1);
114             worker.start();
115         }
116     }
117 
118     /***
119      * See if this container should get this message and dispatch it
120      * 
121      * @param sender the BrokerClient the message came from
122      * @param message
123      * @return true if it is a valid container
124      * @throws JMSException
125      */
126     public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException {
127         boolean result = false;
128         if (!this.client.isClusteredConnection() || !sender.isClusteredConnection()) {
129             List tmpList = null;
130             for (Iterator i = subscriptions.iterator();i.hasNext();) {
131                 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
132                 if (ts.isTarget(message)) {
133                     if (tmpList == null) {
134                         tmpList = new ArrayList();
135                     }
136                     tmpList.add(ts);
137                 }
138             }
139             dispatchToQueue(message, tmpList);
140             result = tmpList != null;
141         }
142         return result;
143     }
144 
145     /***
146      * stop working
147      */
148     public void stop() {
149         started.set(false);
150         queue.clear();
151     }
152 
153     /***
154      * close down this container
155      */
156     public void close() {
157         if (started.get()) {
158             stop();
159         }
160         queue.close();
161     }
162 
163     /***
164      * do some dispatching
165      */
166     public void run() {
167         int count = 0;
168         ActiveMQMessage message = null;
169         while (started.get()) {
170             try {
171                 message = (ActiveMQMessage) queue.dequeue(2000);
172                 if (message != null && !message.isExpired()) {
173                     client.dispatch(message);
174                     if (++count == 250) {
175                         count = 0;
176                         Thread.yield();
177                     }
178                 }
179             }
180             catch (Exception e) {
181                 stop();
182                 log.warn("stop dispatching", e);
183             }
184         }
185     }
186 
187     private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
188         if (list != null && !list.isEmpty()) {
189             int[] ids = new int[list.size()];
190             for (int i = 0;i < list.size();i++) {
191                 TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
192                 ids[i] = ts.getConsumerInfo().getConsumerNo();
193             }
194             message = message.shallowCopy();
195             message.setConsumerNos(ids);
196             try {
197                 queue.enqueue(message);
198             }
199             catch (InterruptedException e) {
200                 log.warn("queue interuppted, closing", e);
201                 close();
202             }
203         }
204     }
205 
206     private TransientTopicSubscription findMatch(ConsumerInfo info) {
207         TransientTopicSubscription result = null;
208         for (Iterator i = subscriptions.iterator();i.hasNext();) {
209             TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
210             if (ts.getConsumerInfo().equals(info)) {
211                 result = ts;
212                 break;
213             }
214         }
215         return result;
216     }
217 
218     /***
219      * @param destination
220      * @return true if a
221      */
222     public boolean hasConsumerFor(ActiveMQDestination destination) {
223         for (Iterator i = subscriptions.iterator();i.hasNext();) {
224             TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
225             ConsumerInfo info = ts.getConsumerInfo();
226             if (info.getDestination().matches(destination)) {
227                 return true;
228             }
229         }
230         return false;
231     }
232 
233     /***
234      * @return the destination name
235      */
236     public String getDestinationName() {
237         return "";
238     }
239 
240     /***
241      * @param msg
242      * @return @throws JMSException
243      */
244     public MessageIdentity addMessage(ActiveMQMessage msg) throws JMSException {
245         return null;
246     }
247 
248     /***
249      * @param messageIdentity
250      * @param ack
251      * @throws JMSException
252      */
253     public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
254     }
255 
256     /***
257      * @param messageIdentity
258      * @return @throws JMSException
259      */
260     public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
261         return null;
262     }
263 
264     /***
265      * @param messageIdentity
266      * @throws JMSException
267      */
268     public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
269     }
270 
271     /***
272      * @param messageIdentity
273      * @param ack
274      * @throws JMSException
275      */
276     public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
277     }
278 
279     /***
280      * @param messageIdentity
281      * @return @throws JMSException
282      */
283     public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
284         return false;
285     }
286 }