1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.vm;
19
20 import org.codehaus.activemq.message.ConsumerInfo;
21 import org.codehaus.activemq.message.MessageAck;
22 import org.codehaus.activemq.service.MessageContainer;
23 import org.codehaus.activemq.service.MessageIdentity;
24 import org.codehaus.activemq.service.SubscriberEntry;
25 import org.codehaus.activemq.service.Subscription;
26 import org.codehaus.activemq.store.TopicMessageStore;
27
28 import javax.jms.JMSException;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.LinkedHashMap;
32 import java.util.Map;
33
34 /***
35 * @version $Revision: 1.2 $
36 */
37 public class VMTopicMessageStore extends VMMessageStore implements TopicMessageStore {
38 private static final Integer ONE = new Integer(1);
39
40 private Map ackDatabase;
41 private Map messageCounts;
42 private Map subscriberDatabase;
43
44 public VMTopicMessageStore() {
45 this(new LinkedHashMap(), makeMap(), makeMap(), makeMap());
46 }
47
48 public VMTopicMessageStore(LinkedHashMap messageTable, Map subscriberDatabase, Map ackDatabase, Map messageCounts) {
49 super(messageTable);
50 this.subscriberDatabase = subscriberDatabase;
51 this.ackDatabase = ackDatabase;
52 this.messageCounts = messageCounts;
53 }
54
55 public void setMessageContainer(MessageContainer container) {
56 }
57
58 public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException {
59 Integer number = (Integer) messageCounts.get(messageId);
60 if (number == null) {
61 number = ONE;
62 }
63 else {
64 number = new Integer(number.intValue() + 1);
65 }
66 messageCounts.put(messageId, number);
67 }
68
69 public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
70 Integer number = (Integer) messageCounts.get(messageIdentity);
71 if (number == null || number.intValue() <= 1) {
72 removeMessage(messageIdentity, ack);
73 if (number != null) {
74 messageCounts.remove(messageIdentity);
75 }
76 }
77 else {
78 messageCounts.put(messageIdentity, new Integer(number.intValue() - 1));
79 number = ONE;
80 }
81 }
82
83 public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
84 String consumerId = subscription.getConsumerId();
85 ackDatabase.put(consumerId, messageIdentity);
86 }
87
88 public void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) {
89 }
90
91 public MessageIdentity getLastestMessageIdentity() throws JMSException {
92 return null; /*** TODO */
93 }
94
95 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
96 Object key = info.getConsumerKey();
97 return (SubscriberEntry) subscriberDatabase.get(key);
98 }
99
100 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
101 subscriberDatabase.put(info.getConsumerKey(), subscriberEntry);
102 }
103
104 public void stop() throws JMSException {
105 ackDatabase.clear();
106 messageCounts.clear();
107 super.stop();
108 }
109
110 protected static Map makeMap() {
111 return Collections.synchronizedMap(new HashMap());
112 }
113 }