/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.codehaus.activemq.service.boundedvm;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.util.BoundedPacketQueue;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.Service;
import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * A MessageContainer for transient topics. One of these exists for every active Connection consuming
 * transient Topic messages.
 * <p>
 * Messages accepted by {@link #targetAndDispatch(BrokerClient, ActiveMQMessage)} are placed on a
 * {@link BoundedPacketQueue}; a worker thread created by {@link #start()} drains that queue and hands each
 * message to the owning {@link BrokerClient}. The persistence-oriented methods of {@link MessageContainer}
 * are implemented as no-ops.
 *
 * @version $Revision: 1.7 $
 */
public class TransientTopicBoundedMessageContainer implements MessageContainer, Service, Runnable {
    /** True while the worker loop in {@link #run()} should keep going. */
    private final SynchronizedBoolean started;
    /** The client every dequeued message is dispatched to. */
    private final BrokerClient client;
    /** Buffer between the publishing side (enqueue) and the worker thread (dequeue). */
    private final BoundedPacketQueue queue;
    /** Holds the registered TransientTopicSubscription instances. */
    private final CopyOnWriteArrayList subscriptions;
    private final Log log;
    /** Dispatch thread; created by {@link #start()}, null before the first start. */
    private Thread worker;

    /**
     * Construct a container that dispatches to the given client through the given queue. The container is
     * created in the stopped state with no subscriptions.
     *
     * @param client the BrokerClient messages are dispatched to
     * @param queue  the queue used to buffer messages for the worker thread
     */
    public TransientTopicBoundedMessageContainer(BrokerClient client, BoundedPacketQueue queue) {
        this.client = client;
        this.queue = queue;
        this.started = new SynchronizedBoolean(false);
        this.subscriptions = new CopyOnWriteArrayList();
        this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client);
    }

    /**
     * @return true if this Container has no active subscriptions
     */
    public boolean isInactive() {
        return subscriptions.isEmpty();
    }

    /**
     * @return the BrokerClient this Container is dispatching to
     */
    public BrokerClient getBrokerClient() {
        return client;
    }

    /**
     * Add a consumer to dispatch messages to. If a subscription whose ConsumerInfo equals <code>info</code>
     * is already registered, the call does nothing.
     *
     * @param filter the filter handed to the new subscription
     * @param info   the consumer to register
     */
    public void addConsumer(Filter filter, ConsumerInfo info) {
        if (findMatch(info) == null) {
            subscriptions.add(new TransientTopicSubscription(filter, info));
        }
    }

    /**
     * Remove a consumer. Does nothing if no subscription is registered for <code>info</code>.
     *
     * @param info the consumer to remove
     */
    public void removeConsumer(ConsumerInfo info) {
        TransientTopicSubscription existing = findMatch(info);
        if (existing != null) {
            subscriptions.remove(existing);
        }
    }

    /**
     * Start working: flips the started flag and launches the dispatch thread. The thread is only created
     * when the flag actually changes from false to true, so calling this on an already started container
     * has no effect.
     */
    public void start() {
        if (started.commit(false, true)) {
            worker = new Thread(this, "TransientTopicDispatcher");
            worker.setPriority(Thread.NORM_PRIORITY + 1);
            worker.start();
        }
    }

    /**
     * See if this container should get this message and, if so, queue it for dispatch.
     * <p>
     * Nothing is done when both this container's client and the sender are clustered connections
     * (presumably to avoid forwarding between cluster links - confirm with the broker code). Otherwise every
     * subscription that reports the message as a target is collected and the message is enqueued once for
     * all of them.
     *
     * @param sender  the BrokerClient the message came from
     * @param message the message to test and dispatch
     * @return true if at least one subscription targeted the message
     * @throws JMSException if matching or queueing fails
     */
    public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException {
        if (this.client.isClusteredConnection() && sender.isClusteredConnection()) {
            return false;
        }
        // allocated lazily so the common "nobody interested" case creates no garbage
        List targets = null;
        for (Iterator i = subscriptions.iterator(); i.hasNext();) {
            TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
            if (ts.isTarget(message)) {
                if (targets == null) {
                    targets = new ArrayList();
                }
                targets.add(ts);
            }
        }
        dispatchToQueue(message, targets);
        return targets != null;
    }

    /**
     * Stop working: clears the started flag and empties the queue. The worker thread is not interrupted or
     * joined here; it leaves its loop the next time it checks the flag, i.e. after its current dequeue
     * call returns.
     */
    public void stop() {
        started.set(false);
        queue.clear();
    }

    /**
     * Close down this container: stops it if it is running, then closes the queue.
     */
    public void close() {
        if (started.get()) {
            stop();
        }
        queue.close();
    }

    /**
     * Worker loop: repeatedly takes a message off the queue and dispatches it to the client until the
     * container is stopped. Expired messages are dropped. Any exception stops the container.
     */
    public void run() {
        int count = 0;
        while (started.get()) {
            try {
                // bounded wait (2000, presumably milliseconds) so the started flag is re-checked regularly
                ActiveMQMessage message = (ActiveMQMessage) queue.dequeue(2000);
                if (message != null && !message.isExpired()) {
                    client.dispatch(message);
                    // give other threads a chance after every 250 dispatched messages
                    if (++count == 250) {
                        count = 0;
                        Thread.yield();
                    }
                }
            }
            catch (Exception e) {
                stop();
                log.warn("stop dispatching", e);
            }
        }
    }

    /**
     * Enqueue a copy of the message addressed to the consumers of the given subscriptions. Does nothing
     * when the list is null or empty. If the calling thread is interrupted while enqueueing, the container
     * is closed and the thread's interrupt status is restored.
     *
     * @param message the message to enqueue; the original instance is left untouched
     * @param list    the matching TransientTopicSubscription instances, may be null
     * @throws JMSException if copying the message fails
     */
    private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
        if (list == null || list.isEmpty()) {
            return;
        }
        int size = list.size();
        int[] ids = new int[size];
        for (int i = 0; i < size; i++) {
            TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
            ids[i] = ts.getConsumerInfo().getConsumerNo();
        }
        // work on a shallow copy so the consumer numbers set here do not leak into the caller's message
        ActiveMQMessage copy = message.shallowCopy();
        copy.setConsumerNos(ids);
        try {
            queue.enqueue(copy);
        }
        catch (InterruptedException e) {
            log.warn("queue interuppted, closing", e);
            close();
            // the catch cleared the flag; re-assert it so the caller can still observe the interruption
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Look up the subscription registered for a consumer.
     *
     * @param info the consumer to look for
     * @return the first subscription whose ConsumerInfo equals <code>info</code>, or null if there is none
     */
    private TransientTopicSubscription findMatch(ConsumerInfo info) {
        for (Iterator i = subscriptions.iterator(); i.hasNext();) {
            TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
            if (ts.getConsumerInfo().equals(info)) {
                return ts;
            }
        }
        return null;
    }

    /**
     * @param destination the destination to test
     * @return true if the destination of at least one registered consumer matches the given destination
     */
    public boolean hasConsumerFor(ActiveMQDestination destination) {
        for (Iterator i = subscriptions.iterator(); i.hasNext();) {
            TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
            ConsumerInfo info = ts.getConsumerInfo();
            if (info.getDestination().matches(destination)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the destination name; always the empty string for this container
     */
    public String getDestinationName() {
        return "";
    }

    /**
     * Not implemented by this container; the message is ignored.
     *
     * @param msg the message (ignored)
     * @return always null
     * @throws JMSException never thrown by this implementation
     */
    public MessageIdentity addMessage(ActiveMQMessage msg) throws JMSException {
        return null;
    }

    /**
     * Not implemented by this container; does nothing.
     *
     * @param messageIdentity ignored
     * @param ack             ignored
     * @throws JMSException never thrown by this implementation
     */
    public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
    }

    /**
     * Not implemented by this container.
     *
     * @param messageIdentity ignored
     * @return always null
     * @throws JMSException never thrown by this implementation
     */
    public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
        return null;
    }

    /**
     * Not implemented by this container; does nothing.
     *
     * @param messageIdentity ignored
     * @throws JMSException never thrown by this implementation
     */
    public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
    }

    /**
     * Not implemented by this container; does nothing.
     *
     * @param messageIdentity ignored
     * @param ack             ignored
     * @throws JMSException never thrown by this implementation
     */
    public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
    }

    /**
     * Not implemented by this container.
     *
     * @param messageIdentity ignored
     * @return always false
     * @throws JMSException never thrown by this implementation
     */
    public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
        return false;
    }
}