View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.bdb;
19  
20  import com.sleepycat.je.Database;
21  import com.sleepycat.je.DatabaseConfig;
22  import com.sleepycat.je.DatabaseException;
23  import com.sleepycat.je.Environment;
24  import com.sleepycat.je.SecondaryConfig;
25  import com.sleepycat.je.SecondaryDatabase;
26  import com.sleepycat.je.SecondaryKeyCreator;
27  import com.sleepycat.je.Transaction;
28  import com.sleepycat.je.TransactionConfig;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.codehaus.activemq.message.DefaultWireFormat;
32  import org.codehaus.activemq.message.WireFormat;
33  import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
34  import org.codehaus.activemq.store.MessageStore;
35  import org.codehaus.activemq.store.PersistenceAdapter;
36  import org.codehaus.activemq.store.PreparedTransactionStore;
37  import org.codehaus.activemq.store.TopicMessageStore;
38  import org.codehaus.activemq.util.JMSExceptionHelper;
39  
40  import javax.jms.JMSException;
41  import java.io.File;
42  import java.util.Map;
43  
44  /***
45   * A {@link PersistenceAdapter} implementation using
46   * <a href="http://www.sleepycat.com">Berkeley DB Java Edition</a>
47   *
48   * @version $Revision: 1.6 $
49   */
50  public class BDbPersistenceAdapter extends PersistenceAdapterSupport {
51      private static final Log log = LogFactory.getLog(BDbPersistenceAdapter.class);
52  
53      private Environment environment;
54      private WireFormat wireFormat;
55      private DatabaseConfig config;
56      private TransactionConfig transactionConfig;
57      private File directory = new File("ActiveMQ");
58  
59  
60      /***
61       * Factory method to create an instance using the defaults
62       *
63       * @param directory the directory in which to store the persistent files
64       * @return
65       * @throws JMSException
66       */
67      public static BDbPersistenceAdapter newInstance(File directory) throws JMSException {
68          return new BDbPersistenceAdapter(directory);
69      }
70  
71  
72      public BDbPersistenceAdapter() {
73          this(null, new DefaultWireFormat());
74      }
75  
76      public BDbPersistenceAdapter(File directory) {
77          this();
78          this.directory = directory;
79      }
80  
81      public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat) {
82          this(environment, wireFormat, BDbHelper.createDatabaseConfig(), new TransactionConfig());
83      }
84  
85      public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat, DatabaseConfig config, TransactionConfig transactionConfig) {
86          this.environment = environment;
87          this.wireFormat = wireFormat;
88          this.config = config;
89          this.transactionConfig = transactionConfig;
90      }
91  
92      public Map getInitialDestinations() {
93          return null;  /*** TODO */
94      }
95  
96      public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
97          try {
98              Database database = createDatabase("Queue_" + destinationName);
99              SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
100             SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
101             SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Queue_Index_" + destinationName, database, secondaryConfig);
102             sequenceNumberCreator.initialise(secondaryDatabase);
103             return new BDbMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy());
104         }
105         catch (DatabaseException e) {
106             throw JMSExceptionHelper.newJMSException("Could not create Queue MessageContainer for destination: "
107                     + destinationName + ". Reason: " + e, e);
108         }
109     }
110 
111     public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
112         try {
113             Database database = createDatabase("Topic_" + destinationName);
114             SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
115             SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
116             SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Topic_Index_" + destinationName, database, secondaryConfig);
117             sequenceNumberCreator.initialise(secondaryDatabase);
118             Database subscriptionDatabase = createDatabase("ConsumeAck_" + destinationName);
119             return new BDbTopicMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy(), subscriptionDatabase);
120         }
121         catch (DatabaseException e) {
122             throw JMSExceptionHelper.newJMSException("Could not create Topic MessageContainer for destination: "
123                     + destinationName + ". Reason: " + e, e);
124         }
125     }
126 
127     public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
128         try {
129             return new BDbPreparedTransactionStore(createDatabase("XaPrepareTxnDb"));
130         }
131         catch (DatabaseException e) {
132             throw JMSExceptionHelper.newJMSException("Could not create XA Prepare Transaction Database. Reason: " + e, e);
133         }
134     }
135 
136     public void beginTransaction() throws JMSException {
137         try {
138             // TODO temporary hack until BDB supports nested transactions
139             if (BDbHelper.getTransactionCount() == 0) {
140                 Transaction transaction = environment.beginTransaction(BDbHelper.getTransaction(), transactionConfig);
141                 BDbHelper.pushTransaction(transaction);
142             }
143             else {
144                 Transaction transaction = BDbHelper.getTransaction();
145                 BDbHelper.pushTransaction(transaction);
146             }
147         }
148         catch (DatabaseException e) {
149             throw JMSExceptionHelper.newJMSException("Failed to begin transaction: " + e, e);
150         }
151     }
152 
153     public void commitTransaction() throws JMSException {
154         // TODO temporary hack until BDB supports nested transactions
155         if (BDbHelper.getTransactionCount() == 1) {
156             Transaction transaction = BDbHelper.getTransaction();
157             if (transaction == null) {
158                 log.warn("Attempt to commit transaction when non in progress");
159             }
160             else {
161                 try {
162                     transaction.commit();
163                 }
164                 catch (DatabaseException e) {
165                     throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + transaction + ": " + e, e);
166                 }
167                 finally {
168                     BDbHelper.popTransaction();
169                 }
170             }
171         }
172         else {
173             BDbHelper.popTransaction();
174         }
175     }
176 
177     public void rollbackTransaction() {
178         Transaction transaction = BDbHelper.getTransaction();
179         if (transaction != null) {
180             if (BDbHelper.getTransactionCount() == 1) {
181                 try {
182                     transaction.abort();
183                 }
184                 catch (DatabaseException e) {
185                     log.warn("Cannot rollback transaction due to: " + e, e);
186                 }
187                 finally {
188                     BDbHelper.popTransaction();
189                 }
190             }
191             else {
192                 BDbHelper.popTransaction();
193             }
194         }
195     }
196 
197 
198     public void start() throws JMSException {
199         if (environment == null) {
200             directory.mkdirs();
201 
202             log.info("Creating Berkeley DB based message store in directory: " + directory.getAbsolutePath());
203 
204             try {
205                 environment = BDbHelper.createEnvironment(directory);
206             }
207             catch (DatabaseException e) {
208                 throw JMSExceptionHelper.newJMSException("Failed to open Berkeley DB persistent store at directory: "
209                         + directory + ". Reason: " + e, e);
210             }
211         }
212     }
213 
214     public synchronized void stop() throws JMSException {
215         if (environment != null) {
216             try {
217                 environment.close();
218             }
219             catch (DatabaseException e) {
220                 throw JMSExceptionHelper.newJMSException("Failed to close environment. Reason: " + e, e);
221             }
222             finally {
223                 environment = null;
224             }
225         }
226     }
227 
228     // Properties
229     //-------------------------------------------------------------------------
230     public File getDirectory() {
231         return directory;
232     }
233 
234     public void setDirectory(File directory) {
235         this.directory = directory;
236     }
237 
238     public WireFormat getWireFormat() {
239         return wireFormat;
240     }
241 
242     public void setWireFormat(WireFormat wireFormat) {
243         this.wireFormat = wireFormat;
244     }
245 
246     public TransactionConfig getTransactionConfig() {
247         return transactionConfig;
248     }
249 
250     public void setTransactionConfig(TransactionConfig transactionConfig) {
251         this.transactionConfig = transactionConfig;
252     }
253 
254     public Environment getEnvironment() {
255         return environment;
256     }
257 
258     public void setEnvironment(Environment environment) {
259         this.environment = environment;
260     }
261 
262     public DatabaseConfig getConfig() {
263         return config;
264     }
265 
266     public void setConfig(DatabaseConfig config) {
267         this.config = config;
268     }
269 
270     // Implementation methods
271     //-------------------------------------------------------------------------
272     protected Database createDatabase(String name) throws DatabaseException {
273         //System.out.println("#####   Opening database: " + name);
274 
275         if (log.isTraceEnabled()) {
276             log.trace("Opening database: " + name);
277         }
278         return environment.openDatabase(null, name, config);
279     }
280 
281     protected SecondaryDatabase createSecondaryDatabase(String name, Database database, SecondaryConfig secondaryConfig) throws DatabaseException {
282         //System.out.println("#####   Opening secondary database: " + name);
283 
284         if (log.isTraceEnabled()) {
285             log.trace("Opening secondary database: " + name);
286         }
287         return environment.openSecondaryDatabase(null, name, database, secondaryConfig);
288     }
289 
290     public static JMSException closeDatabase(Database db, JMSException firstException) {
291         if (db != null) {
292 
293             if (log.isTraceEnabled()) {
294                 try {
295                     log.trace("Closing database: " + db.getDatabaseName());
296                 }
297                 catch (DatabaseException e) {
298                     log.trace("Closing database: " + db + " but could not get the name: " + e);
299                 }
300             }
301             try {
302                 //System.out.println("#####  Closing database: " + db.getDatabaseName() + " " + db);
303                 db.close();
304             }
305             catch (DatabaseException e) {
306                 if (firstException == null) {
307                     firstException = JMSExceptionHelper.newJMSException("Failed to close database. Reason: " + e, e);
308                 }
309             }
310         }
311         return firstException;
312     }
313 
314     protected SecondaryConfig createSecondaryConfig(SecondaryKeyCreator keyGenerator) {
315         SecondaryConfig answer = new SecondaryConfig();
316         answer.setKeyCreator(keyGenerator);
317         answer.setAllowCreate(true);
318         answer.setAllowPopulate(true);
319         answer.setTransactional(true);
320         return answer;
321     }
322 }