View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21  import org.codehaus.activemq.broker.BrokerClient;
22  import org.codehaus.activemq.filter.DestinationMap;
23  import org.codehaus.activemq.filter.Filter;
24  import org.codehaus.activemq.message.ActiveMQDestination;
25  import org.codehaus.activemq.message.ConsumerInfo;
26  import org.codehaus.activemq.service.Dispatcher;
27  import org.codehaus.activemq.service.Subscription;
28  import org.codehaus.activemq.service.SubscriptionContainer;
29  import org.codehaus.activemq.service.RedeliveryPolicy;
30  
31  import java.util.HashSet;
32  import java.util.Iterator;
33  import java.util.Map;
34  import java.util.Set;
35  
36  /***
37   * A default RAM only implementation of the {@link SubscriptionContainer}
38   *
39   * @version $Revision: 1.6 $
40   */
41  public class SubscriptionContainerImpl implements SubscriptionContainer {
42      private Map subscriptions;
43      private DestinationMap destinationIndex = new DestinationMap();
44      private RedeliveryPolicy redeliveryPolicy;
45  
46      public SubscriptionContainerImpl(RedeliveryPolicy redeliveryPolicy) {
47          this(new ConcurrentHashMap(), redeliveryPolicy);
48      }
49  
50      public SubscriptionContainerImpl(Map subscriptions, RedeliveryPolicy redeliveryPolicy) {
51          this.subscriptions = subscriptions;
52          this.redeliveryPolicy = redeliveryPolicy;
53      }
54  
55      public String toString() {
56          return super.toString() + "[size:" + subscriptions.size() + "]";
57      }
58  
59      public RedeliveryPolicy getRedeliveryPolicy() {
60          return redeliveryPolicy;
61      }
62  
63      public Subscription getSubscription(String consumerId) {
64          return (Subscription) subscriptions.get(consumerId);
65      }
66  
67      public Subscription removeSubscription(String consumerId) {
68          Subscription subscription = (Subscription) subscriptions.remove(consumerId);
69          if (subscription != null) {
70              destinationIndex.remove(subscription.getDestination(), subscription);
71          }
72          return subscription;
73      }
74  
75      public Set getSubscriptions(ActiveMQDestination destination) {
76          Object answer = destinationIndex.get(destination);
77          if (answer instanceof Set) {
78              return (Set) answer;
79          }
80          else {
81              Set set = new HashSet(1);
82              set.add(answer);
83              return set;
84          }
85      }
86  
87      public Iterator subscriptionIterator() {
88          return subscriptions.values().iterator();
89      }
90  
91      public Subscription makeSubscription(Dispatcher dispatcher,BrokerClient client, ConsumerInfo info, Filter filter) {
92          Subscription subscription = createSubscription(dispatcher, client,info, filter);
93          subscriptions.put(info.getConsumerId(), subscription);
94          destinationIndex.put(subscription.getDestination(), subscription);
95          return subscription;
96      }
97  
98      protected Subscription createSubscription(Dispatcher dispatcher, BrokerClient client,ConsumerInfo info, Filter filter) {
99          return new SubscriptionImpl(dispatcher, client,info, filter, getRedeliveryPolicy());
100     }
101 }