View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.bdb;
19  
20  import com.sleepycat.je.*;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.AlreadyClosedException;
24  import org.codehaus.activemq.message.ActiveMQMessage;
25  import org.codehaus.activemq.message.MessageAck;
26  import org.codehaus.activemq.message.WireFormat;
27  import org.codehaus.activemq.service.MessageContainer;
28  import org.codehaus.activemq.service.MessageIdentity;
29  import org.codehaus.activemq.service.QueueMessageContainer;
30  import org.codehaus.activemq.store.MessageStore;
31  import org.codehaus.activemq.util.JMSExceptionHelper;
32  
33  import javax.jms.JMSException;
34  import java.io.IOException;
35  
36  /***
37   * @version $Revision: 1.2 $
38   */
39  public class BDbMessageStore implements MessageStore {
40      private static final Log log = LogFactory.getLog(BDbMessageStore.class);
41  
42      private Database database;
43      private WireFormat wireFormat;
44      private SecondaryDatabase secondaryDatabase;
45      private SecondaryConfig secondaryConfig;
46      private SequenceNumberCreator sequenceNumberCreator;
47      private MessageContainer container;
48      private CursorConfig cursorConfig;
49  
50  
51      public BDbMessageStore(Database database, SecondaryDatabase secondaryDatabase, SecondaryConfig secondaryConfig, SequenceNumberCreator sequenceNumberCreator, WireFormat wireFormat) {
52          this.database = database;
53          this.secondaryDatabase = secondaryDatabase;
54          this.secondaryConfig = secondaryConfig;
55          this.sequenceNumberCreator = sequenceNumberCreator;
56          this.wireFormat = wireFormat;
57      }
58  
59      public void setMessageContainer(MessageContainer container) {
60          this.container = container;
61      }
62  
63      public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
64          checkClosed();
65          String messageID = message.getJMSMessageID();
66          try {
67              Transaction transaction = BDbHelper.getTransaction();
68              DatabaseEntry key = createKey(messageID);
69              DatabaseEntry value = new DatabaseEntry(asBytes(message));
70              database.put(transaction, key, value);
71  
72              MessageIdentity answer = message.getJMSMessageIdentity();
73              answer.setSequenceNumber(sequenceNumberCreator.getLastKey());
74              return answer;
75          }
76          catch (DatabaseException e) {
77              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
78          }
79          catch (IOException e) {
80              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
81          }
82      }
83  
84      public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
85          checkClosed();
86          ActiveMQMessage answer = null;
87          String messageID = identity.getMessageID();
88          try {
89              DatabaseEntry key = createKey(messageID);
90              DatabaseEntry value = new DatabaseEntry();
91              if (database.get(null, key, value, null) == OperationStatus.SUCCESS) {
92                  answer = extractMessage(value);
93              }
94              return answer;
95          }
96          catch (DatabaseException e) {
97              throw JMSExceptionHelper.newJMSException("Failed to peek next message after: " + messageID + " from container: " + e, e);
98          }
99          catch (IOException e) {
100             throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
101         }
102     }
103 
104     public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
105         checkClosed();
106         String messageID = identity.getMessageID();
107         try {
108             Transaction transaction = BDbHelper.getTransaction();
109 
110             // we need to find the alternative primary key for the given messageID
111             DatabaseEntry sequenceNumber = getSequenceNumberKey(identity);
112 
113             //System.out.println("Deleting sequenceNumber: " + BDbHelper.longFromBytes(sequenceNumber.getData()));
114 
115             sequenceNumberCreator.setDeleteKey(sequenceNumber);
116 
117             OperationStatus status = secondaryDatabase.delete(transaction, sequenceNumber);
118             if (status != OperationStatus.SUCCESS) {
119                 log.error("Could not delete sequenece number for: " + identity + " status: " + status);
120             }
121         }
122         catch (DatabaseException e) {
123             throw JMSExceptionHelper.newJMSException("Failed to delete message: " + messageID + " from container: " + e, e);
124         }
125     }
126 
127     public void recover(QueueMessageContainer container) throws JMSException {
128         checkClosed();
129         SecondaryCursor cursor = null;
130         try {
131             cursor = secondaryDatabase.openSecondaryCursor(BDbHelper.getTransaction(), cursorConfig);
132             DatabaseEntry sequenceNumberEntry = new DatabaseEntry();
133             DatabaseEntry keyEntry = new DatabaseEntry();
134             DatabaseEntry valueEntry = new DatabaseEntry();
135             OperationStatus status = cursor.getFirst(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
136             while (status == OperationStatus.SUCCESS) {
137                 String messageID = extractString(keyEntry);
138                 container.recoverMessageToBeDelivered(new MessageIdentity(messageID, sequenceNumberEntry));
139                 status = cursor.getNext(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
140             }
141             if (status != OperationStatus.NOTFOUND) {
142                 log.warn("Unexpected status code while recovering: " + status);
143             }
144         }
145         catch (DatabaseException e) {
146             throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
147         }
148         finally {
149             if (cursor != null) {
150                 try {
151                     cursor.close();
152                 }
153                 catch (DatabaseException e) {
154                     log.warn("Caught exception closing cursor: " + e, e);
155                 }
156             }
157         }
158 
159     }
160 
161 
162     public void start() throws JMSException {
163     }
164 
165     public void stop() throws JMSException {
166         JMSException firstException = BDbPersistenceAdapter.closeDatabase(secondaryDatabase, null);
167         firstException = BDbPersistenceAdapter.closeDatabase(database, firstException);
168 
169         secondaryDatabase = null;
170         database = null;
171 
172         if (firstException != null) {
173             throw firstException;
174         }
175     }
176 
177     // Implementation methods
178     //-------------------------------------------------------------------------
179     protected SecondaryDatabase getSecondaryDatabase() {
180         return secondaryDatabase;
181     }
182 
183     protected Database getDatabase() {
184         return database;
185     }
186 
187     public CursorConfig getCursorConfig() {
188         return cursorConfig;
189     }
190 
191     public MessageContainer getContainer() {
192         return container;
193     }
194 
195 
196     protected void checkClosed() throws AlreadyClosedException {
197         if (database == null) {
198             throw new AlreadyClosedException("Berkeley DB MessageStore");
199         }
200     }
201 
202     /***
203      * Returns the sequence number key for the given message identity. If the
204      * sequence number is not available it will be queried (which is slow & will generate a warning
205      * as it is not recommended) and then it'll be cached inside the MessageIdentity
206      *
207      * @param identity
208      * @return
209      * @throws DatabaseException
210      */
211     protected DatabaseEntry getSequenceNumberKey(MessageIdentity identity) throws DatabaseException {
212         DatabaseEntry sequenceNumber = (DatabaseEntry) identity.getSequenceNumber();
213         if (sequenceNumber == null) {
214             sequenceNumber = findSequenceNumber(identity.getMessageID());
215         }
216         return sequenceNumber;
217     }
218 
219     protected DatabaseEntry createKey(String messageID) {
220         DatabaseEntry key = new DatabaseEntry(asBytes(messageID));
221         return key;
222     }
223 
224 
225     /***
226      * Iterates through from the start of the collection until the given message ID is found
227      *
228      * @param messageID
229      * @return
230      */
231     protected DatabaseEntry findSequenceNumber(String messageID) throws DatabaseException {
232         log.warn("Having to table scan to find the sequence number for messageID: " + messageID);
233 
234         SecondaryCursor cursor = null;
235         try {
236             cursor = secondaryDatabase.openSecondaryCursor(BDbHelper.getTransaction(), cursorConfig);
237             DatabaseEntry sequenceNumberEntry = new DatabaseEntry();
238             DatabaseEntry keyEntry = new DatabaseEntry();
239             DatabaseEntry valueEntry = new DatabaseEntry();
240             OperationStatus status = cursor.getFirst(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
241             while (status == OperationStatus.SUCCESS) {
242                 String value = extractString(keyEntry);
243                 if (messageID.equals(value)) {
244                     return sequenceNumberEntry;
245                 }
246                 status = cursor.getNext(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
247             }
248         }
249         finally {
250             if (cursor != null) {
251                 try {
252                     cursor.close();
253                 }
254                 catch (DatabaseException e) {
255                     log.warn("Caught exception closing cursor: " + e, e);
256                 }
257             }
258         }
259         return null;
260     }
261 
262     protected String extractString(DatabaseEntry entry) {
263         return new String(entry.getData(), entry.getOffset(), entry.getSize());
264     }
265 
266     protected ActiveMQMessage extractMessage(DatabaseEntry value) throws IOException {
267         // we must synchronize access to wireFormat
268         synchronized (wireFormat) {
269             return (ActiveMQMessage) wireFormat.fromBytes(value.getData(), value.getOffset(), value.getSize());
270         }
271     }
272 
273     protected byte[] asBytes(ActiveMQMessage message) throws IOException, JMSException {
274         // we must synchronize access to wireFormat
275         synchronized (wireFormat) {
276             return wireFormat.toBytes(message);
277         }
278     }
279 
280     protected byte[] asBytes(String messageID) {
281         return messageID.getBytes();
282     }
283 
284 }