View Javadoc

/***
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
18  package org.codehaus.activemq.store.vm;
19  
20  import org.codehaus.activemq.message.ConsumerInfo;
21  import org.codehaus.activemq.message.MessageAck;
22  import org.codehaus.activemq.service.MessageContainer;
23  import org.codehaus.activemq.service.MessageIdentity;
24  import org.codehaus.activemq.service.SubscriberEntry;
25  import org.codehaus.activemq.service.Subscription;
26  import org.codehaus.activemq.store.TopicMessageStore;
27  
28  import javax.jms.JMSException;
29  import java.util.Collections;
30  import java.util.HashMap;
31  import java.util.LinkedHashMap;
32  import java.util.Map;
33  
34  /***
35   * @version $Revision: 1.2 $
36   */
37  public class VMTopicMessageStore extends VMMessageStore implements TopicMessageStore {
38      private static final Integer ONE = new Integer(1);
39  
40      private Map ackDatabase;
41      private Map messageCounts;
42      private Map subscriberDatabase;
43  
44      public VMTopicMessageStore() {
45          this(new LinkedHashMap(), makeMap(), makeMap(), makeMap());
46      }
47  
48      public VMTopicMessageStore(LinkedHashMap messageTable, Map subscriberDatabase, Map ackDatabase, Map messageCounts) {
49          super(messageTable);
50          this.subscriberDatabase = subscriberDatabase;
51          this.ackDatabase = ackDatabase;
52          this.messageCounts = messageCounts;
53      }
54  
55      public void setMessageContainer(MessageContainer container) {
56      }
57  
58      public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException {
59          Integer number = (Integer) messageCounts.get(messageId);
60          if (number == null) {
61              number = ONE;
62          }
63          else {
64              number = new Integer(number.intValue() + 1);
65          }
66          messageCounts.put(messageId, number);
67      }
68  
69      public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
70          Integer number = (Integer) messageCounts.get(messageIdentity);
71          if (number == null || number.intValue() <= 1) {
72              removeMessage(messageIdentity, ack);
73              if (number != null) {
74                  messageCounts.remove(messageIdentity);
75              }
76          }
77          else {
78              messageCounts.put(messageIdentity, new Integer(number.intValue() - 1));
79              number = ONE;
80          }
81      }
82  
83      public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
84          String consumerId = subscription.getConsumerId();
85          ackDatabase.put(consumerId, messageIdentity);
86      }
87  
88      public void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) {
89      }
90  
91      public MessageIdentity getLastestMessageIdentity() throws JMSException {
92          return null;  /*** TODO */
93      }
94  
95      public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
96          Object key = info.getConsumerKey();
97          return (SubscriberEntry) subscriberDatabase.get(key);
98      }
99  
100     public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
101         subscriberDatabase.put(info.getConsumerKey(), subscriberEntry);
102     }
103 
104     public void stop() throws JMSException {
105         ackDatabase.clear();
106         messageCounts.clear();
107         super.stop();
108     }
109 
110     protected static Map makeMap() {
111         return Collections.synchronizedMap(new HashMap());
112     }
113 }