1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 package org.codehaus.activemq.store.vm; 19 20 import org.codehaus.activemq.message.ConsumerInfo; 21 import org.codehaus.activemq.message.MessageAck; 22 import org.codehaus.activemq.service.MessageContainer; 23 import org.codehaus.activemq.service.MessageIdentity; 24 import org.codehaus.activemq.service.SubscriberEntry; 25 import org.codehaus.activemq.service.Subscription; 26 import org.codehaus.activemq.store.TopicMessageStore; 27 28 import javax.jms.JMSException; 29 import java.util.Collections; 30 import java.util.HashMap; 31 import java.util.LinkedHashMap; 32 import java.util.Map; 33 34 /*** 35 * @version $Revision: 1.2 $ 36 */ 37 public class VMTopicMessageStore extends VMMessageStore implements TopicMessageStore { 38 private static final Integer ONE = new Integer(1); 39 40 private Map ackDatabase; 41 private Map messageCounts; 42 private Map subscriberDatabase; 43 44 public VMTopicMessageStore() { 45 this(new LinkedHashMap(), makeMap(), makeMap(), makeMap()); 46 } 47 48 public VMTopicMessageStore(LinkedHashMap messageTable, Map subscriberDatabase, Map ackDatabase, Map messageCounts) { 49 super(messageTable); 50 this.subscriberDatabase = subscriberDatabase; 51 this.ackDatabase = ackDatabase; 52 this.messageCounts = messageCounts; 53 } 54 55 public void setMessageContainer(MessageContainer container) { 56 } 57 58 public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException { 59 Integer number = (Integer) messageCounts.get(messageId); 60 if (number == null) { 61 number = ONE; 62 } 63 else { 64 number = new Integer(number.intValue() + 1); 65 } 66 messageCounts.put(messageId, number); 67 } 68 69 public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException { 70 Integer number = (Integer) messageCounts.get(messageIdentity); 71 if (number == null || number.intValue() <= 1) { 72 removeMessage(messageIdentity, ack); 73 if (number != null) { 74 messageCounts.remove(messageIdentity); 75 } 76 } 77 else { 78 messageCounts.put(messageIdentity, new Integer(number.intValue() - 1)); 79 number = ONE; 80 } 81 } 82 83 public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException { 84 String consumerId = subscription.getConsumerId(); 85 ackDatabase.put(consumerId, messageIdentity); 86 } 87 88 public void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) { 89 } 90 91 public MessageIdentity getLastestMessageIdentity() throws JMSException { 92 return null; /*** TODO */ 93 } 94 95 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException { 96 Object key = info.getConsumerKey(); 97 return (SubscriberEntry) subscriberDatabase.get(key); 98 } 99 100 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException { 101 subscriberDatabase.put(info.getConsumerKey(), subscriberEntry); 102 } 103 104 public void stop() throws JMSException { 105 ackDatabase.clear(); 106 messageCounts.clear(); 107 super.stop(); 108 } 109 110 protected static Map makeMap() { 111 return Collections.synchronizedMap(new HashMap()); 112 } 113 }