1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbm;
19
20 import jdbm.btree.BTree;
21 import jdbm.helper.Tuple;
22 import jdbm.helper.TupleBrowser;
23 import org.codehaus.activemq.AlreadyClosedException;
24 import org.codehaus.activemq.message.ActiveMQMessage;
25 import org.codehaus.activemq.message.ConsumerInfo;
26 import org.codehaus.activemq.message.MessageAck;
27 import org.codehaus.activemq.service.MessageIdentity;
28 import org.codehaus.activemq.service.SubscriberEntry;
29 import org.codehaus.activemq.service.Subscription;
30 import org.codehaus.activemq.store.TopicMessageStore;
31 import org.codehaus.activemq.util.JMSExceptionHelper;
32
33 import javax.jms.JMSException;
34 import java.io.IOException;
35
36 /***
37 * @version $Revision: 1.5 $
38 */
39 public class JdbmTopicMessageStore extends JdbmMessageStore implements TopicMessageStore {
40 private static final Integer ONE = new Integer(1);
41
42 private BTree ackDatabase;
43 private BTree messageCounts;
44 private BTree subscriberDetails;
45
46 public JdbmTopicMessageStore(BTree messageTable, BTree orderedIndex, BTree ackDatabase, BTree subscriberDetails, BTree messageCounts) {
47 super(messageTable, orderedIndex);
48 this.ackDatabase = ackDatabase;
49 this.subscriberDetails = subscriberDetails;
50 this.messageCounts = messageCounts;
51 }
52
53 public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException {
54 try {
55 Integer number = (Integer) getMessageCounts().find(messageId);
56 if (number == null) {
57 number = ONE;
58 }
59 else {
60 number = new Integer(number.intValue() + 1);
61 }
62 getMessageCounts().insert(messageId, number, true);
63 }
64 catch (IOException e) {
65 throw JMSExceptionHelper.newJMSException("Failed to increment messageCount for messageID: " + messageId + ". Reason: " + e, e);
66 }
67 }
68
69 public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
70 try {
71 Integer number = (Integer) getMessageCounts().find(messageIdentity);
72 if (number == null || number.intValue() <= 1) {
73 removeMessage(messageIdentity, ack);
74 if (number != null) {
75 getMessageCounts().remove(messageIdentity);
76 }
77 }
78 else {
79 getMessageCounts().insert(messageIdentity, new Integer(number.intValue() - 1), true);
80 number = ONE;
81 }
82 }
83 catch (IOException e) {
84 throw JMSExceptionHelper.newJMSException("Failed to increment messageCount for messageID: " + messageIdentity + ". Reason: " + e, e);
85 }
86 }
87
88 public synchronized void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
89 String key = subscription.getPersistentKey();
90 try {
91 getAckDatabase().insert(key, messageIdentity, true);
92 }
93 catch (IOException e) {
94 throw JMSExceptionHelper.newJMSException("Failed to set ack messageID: " + messageIdentity + " for consumerId: " + key + ". Reason: " + e, e);
95 }
96 }
97
98 public synchronized void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
99 try {
100 MessageIdentity lastAcked = getLastAcknowledgedMessageIdentity(subscription);
101 if (lastAcked == null) {
102
103
104
105 setLastAcknowledgedMessageIdentity(subscription, lastDispatchedMessage);
106 return;
107 }
108 Object lastAckedSequenceNumber = lastAcked.getSequenceNumber();
109
110
111
112
113 Tuple tuple = getOrderedIndex().findGreaterOrEqual(lastAckedSequenceNumber);
114
115 TupleBrowser iter = getOrderedIndex().browse();
116 while (iter.getNext(tuple)) {
117 Long sequenceNumber = (Long) tuple.getKey();
118 if (sequenceNumber.compareTo(lastAckedSequenceNumber) > 0) {
119 ActiveMQMessage message = null;
120
121
122 message = getMessageBySequenceNumber(sequenceNumber);
123 if (message != null) {
124 subscription.addMessage(getContainer(), message);
125 }
126 }
127 }
128 }
129 catch (IOException e) {
130 throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
131 }
132 }
133
134 public synchronized MessageIdentity getLastestMessageIdentity() throws JMSException {
135 return new MessageIdentity(null, new Long(getLastSequenceNumber()));
136 }
137
138 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
139 Object key = info.getConsumerKey();
140 try {
141 return (SubscriberEntry) subscriberDetails.find(key);
142 }
143 catch (IOException e) {
144 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
145 }
146 }
147
148 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
149 Object key = info.getConsumerKey();
150 try {
151 subscriberDetails.insert(key, subscriberEntry, true);
152 }
153 catch (IOException e) {
154 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
155 }
156 }
157
158 public synchronized void stop() throws JMSException {
159 JMSException firstException = closeTable(ackDatabase, null);
160 firstException = closeTable(messageCounts, firstException);
161 ackDatabase = null;
162 messageCounts = null;
163 super.stop();
164 if (firstException != null) {
165 throw firstException;
166 }
167 }
168
169
170
171 protected BTree getMessageCounts() throws AlreadyClosedException {
172 if (messageCounts == null) {
173 throw new AlreadyClosedException("JDBM TopicMessageStore");
174 }
175 return messageCounts;
176 }
177
178 protected BTree getAckDatabase() throws AlreadyClosedException {
179 if (ackDatabase == null) {
180 throw new AlreadyClosedException("JDBM TopicMessageStore");
181 }
182 return ackDatabase;
183 }
184
185 protected MessageIdentity getLastAcknowledgedMessageIdentity(Subscription subscription) throws IOException, AlreadyClosedException {
186 return (MessageIdentity) getAckDatabase().find(subscription.getPersistentKey());
187 }
188
189
190 }