1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbm;
19
20 import jdbm.RecordManager;
21 import jdbm.RecordManagerFactory;
22 import jdbm.btree.BTree;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.AlreadyClosedException;
26 import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
27 import org.codehaus.activemq.store.MessageStore;
28 import org.codehaus.activemq.store.PreparedTransactionStore;
29 import org.codehaus.activemq.store.TopicMessageStore;
30 import org.codehaus.activemq.util.DefaultComparator;
31 import org.codehaus.activemq.util.JMSExceptionHelper;
32
33 import javax.jms.JMSException;
34 import java.io.File;
35 import java.io.IOException;
36 import java.util.Comparator;
37 import java.util.Map;
38 import java.util.Properties;
39
40 /***
41 * A {@link org.codehaus.activemq.store.PersistenceAdapter} implementation for
42 * <a href="http://jdbm.sf.net/">JDBM</a>
43 *
44 * @version $Revision: 1.6 $
45 */
46 public class JdbmPersistenceAdapter extends PersistenceAdapterSupport {
47
48 private static final Log log = LogFactory.getLog(JdbmPersistenceAdapter.class);
49
50 private RecordManager manager;
51 private File directory = new File("ActiveMQ");
52 private Properties properties;
53
54 /***
55 * Factory method to create an instance using the defaults
56 *
57 * @param directory the directory in which to store the persistent files
58 * @return
59 * @throws JMSException
60 */
61 public static JdbmPersistenceAdapter newInstance(File directory) throws JMSException {
62 return new JdbmPersistenceAdapter(directory);
63 }
64
65
66 public JdbmPersistenceAdapter() {
67 }
68
69 public JdbmPersistenceAdapter(File directory) {
70 this.directory = directory;
71 }
72
73 public JdbmPersistenceAdapter(RecordManager manager) {
74 this.manager = manager;
75 }
76
77 public Map getInitialDestinations() {
78 return null; /*** TODO */
79 }
80
81 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
82 try {
83 BTree messageDb = createDatabase("Queue_" + destinationName);
84 BTree sequenceDb = createDatabase("Sequence_Queue_" + destinationName);
85 JdbmMessageStore messageStore = new JdbmMessageStore(messageDb, sequenceDb);
86 return messageStore;
87 }
88 catch (IOException e) {
89 throw JMSExceptionHelper.newJMSException("Failed to create a QueueMessageContainer for destination: " + destinationName + ". Reason: " + e, e);
90 }
91 }
92
93 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
94 try {
95 BTree messageDb = createDatabase("Topic_" + destinationName);
96 BTree sequenceDb = createDatabase("Sequence_Topic_" + destinationName);
97 BTree consumerAckDb = createDatabase("Consumer_Acks_Topic_" + destinationName);
98 BTree subscriberDb = createDatabase("Subscriber_" + destinationName);
99 BTree messageCountDb = createDatabase("MessageCount_Topic_" + destinationName);
100 JdbmTopicMessageStore messageStore = new JdbmTopicMessageStore(messageDb, sequenceDb, consumerAckDb, subscriberDb, messageCountDb);
101 return messageStore;
102 }
103 catch (IOException e) {
104 throw JMSExceptionHelper.newJMSException("Failed to create a TopicMessageContainer for destination: " + destinationName + ". Reason: " + e, e);
105 }
106 }
107
108 public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
109 try {
110 return new JdbmPreparedTransactionStore(createDatabase("XaPrepareTxnDb"));
111 }
112 catch (IOException e) {
113 throw JMSExceptionHelper.newJMSException("Could not create XA Prepare Transaction Database. Reason: " + e, e);
114 }
115 }
116
117 public void beginTransaction() {
118 }
119
120 public void commitTransaction() throws JMSException {
121 try {
122 manager.commit();
123 }
124 catch (IOException e) {
125 throw JMSExceptionHelper.newJMSException("Could not commit transaction. Reason: " + e, e);
126 }
127 }
128
129 public void rollbackTransaction() {
130 try {
131 manager.rollback();
132 }
133 catch (IOException e) {
134 log.error("Could not rollback transaction. Reason: " + e, e);
135 }
136 }
137
138 public void start() throws JMSException {
139 if (manager == null) {
140 directory.mkdirs();
141
142 log.info("Creating JDBM based message store in directory: " + directory.getAbsolutePath());
143
144 try {
145 String name = directory.getAbsolutePath() + "/Store";
146 if (properties != null) {
147 manager = RecordManagerFactory.createRecordManager(name, properties);
148 }
149 else {
150 manager = RecordManagerFactory.createRecordManager(name);
151 }
152 }
153 catch (IOException e) {
154 throw JMSExceptionHelper.newJMSException("Failed to create JDBM persistent store at directory: "
155 + directory + ". Reason: " + e, e);
156 }
157 }
158 }
159
160 public synchronized void stop() throws JMSException {
161 if (manager != null) {
162 try {
163 manager.close();
164 }
165 catch (IOException e) {
166 throw JMSExceptionHelper.newJMSException("Failed to close PersistenceAdapter. Reason: " + e, e);
167 }
168 finally {
169 manager = null;
170 }
171 }
172 }
173
174
175
176 public RecordManager getManager() {
177 return manager;
178 }
179
180 public void setManager(RecordManager manager) {
181 this.manager = manager;
182 }
183
184 public File getDirectory() {
185 return directory;
186 }
187
188 public void setDirectory(File directory) {
189 this.directory = directory;
190 }
191
192
193
194 public synchronized BTree createDatabase(String name) throws IOException, AlreadyClosedException {
195 if (manager == null) {
196 throw new AlreadyClosedException("JDBM PersistenceAdapter");
197 }
198
199
200 long recid = manager.getNamedObject(name);
201 BTree tree = null;
202 if (recid != 0) {
203 tree = BTree.load(manager, recid);
204 }
205 else {
206 Comparator comparator = new DefaultComparator();
207
208 tree = BTree.createInstance(manager, comparator);
209 manager.setNamedObject(name, tree.getRecid());
210 }
211 return tree;
212 }
213
214 }