View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.service.boundedvm;
20  
21  import java.util.Collections;
22  import java.util.Iterator;
23  import java.util.Map;
24  import java.util.Set;
25  import javax.jms.JMSException;
26  import org.codehaus.activemq.broker.BrokerClient;
27  import org.codehaus.activemq.filter.AndFilter;
28  import org.codehaus.activemq.filter.DestinationMap;
29  import org.codehaus.activemq.filter.Filter;
30  import org.codehaus.activemq.filter.FilterFactory;
31  import org.codehaus.activemq.filter.FilterFactoryImpl;
32  import org.codehaus.activemq.filter.NoLocalFilter;
33  import org.codehaus.activemq.message.ActiveMQDestination;
34  import org.codehaus.activemq.message.ActiveMQMessage;
35  import org.codehaus.activemq.message.ConsumerInfo;
36  import org.codehaus.activemq.message.MessageAck;
37  import org.codehaus.activemq.message.util.MemoryBoundedQueue;
38  import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
39  import org.codehaus.activemq.service.MessageContainer;
40  import org.codehaus.activemq.service.MessageContainerManager;
41  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
42  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
43  
44  /***
45   * A MessageContainerManager for transient topics
46   * 
47   * @version $Revision: 1.11 $
48   */
49  
50  /***
51   * A manager of MessageContainer instances
52   */
53  public class TransientTopicBoundedMessageManager implements MessageContainerManager {
54      private MemoryBoundedQueueManager queueManager;
55      private ConcurrentHashMap containers;
56      private DestinationMap destinationMap;
57      private FilterFactory filterFactory;
58      private SynchronizedBoolean started;
59      private Map destinations;
60  
61      /***
62       * Constructor for TransientTopicBoundedMessageManager
63       *
64       * @param mgr
65       */
66      public TransientTopicBoundedMessageManager(MemoryBoundedQueueManager mgr) {
67          this.queueManager = mgr;
68          this.containers = new ConcurrentHashMap();
69          this.destinationMap = new DestinationMap();
70          this.destinations = new ConcurrentHashMap();
71          this.filterFactory = new FilterFactoryImpl();
72          this.started = new SynchronizedBoolean(false);
73      }
74  
75      /***
76       * start the manager
77       *
78       * @throws JMSException
79       */
80      public void start() throws JMSException {
81          if (started.commit(false, true)) {
82              for (Iterator i = containers.values().iterator(); i.hasNext();) {
83                  TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
84                  container.start();
85              }
86          }
87      }
88  
89      /***
90       * stop the manager
91       *
92       * @throws JMSException
93       */
94      public void stop() throws JMSException {
95          if (started.commit(true, false)) {
96              for (Iterator i = containers.values().iterator(); i.hasNext();) {
97                  TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
98                  container.stop();
99              }
100         }
101     }
102 
103     /***
104      * Add a consumer if appropiate
105      *
106      * @param client
107      * @param info
108      * @throws JMSException
109      */
110     public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
111         ActiveMQDestination destination = info.getDestination();
112         if (destination.isTopic()) {
113             TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
114                     .get(client);
115             if (container == null) {
116                 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
117                 container = new TransientTopicBoundedMessageContainer(client, queue);
118                 containers.put(client, container);
119                 if (started.get()) {
120                     container.start();
121                 }
122             }
123             container.addConsumer(createFilter(info), info);
124             destinationMap.put(destination,container);
125             String name = destination.getPhysicalName();
126             if (!destinations.containsKey(name)) {
127                 destinations.put(name, destination);
128             }
129         }
130     }
131 
132     /***
133      * @param client
134      * @param info
135      * @throws JMSException
136      */
137     public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
138         ActiveMQDestination destination = info.getDestination();
139         if (destination.isTopic()) {
140             TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
141                     .get(client);
142             if (container != null) {
143                 container.removeConsumer(info);
144                 if (container.isInactive()) {
145                     containers.remove(client);
146                     container.close();
147                     destinationMap.remove(destination, container);
148                 }
149 
150                 // lets check if we've no more consumers for this destination
151                 if (!hasConsumerFor(destination)) {
152                     destinations.remove(destination.getPhysicalName());
153                 }
154             }
155         }
156     }
157 
158     /***
159      * Delete a durable subscriber
160      *
161      * @param clientId
162      * @param subscriberName
163      * @throws JMSException if the subscriber doesn't exist or is still active
164      */
165     public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
166     }
167 
168     /***
169      * @param client
170      * @param message
171      * @throws JMSException
172      */
173     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
174         if (message != null && message.getJMSActiveMQDestination().isTopic()) {
175 
176             Set set = destinationMap.get(message.getJMSActiveMQDestination());
177             for (Iterator i = set.iterator(); i.hasNext();) {
178                 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
179                 container.targetAndDispatch(client,message);
180             }
181         }
182     }
183 
184     /***
185      * @param client
186      * @param ack
187      * @throws JMSException
188      * 
189      */
190     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
191     }
192 
193     /***
194      * @param client
195      * @param transactionId
196      * @param ack
197      * @throws JMSException
198      * 
199      */
200     public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
201     }
202 
203     /***
204      * @param client
205      * @param ack
206      * @throws JMSException
207      * 
208      */
209     public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
210     }
211     
212     /***
213      * @throws JMSException
214      * 
215      */
216 
217     public void poll() throws JMSException {
218     }
219 
220     /***
221      * A hook when the transaction is about to be commited; so apply all outstanding commands to the Journal if using a
222      * Journal (transaction log)
223      *
224      * @param client
225      * @param transactionId
226      * @throws JMSException
227      */
228     public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
229     }
230 
231     /***
232      * A hook when the transaction is about to be rolled back; so discard all outstanding commands that are pending to
233      * be written to the Journal
234      *
235      * @param client
236      * @param transactionId
237      */
238     public void rollbackTransaction(BrokerClient client, String transactionId) {
239     }
240 
241     /***
242      * For Transient topics - a MessageContainer maps on to the messages
243      * to be dispatched through a BrokerClient, not a destination
244      * @param physicalName
245      * @return the MessageContainer used for dispatching - always returns null
246      * @throws JMSException
247      */
248     public MessageContainer getContainer(String physicalName) throws JMSException {
249         return null; 
250     }
251 
252     /***
253      * @return a map of all the destinations
254      */
255     public Map getDestinations() {
256         return Collections.unmodifiableMap(destinations);
257     }
258 
259     /***
260      * Create filter for a Consumer
261      *
262      * @param info
263      * @return the Fitler
264      * @throws javax.jms.JMSException
265      */
266     protected Filter createFilter(ConsumerInfo info) throws JMSException {
267         Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
268         if (info.isNoLocal()) {
269             filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
270         }
271         return filter;
272     }
273 
274     protected boolean hasConsumerFor(ActiveMQDestination destination) {
275         for (Iterator i = containers.values().iterator(); i.hasNext();) {
276             TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
277             if (container.hasConsumerFor(destination)) {
278                 return true;
279             }
280         }
281         return false;
282     }
283     
284 }