1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.howl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.DefaultWireFormat;
23 import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
24 import org.codehaus.activemq.store.MessageStore;
25 import org.codehaus.activemq.store.PersistenceAdapter;
26 import org.codehaus.activemq.store.PreparedTransactionStore;
27 import org.codehaus.activemq.store.TopicMessageStore;
28 import org.codehaus.activemq.store.jdbm.JdbmPersistenceAdapter;
29 import org.codehaus.activemq.util.JMSExceptionHelper;
30 import org.objectweb.howl.log.Configuration;
31 import org.objectweb.howl.log.LogConfigurationException;
32 import org.objectweb.howl.log.Logger;
33
34 import javax.jms.JMSException;
35 import java.io.File;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.util.Map;
39 import java.util.Properties;
40
41 /***
42 * An implementation of {@link PersistenceAdapter} designed for
43 * optimal use with <a href="http://howl.objectweb.org/">Howl</a>
44 * as the transaction log and then checkpointing asynchronously
45 * on a timeout with some other persistent storage.
46 *
47 * @version $Revision: 1.7 $
48 */
49 public class HowlPersistenceAdapter extends PersistenceAdapterSupport {
50
51 private static final Log log = LogFactory.getLog(HowlPersistenceAdapter.class);
52
53 private PersistenceAdapter longTermPersistence;
54 private Configuration configuration;
55 private int maximumTotalCachedMessages = 10000;
56 private int maximumCachedMessagesPerStore = 100;
57 private int cachedMessageCount;
58 private File directory;
59 private Logger transactionLog;
60
61 /***
62 * Factory method to create an instance using the defaults
63 *
64 * @param directory the directory in which to store the persistent files
65 * @return
66 * @throws JMSException
67 */
68 public static HowlPersistenceAdapter newInstance(File directory) throws JMSException {
69 return new HowlPersistenceAdapter(directory, JdbmPersistenceAdapter.newInstance(directory));
70 }
71
72 public HowlPersistenceAdapter() {
73 }
74
75 public HowlPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) {
76 this.directory = directory;
77 this.longTermPersistence = longTermPersistence;
78 }
79
80 public Map getInitialDestinations() {
81 return longTermPersistence.getInitialDestinations();
82 }
83
84 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
85 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
86 return new HowlMessageStore(this, checkpointStore, transactionLog, new DefaultWireFormat());
87 }
88
89 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
90 /*** TODO not yet implemented for topics */
91 return longTermPersistence.createTopicMessageStore(destinationName);
92 }
93
94 public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
95
96 return longTermPersistence.createPreparedTransactionStore();
97 }
98
99 public void beginTransaction() throws JMSException {
100 }
101
102 public void commitTransaction() throws JMSException {
103 }
104
105 public void rollbackTransaction() {
106 }
107
108 public void start() throws JMSException {
109 if (transactionLog == null) {
110 if (directory != null) {
111 directory.mkdirs();
112 }
113 try {
114 transactionLog = createTransactionLog();
115 }
116 catch (Exception e) {
117 throw JMSExceptionHelper.newJMSException("Failed to create Howl based message store due to: " + e, e);
118 }
119 }
120
121 try {
122 log.info("Using Howl transaction log in directory: " + getLogFileDir());
123
124 transactionLog.open();
125 }
126 catch (Exception e) {
127 throw JMSExceptionHelper.newJMSException("Failed to open Howl transaction log: " + e, e);
128 }
129 longTermPersistence.start();
130 }
131
132 public void stop() throws JMSException {
133 try {
134 transactionLog.close();
135 }
136 catch (Exception e) {
137 throw JMSExceptionHelper.newJMSException("Failed to close Howl transaction log due to: " + e, e);
138 }
139 }
140
141 /***
142 * Return true if a store is allowed to cache a message.
143 * Called by a store when its about to store a message in its cache.
144 *
145 * @param messageStore
146 * @return true if the cache is allowed to cache the mesage
147 */
148 public synchronized boolean hasCacheCapacity(HowlMessageStore messageStore) {
149 if (cachedMessageCount < maximumTotalCachedMessages) {
150 cachedMessageCount++;
151 return true;
152 }
153 return false;
154 }
155
156 public synchronized void onMessageRemove(HowlMessageStore messageStore) {
157 cachedMessageCount--;
158 }
159
160
161
162 public PersistenceAdapter getLongTermPersistence() {
163 return longTermPersistence;
164 }
165
166 public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
167 this.longTermPersistence = longTermPersistence;
168 }
169
170 public int getMaximumCachedMessagesPerStore() {
171 return maximumCachedMessagesPerStore;
172 }
173
174 public void setMaximumCachedMessagesPerStore(int maximumCachedMessagesPerStore) {
175 this.maximumCachedMessagesPerStore = maximumCachedMessagesPerStore;
176 }
177
178 public int getMaximumTotalCachedMessages() {
179 return maximumTotalCachedMessages;
180 }
181
182 public void setMaximumTotalCachedMessages(int maximumTotalCachedMessages) {
183 this.maximumTotalCachedMessages = maximumTotalCachedMessages;
184 }
185
186 public File getDirectory() {
187 return directory;
188 }
189
190 public void setDirectory(File directory) {
191 this.directory = directory;
192 }
193
194 public Configuration getConfiguration() throws LogConfigurationException, IOException {
195 if (configuration == null) {
196 configuration = createConfiguration();
197 }
198 return configuration;
199 }
200
201 public void setConfiguration(Configuration configuration) {
202 this.configuration = configuration;
203 }
204
205 public Logger getTransactionLog() {
206 return transactionLog;
207 }
208
209 public void setTransactionLog(Logger transactionLog) {
210 this.transactionLog = transactionLog;
211 }
212
213
214
215
216 public String getBufferClassName() throws LogConfigurationException, IOException {
217 return getConfiguration().getBufferClassName();
218 }
219
220 public int getBufferSize() throws LogConfigurationException, IOException {
221 return getConfiguration().getBufferSize();
222 }
223
224 public int getFlushSleepTime() throws LogConfigurationException, IOException {
225 return getConfiguration().getFlushSleepTime();
226 }
227
228 public String getLogFileDir() throws LogConfigurationException, IOException {
229 return getConfiguration().getLogFileDir();
230 }
231
232 public String getLogFileExt() throws LogConfigurationException, IOException {
233 return getConfiguration().getLogFileExt();
234 }
235
236 public String getLogFileName() throws LogConfigurationException, IOException {
237 return getConfiguration().getLogFileName();
238 }
239
240 public int getMaxBlocksPerFile() throws LogConfigurationException, IOException {
241 return getConfiguration().getMaxBlocksPerFile();
242 }
243
244 public int getMaxBuffers() throws LogConfigurationException, IOException {
245 return getConfiguration().getMaxBuffers();
246 }
247
248 public int getMaxLogFiles() throws LogConfigurationException, IOException {
249 return getConfiguration().getMaxLogFiles();
250 }
251
252 public int getMinBuffers() throws LogConfigurationException, IOException {
253 return getConfiguration().getMinBuffers();
254 }
255
256 public int getThreadsWaitingForceThreshold() throws LogConfigurationException, IOException {
257 return getConfiguration().getThreadsWaitingForceThreshold();
258 }
259
260 public boolean isChecksumEnabled() throws LogConfigurationException, IOException {
261 return getConfiguration().isChecksumEnabled();
262 }
263
264 public void setBufferClassName(String s) throws LogConfigurationException, IOException {
265 getConfiguration().setBufferClassName(s);
266 }
267
268 public void setBufferSize(int i) throws LogConfigurationException, IOException {
269 getConfiguration().setBufferSize(i);
270 }
271
272 public void setChecksumEnabled(boolean b) throws LogConfigurationException, IOException {
273 getConfiguration().setChecksumEnabled(b);
274 }
275
276 public void setFlushSleepTime(int i) throws LogConfigurationException, IOException {
277 getConfiguration().setFlushSleepTime(i);
278 }
279
280 public void setLogFileDir(String s) throws LogConfigurationException, IOException {
281 getConfiguration().setLogFileDir(s);
282 }
283
284 public void setLogFileExt(String s) throws LogConfigurationException, IOException {
285 getConfiguration().setLogFileExt(s);
286 }
287
288 public void setLogFileName(String s) throws LogConfigurationException, IOException {
289 getConfiguration().setLogFileName(s);
290 }
291
292 public void setMaxBlocksPerFile(int i) throws LogConfigurationException, IOException {
293 getConfiguration().setMaxBlocksPerFile(i);
294 }
295
296 public void setMaxBuffers(int i) throws LogConfigurationException, IOException {
297 getConfiguration().setMaxBuffers(i);
298 }
299
300 public void setMaxLogFiles(int i) throws LogConfigurationException, IOException {
301 getConfiguration().setMaxLogFiles(i);
302 }
303
304 public void setMinBuffers(int i) throws LogConfigurationException, IOException {
305 getConfiguration().setMinBuffers(i);
306 }
307
308 public void setThreadsWaitingForceThreshold(int i) throws LogConfigurationException, IOException {
309 getConfiguration().setThreadsWaitingForceThreshold(i);
310 }
311
312
313
314
315
316 protected Logger createTransactionLog() throws IOException, LogConfigurationException {
317 return new Logger(getConfiguration());
318 }
319
320 protected Configuration createConfiguration() throws IOException, LogConfigurationException {
321 String[] names = {"org/codehaus/activemq/howl.properties", "org/codehaus/activemq/defaultHowl.properties"};
322
323 Configuration answer = null;
324 for (int i = 0; i < names.length; i++) {
325 InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(names[i]);
326 if (in == null) {
327 in = getClass().getClassLoader().getResourceAsStream(names[i]);
328 }
329 if (in != null) {
330 Properties properties = new Properties();
331 properties.load(in);
332 answer = new Configuration(properties);
333 }
334 }
335 if (answer == null) {
336 log.warn("Could not find file: " + names[0] + " or: " + names[1] + " on the classpath to initialise Howl");
337 answer = new Configuration();
338 }
339 if (directory != null) {
340 answer.setLogFileDir(directory.getAbsolutePath());
341 }
342 return answer;
343 }
344 }