1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.boundedvm;
20 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
21 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.codehaus.activemq.broker.BrokerClient;
25 import org.codehaus.activemq.filter.Filter;
26 import org.codehaus.activemq.message.ActiveMQDestination;
27 import org.codehaus.activemq.message.ActiveMQMessage;
28 import org.codehaus.activemq.message.ConsumerInfo;
29 import org.codehaus.activemq.message.MessageAck;
30 import org.codehaus.activemq.message.util.BoundedPacketQueue;
31 import org.codehaus.activemq.service.MessageContainer;
32 import org.codehaus.activemq.service.MessageIdentity;
33 import org.codehaus.activemq.service.Service;
34 import javax.jms.JMSException;
35 import java.util.ArrayList;
36 import java.util.Iterator;
37 import java.util.List;
38
39 /***
40 * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic
41 * messages
42 *
43 * @version $Revision: 1.7 $
44 */
45 public class TransientTopicBoundedMessageContainer implements MessageContainer, Service, Runnable {
46 private SynchronizedBoolean started;
47 private BrokerClient client;
48 private BoundedPacketQueue queue;
49 private Thread worker;
50 private CopyOnWriteArrayList subscriptions;
51 private Log log;
52
53 /***
54 * Construct this beast
55 *
56 * @param client
57 * @param queue
58 */
59 public TransientTopicBoundedMessageContainer(BrokerClient client, BoundedPacketQueue queue) {
60 this.client = client;
61 this.queue = queue;
62 this.started = new SynchronizedBoolean(false);
63 this.subscriptions = new CopyOnWriteArrayList();
64 this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client);
65 }
66
67 /***
68 * @return true if this Container has no active subscriptions
69 */
70 public boolean isInactive() {
71 return subscriptions.isEmpty();
72 }
73
74 /***
75 * @return the BrokerClient this Container is dispatching to
76 */
77 public BrokerClient getBrokerClient() {
78 return client;
79 }
80
81 /***
82 * Add a consumer to dispatch messages to
83 *
84 * @param filter
85 * @param info
86 */
87 public void addConsumer(Filter filter, ConsumerInfo info) {
88 TransientTopicSubscription ts = findMatch(info);
89 if (ts == null) {
90 ts = new TransientTopicSubscription(filter, info);
91 subscriptions.add(ts);
92 }
93 }
94
95 /***
96 * Remove a consumer
97 *
98 * @param info
99 */
100 public void removeConsumer(ConsumerInfo info) {
101 TransientTopicSubscription ts = findMatch(info);
102 if (ts != null) {
103 subscriptions.remove(ts);
104 }
105 }
106
107 /***
108 * start working
109 */
110 public void start() {
111 if (started.commit(false, true)) {
112 worker = new Thread(this, "TransientTopicDispatcher");
113 worker.setPriority(Thread.NORM_PRIORITY + 1);
114 worker.start();
115 }
116 }
117
118 /***
119 * See if this container should get this message and dispatch it
120 *
121 * @param sender the BrokerClient the message came from
122 * @param message
123 * @return true if it is a valid container
124 * @throws JMSException
125 */
126 public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException {
127 boolean result = false;
128 if (!this.client.isClusteredConnection() || !sender.isClusteredConnection()) {
129 List tmpList = null;
130 for (Iterator i = subscriptions.iterator();i.hasNext();) {
131 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
132 if (ts.isTarget(message)) {
133 if (tmpList == null) {
134 tmpList = new ArrayList();
135 }
136 tmpList.add(ts);
137 }
138 }
139 dispatchToQueue(message, tmpList);
140 result = tmpList != null;
141 }
142 return result;
143 }
144
145 /***
146 * stop working
147 */
148 public void stop() {
149 started.set(false);
150 queue.clear();
151 }
152
153 /***
154 * close down this container
155 */
156 public void close() {
157 if (started.get()) {
158 stop();
159 }
160 queue.close();
161 }
162
163 /***
164 * do some dispatching
165 */
166 public void run() {
167 int count = 0;
168 ActiveMQMessage message = null;
169 while (started.get()) {
170 try {
171 message = (ActiveMQMessage) queue.dequeue(2000);
172 if (message != null && !message.isExpired()) {
173 client.dispatch(message);
174 if (++count == 250) {
175 count = 0;
176 Thread.yield();
177 }
178 }
179 }
180 catch (Exception e) {
181 stop();
182 log.warn("stop dispatching", e);
183 }
184 }
185 }
186
187 private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
188 if (list != null && !list.isEmpty()) {
189 int[] ids = new int[list.size()];
190 for (int i = 0;i < list.size();i++) {
191 TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
192 ids[i] = ts.getConsumerInfo().getConsumerNo();
193 }
194 message = message.shallowCopy();
195 message.setConsumerNos(ids);
196 try {
197 queue.enqueue(message);
198 }
199 catch (InterruptedException e) {
200 log.warn("queue interuppted, closing", e);
201 close();
202 }
203 }
204 }
205
206 private TransientTopicSubscription findMatch(ConsumerInfo info) {
207 TransientTopicSubscription result = null;
208 for (Iterator i = subscriptions.iterator();i.hasNext();) {
209 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
210 if (ts.getConsumerInfo().equals(info)) {
211 result = ts;
212 break;
213 }
214 }
215 return result;
216 }
217
218 /***
219 * @param destination
220 * @return true if a
221 */
222 public boolean hasConsumerFor(ActiveMQDestination destination) {
223 for (Iterator i = subscriptions.iterator();i.hasNext();) {
224 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
225 ConsumerInfo info = ts.getConsumerInfo();
226 if (info.getDestination().matches(destination)) {
227 return true;
228 }
229 }
230 return false;
231 }
232
233 /***
234 * @return the destination name
235 */
236 public String getDestinationName() {
237 return "";
238 }
239
240 /***
241 * @param msg
242 * @return @throws JMSException
243 */
244 public MessageIdentity addMessage(ActiveMQMessage msg) throws JMSException {
245 return null;
246 }
247
248 /***
249 * @param messageIdentity
250 * @param ack
251 * @throws JMSException
252 */
253 public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
254 }
255
256 /***
257 * @param messageIdentity
258 * @return @throws JMSException
259 */
260 public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
261 return null;
262 }
263
264 /***
265 * @param messageIdentity
266 * @throws JMSException
267 */
268 public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
269 }
270
271 /***
272 * @param messageIdentity
273 * @param ack
274 * @throws JMSException
275 */
276 public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
277 }
278
279 /***
280 * @param messageIdentity
281 * @return @throws JMSException
282 */
283 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
284 return false;
285 }
286 }