1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbm;
19
20 import jdbm.btree.BTree;
21 import jdbm.helper.Tuple;
22 import jdbm.helper.TupleBrowser;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.AlreadyClosedException;
26 import org.codehaus.activemq.message.ActiveMQMessage;
27 import org.codehaus.activemq.message.MessageAck;
28 import org.codehaus.activemq.service.MessageContainer;
29 import org.codehaus.activemq.service.MessageIdentity;
30 import org.codehaus.activemq.service.QueueMessageContainer;
31 import org.codehaus.activemq.service.impl.MessageEntry;
32 import org.codehaus.activemq.store.MessageStore;
33 import org.codehaus.activemq.util.JMSExceptionHelper;
34
35 import javax.jms.JMSException;
36 import java.io.IOException;
37
38 /***
39 * @version $Revision: 1.4 $
40 */
41 public class JdbmMessageStore implements MessageStore {
42 private static final Log log = LogFactory.getLog(JdbmMessageStore.class);
43
44 private MessageContainer container;
45 private BTree messageTable;
46 private BTree orderedIndex;
47 private long lastSequenceNumber = 0;
48
49 public JdbmMessageStore(BTree messageTable, BTree orderedIndex) {
50 this.messageTable = messageTable;
51 this.orderedIndex = orderedIndex;
52 }
53
54 public void setMessageContainer(MessageContainer container) {
55 this.container = container;
56 }
57
58 public synchronized MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
59 if (log.isDebugEnabled()) {
60 log.debug("Adding message to container: " + message);
61 }
62 MessageEntry entry = new MessageEntry(message);
63 Object sequenceNumber = null;
64 synchronized (this) {
65 sequenceNumber = new Long(++lastSequenceNumber);
66 }
67 try {
68 String messageID = message.getJMSMessageID();
69 getMessageTable().insert(messageID, entry, true);
70 getOrderedIndex().insert(sequenceNumber, messageID, true);
71
72 MessageIdentity answer = message.getJMSMessageIdentity();
73 answer.setSequenceNumber(sequenceNumber);
74 return answer;
75 }
76 catch (IOException e) {
77 throw JMSExceptionHelper.newJMSException("Failed to add message: " + message + " in container: " + e, e);
78 }
79 }
80
81 public synchronized ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
82 String messageID = identity.getMessageID();
83 ActiveMQMessage message = null;
84 try {
85 MessageEntry entry = (MessageEntry) getMessageTable().find(messageID);
86 if (entry != null) {
87 message = entry.getMessage();
88 message.getJMSMessageIdentity().setSequenceNumber(identity.getSequenceNumber());
89 }
90 }
91 catch (IOException e) {
92 throw JMSExceptionHelper.newJMSException("Failed to get message for messageID: " + messageID + " " + e, e);
93 }
94 return message;
95 }
96
97 public synchronized void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
98 String messageID = identity.getMessageID();
99 Object sequenceNumber = null;
100 if (messageID == null) {
101 throw new JMSException("Cannot remove message with null messageID for sequence number: " + identity.getSequenceNumber());
102 }
103 try {
104 sequenceNumber = identity.getSequenceNumber();
105 if (sequenceNumber == null) {
106 sequenceNumber = findSequenceNumber(messageID);
107 identity.setSequenceNumber(sequenceNumber);
108 }
109 getMessageTable().remove(messageID);
110 getOrderedIndex().remove(sequenceNumber);
111 }
112 catch (IOException e) {
113 throw JMSExceptionHelper.newJMSException("Failed to delete message for messageID: " + messageID + " " + e, e);
114 }
115 catch (IllegalArgumentException e) {
116 log.warn("Could not find sequence number: " + sequenceNumber + " in queue. " + e);
117 }
118 }
119
120 public synchronized void recover(QueueMessageContainer container) throws JMSException {
121 try {
122 Tuple tuple = new Tuple();
123 TupleBrowser iter = getOrderedIndex().browse();
124 while (iter.getNext(tuple)) {
125 Long key = (Long) tuple.getKey();
126 MessageIdentity messageIdentity = null;
127 if (key != null) {
128 String messageID = (String) tuple.getValue();
129 if (messageID != null) {
130 messageIdentity = new MessageIdentity(messageID, key);
131 }
132 }
133 if (messageIdentity != null) {
134 container.recoverMessageToBeDelivered(messageIdentity);
135 }
136 else {
137 log.warn("Could not find message for sequenceNumber: " + key);
138 }
139 }
140 }
141 catch (IOException e) {
142 throw JMSExceptionHelper.newJMSException("Failed to recover the durable queue store. Reason: " + e, e);
143 }
144 }
145
146 public synchronized void start() throws JMSException {
147 try {
148
149 Tuple tuple = new Tuple();
150 Long lastSequenceNumber = null;
151 TupleBrowser iter = getOrderedIndex().browse();
152 while (iter.getNext(tuple)) {
153 lastSequenceNumber = (Long) tuple.getKey();
154 }
155 if (lastSequenceNumber != null) {
156 this.lastSequenceNumber = lastSequenceNumber.longValue();
157 if (log.isDebugEnabled()) {
158 log.debug("Last sequence number is: " + lastSequenceNumber + " for: " + this);
159 }
160 }
161 else {
162 if (log.isDebugEnabled()) {
163 log.debug("Started empty database for: " + this);
164 }
165 }
166 }
167 catch (IOException e) {
168 throw JMSExceptionHelper.newJMSException("Failed to find the last sequence number. Reason: " + e, e);
169 }
170 }
171
172 public synchronized void stop() throws JMSException {
173 JMSException firstException = closeTable(orderedIndex, null);
174 firstException = closeTable(messageTable, firstException);
175 orderedIndex = null;
176 messageTable = null;
177 if (firstException != null) {
178 throw firstException;
179 }
180 }
181
182
183
184
185
186 protected MessageContainer getContainer() {
187 return container;
188 }
189
190 protected long getLastSequenceNumber() {
191 return lastSequenceNumber;
192 }
193
194 protected BTree getMessageTable() throws AlreadyClosedException {
195 if (messageTable == null) {
196 throw new AlreadyClosedException("JDBM MessageStore");
197 }
198 return messageTable;
199 }
200
201 protected BTree getOrderedIndex() throws AlreadyClosedException {
202 if (orderedIndex == null) {
203 throw new AlreadyClosedException("JDBM MessageStore");
204 }
205 return orderedIndex;
206 }
207
208
209 /***
210 * Looks up the message using the given sequence number
211 */
212 protected ActiveMQMessage getMessageBySequenceNumber(Long sequenceNumber) throws IOException, JMSException {
213 ActiveMQMessage message = null;
214 String messageID = (String) getOrderedIndex().find(sequenceNumber);
215 if (messageID != null) {
216 message = getMessage(new MessageIdentity(messageID, sequenceNumber));
217 }
218 return message;
219 }
220
221 /***
222 * Finds the sequence number for the given messageID
223 *
224 * @param messageID
225 * @return
226 */
227 protected Object findSequenceNumber(String messageID) throws IOException, AlreadyClosedException {
228 log.warn("Having to table scan to find the sequence number for messageID: " + messageID);
229
230 Tuple tuple = new Tuple();
231 TupleBrowser iter = getOrderedIndex().browse();
232 while (iter.getNext(tuple)) {
233 Object value = tuple.getValue();
234 if (messageID.equals(value)) {
235 return tuple.getKey();
236 }
237 }
238 return null;
239 }
240
241 protected JMSException closeTable(BTree table, JMSException firstException) {
242 table = null;
243 return null;
244 }
245 }