1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.bdb;
19
20 import com.sleepycat.je.*;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.AlreadyClosedException;
24 import org.codehaus.activemq.message.ActiveMQMessage;
25 import org.codehaus.activemq.message.MessageAck;
26 import org.codehaus.activemq.message.WireFormat;
27 import org.codehaus.activemq.service.MessageContainer;
28 import org.codehaus.activemq.service.MessageIdentity;
29 import org.codehaus.activemq.service.QueueMessageContainer;
30 import org.codehaus.activemq.store.MessageStore;
31 import org.codehaus.activemq.util.JMSExceptionHelper;
32
33 import javax.jms.JMSException;
34 import java.io.IOException;
35
36 /***
37 * @version $Revision: 1.2 $
38 */
39 public class BDbMessageStore implements MessageStore {
40 private static final Log log = LogFactory.getLog(BDbMessageStore.class);
41
42 private Database database;
43 private WireFormat wireFormat;
44 private SecondaryDatabase secondaryDatabase;
45 private SecondaryConfig secondaryConfig;
46 private SequenceNumberCreator sequenceNumberCreator;
47 private MessageContainer container;
48 private CursorConfig cursorConfig;
49
50
51 public BDbMessageStore(Database database, SecondaryDatabase secondaryDatabase, SecondaryConfig secondaryConfig, SequenceNumberCreator sequenceNumberCreator, WireFormat wireFormat) {
52 this.database = database;
53 this.secondaryDatabase = secondaryDatabase;
54 this.secondaryConfig = secondaryConfig;
55 this.sequenceNumberCreator = sequenceNumberCreator;
56 this.wireFormat = wireFormat;
57 }
58
59 public void setMessageContainer(MessageContainer container) {
60 this.container = container;
61 }
62
63 public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
64 checkClosed();
65 String messageID = message.getJMSMessageID();
66 try {
67 Transaction transaction = BDbHelper.getTransaction();
68 DatabaseEntry key = createKey(messageID);
69 DatabaseEntry value = new DatabaseEntry(asBytes(message));
70 database.put(transaction, key, value);
71
72 MessageIdentity answer = message.getJMSMessageIdentity();
73 answer.setSequenceNumber(sequenceNumberCreator.getLastKey());
74 return answer;
75 }
76 catch (DatabaseException e) {
77 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
78 }
79 catch (IOException e) {
80 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
81 }
82 }
83
84 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
85 checkClosed();
86 ActiveMQMessage answer = null;
87 String messageID = identity.getMessageID();
88 try {
89 DatabaseEntry key = createKey(messageID);
90 DatabaseEntry value = new DatabaseEntry();
91 if (database.get(null, key, value, null) == OperationStatus.SUCCESS) {
92 answer = extractMessage(value);
93 }
94 return answer;
95 }
96 catch (DatabaseException e) {
97 throw JMSExceptionHelper.newJMSException("Failed to peek next message after: " + messageID + " from container: " + e, e);
98 }
99 catch (IOException e) {
100 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
101 }
102 }
103
104 public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
105 checkClosed();
106 String messageID = identity.getMessageID();
107 try {
108 Transaction transaction = BDbHelper.getTransaction();
109
110
111 DatabaseEntry sequenceNumber = getSequenceNumberKey(identity);
112
113
114
115 sequenceNumberCreator.setDeleteKey(sequenceNumber);
116
117 OperationStatus status = secondaryDatabase.delete(transaction, sequenceNumber);
118 if (status != OperationStatus.SUCCESS) {
119 log.error("Could not delete sequenece number for: " + identity + " status: " + status);
120 }
121 }
122 catch (DatabaseException e) {
123 throw JMSExceptionHelper.newJMSException("Failed to delete message: " + messageID + " from container: " + e, e);
124 }
125 }
126
127 public void recover(QueueMessageContainer container) throws JMSException {
128 checkClosed();
129 SecondaryCursor cursor = null;
130 try {
131 cursor = secondaryDatabase.openSecondaryCursor(BDbHelper.getTransaction(), cursorConfig);
132 DatabaseEntry sequenceNumberEntry = new DatabaseEntry();
133 DatabaseEntry keyEntry = new DatabaseEntry();
134 DatabaseEntry valueEntry = new DatabaseEntry();
135 OperationStatus status = cursor.getFirst(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
136 while (status == OperationStatus.SUCCESS) {
137 String messageID = extractString(keyEntry);
138 container.recoverMessageToBeDelivered(new MessageIdentity(messageID, sequenceNumberEntry));
139 status = cursor.getNext(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
140 }
141 if (status != OperationStatus.NOTFOUND) {
142 log.warn("Unexpected status code while recovering: " + status);
143 }
144 }
145 catch (DatabaseException e) {
146 throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
147 }
148 finally {
149 if (cursor != null) {
150 try {
151 cursor.close();
152 }
153 catch (DatabaseException e) {
154 log.warn("Caught exception closing cursor: " + e, e);
155 }
156 }
157 }
158
159 }
160
161
162 public void start() throws JMSException {
163 }
164
165 public void stop() throws JMSException {
166 JMSException firstException = BDbPersistenceAdapter.closeDatabase(secondaryDatabase, null);
167 firstException = BDbPersistenceAdapter.closeDatabase(database, firstException);
168
169 secondaryDatabase = null;
170 database = null;
171
172 if (firstException != null) {
173 throw firstException;
174 }
175 }
176
177
178
179 protected SecondaryDatabase getSecondaryDatabase() {
180 return secondaryDatabase;
181 }
182
183 protected Database getDatabase() {
184 return database;
185 }
186
187 public CursorConfig getCursorConfig() {
188 return cursorConfig;
189 }
190
191 public MessageContainer getContainer() {
192 return container;
193 }
194
195
196 protected void checkClosed() throws AlreadyClosedException {
197 if (database == null) {
198 throw new AlreadyClosedException("Berkeley DB MessageStore");
199 }
200 }
201
202 /***
203 * Returns the sequence number key for the given message identity. If the
204 * sequence number is not available it will be queried (which is slow & will generate a warning
205 * as it is not recommended) and then it'll be cached inside the MessageIdentity
206 *
207 * @param identity
208 * @return
209 * @throws DatabaseException
210 */
211 protected DatabaseEntry getSequenceNumberKey(MessageIdentity identity) throws DatabaseException {
212 DatabaseEntry sequenceNumber = (DatabaseEntry) identity.getSequenceNumber();
213 if (sequenceNumber == null) {
214 sequenceNumber = findSequenceNumber(identity.getMessageID());
215 }
216 return sequenceNumber;
217 }
218
219 protected DatabaseEntry createKey(String messageID) {
220 DatabaseEntry key = new DatabaseEntry(asBytes(messageID));
221 return key;
222 }
223
224
225 /***
226 * Iterates through from the start of the collection until the given message ID is found
227 *
228 * @param messageID
229 * @return
230 */
231 protected DatabaseEntry findSequenceNumber(String messageID) throws DatabaseException {
232 log.warn("Having to table scan to find the sequence number for messageID: " + messageID);
233
234 SecondaryCursor cursor = null;
235 try {
236 cursor = secondaryDatabase.openSecondaryCursor(BDbHelper.getTransaction(), cursorConfig);
237 DatabaseEntry sequenceNumberEntry = new DatabaseEntry();
238 DatabaseEntry keyEntry = new DatabaseEntry();
239 DatabaseEntry valueEntry = new DatabaseEntry();
240 OperationStatus status = cursor.getFirst(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
241 while (status == OperationStatus.SUCCESS) {
242 String value = extractString(keyEntry);
243 if (messageID.equals(value)) {
244 return sequenceNumberEntry;
245 }
246 status = cursor.getNext(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
247 }
248 }
249 finally {
250 if (cursor != null) {
251 try {
252 cursor.close();
253 }
254 catch (DatabaseException e) {
255 log.warn("Caught exception closing cursor: " + e, e);
256 }
257 }
258 }
259 return null;
260 }
261
262 protected String extractString(DatabaseEntry entry) {
263 return new String(entry.getData(), entry.getOffset(), entry.getSize());
264 }
265
266 protected ActiveMQMessage extractMessage(DatabaseEntry value) throws IOException {
267
268 synchronized (wireFormat) {
269 return (ActiveMQMessage) wireFormat.fromBytes(value.getData(), value.getOffset(), value.getSize());
270 }
271 }
272
273 protected byte[] asBytes(ActiveMQMessage message) throws IOException, JMSException {
274
275 synchronized (wireFormat) {
276 return wireFormat.toBytes(message);
277 }
278 }
279
280 protected byte[] asBytes(String messageID) {
281 return messageID.getBytes();
282 }
283
284 }