1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.boundedvm;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import javax.jms.JMSException;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.codehaus.activemq.broker.BrokerClient;
27 import org.codehaus.activemq.filter.Filter;
28 import org.codehaus.activemq.message.ActiveMQMessage;
29 import org.codehaus.activemq.message.ConsumerInfo;
30 import org.codehaus.activemq.message.util.BoundedPacketQueue;
31 import org.codehaus.activemq.service.Service;
32 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
33 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
34
35 /***
36 * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic
37 * messages
38 *
39 * @version $Revision: 1.2 $
40 */
41 public class TransientTopicBoundedMessageContainer implements Service, Runnable {
42 private SynchronizedBoolean started;
43 private BrokerClient client;
44 private BoundedPacketQueue queue;
45 private Thread worker;
46 private CopyOnWriteArrayList subscriptions;
47 private Log log;
48
49 /***
50 * Construct this beast
51 *
52 * @param client
53 * @param queue
54 */
55 public TransientTopicBoundedMessageContainer(BrokerClient client, BoundedPacketQueue queue) {
56 this.client = client;
57 this.queue = queue;
58 this.started = new SynchronizedBoolean(false);
59 this.subscriptions = new CopyOnWriteArrayList();
60 this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer for client: " + client);
61 }
62
63 /***
64 * @return true if this Container has no active subscriptions
65 */
66 public boolean isInactive() {
67 return subscriptions.isEmpty();
68 }
69
70 /***
71 * Add a consumer to dispatch messages to
72 *
73 * @param filter
74 * @param info
75 */
76 public void addConsumer(Filter filter, ConsumerInfo info) {
77 TransientTopicSubscription ts = findMatch(info);
78 if (ts == null) {
79 ts = new TransientTopicSubscription(filter, info);
80 subscriptions.add(ts);
81 }
82 }
83
84 /***
85 * Remove a consumer
86 *
87 * @param info
88 */
89 public void removeConsumer(ConsumerInfo info) {
90 TransientTopicSubscription ts = findMatch(info);
91 if (ts != null) {
92 subscriptions.remove(ts);
93 }
94 }
95
96 /***
97 * start working
98 */
99 public void start() {
100 if (started.commit(false, true)) {
101 worker = new Thread(this);
102 worker.setPriority(Thread.NORM_PRIORITY + 1);
103 worker.start();
104 }
105 }
106
107 /***
108 * See if this container should get this message and dispatch it
109 *
110 * @param message
111 * @return true if it is a valid container
112 * @throws JMSException
113 */
114 public boolean targetAndDispatch(ActiveMQMessage message) throws JMSException {
115 boolean result = false;
116 List tmpList = null;
117 for (Iterator i = subscriptions.iterator();i.hasNext();) {
118 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
119 if (ts.isTarget(message)) {
120 if (tmpList == null) {
121 tmpList = new ArrayList();
122 }
123 tmpList.add(ts);
124 }
125 }
126 dispatchToQueue(message, tmpList);
127 return tmpList != null;
128 }
129
130 /***
131 * stop working
132 */
133 public void stop() {
134 started.set(false);
135 queue.clear();
136 }
137
138 /***
139 * close down this container
140 */
141 public void close() {
142 if(started.get()) {
143 stop();
144 }
145 queue.close();
146 }
147
148 /***
149 * do some dispatching
150 */
151 public void run() {
152 int count = 0;
153 while (started.get()) {
154 try {
155 ActiveMQMessage message = (ActiveMQMessage) queue.dequeue(2000);
156 if (message != null) {
157 client.dispatch(message);
158 if (++count == 250) {
159 count = 0;
160 Thread.yield();
161 }
162 }
163 }
164 catch (Exception e) {
165 stop();
166 log.warn("stop dispatching", e);
167 }
168 }
169 }
170
171 private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
172 if (list != null) {
173 int[] ids = new int[list.size()];
174 for (int i = 0;i < list.size();i++) {
175 TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
176 ids[i] = ts.getConsumerInfo().getConsumerNo();
177 }
178 message.setConsumerNos(ids);
179 try {
180 queue.enqueue(message);
181 }
182 catch (InterruptedException e) {
183 log.warn("queue interuppted, closing", e);
184 close();
185 }
186 }
187 }
188
189 private TransientTopicSubscription findMatch(ConsumerInfo info) {
190 TransientTopicSubscription result = null;
191 for (Iterator i = subscriptions.iterator();i.hasNext();) {
192 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
193 if (ts.getConsumerInfo().equals(info)) {
194 result = ts;
195 break;
196 }
197 }
198 return result;
199 }
200 }