View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.howl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.ActiveMQMessage;
23  import org.codehaus.activemq.message.MessageAck;
24  import org.codehaus.activemq.message.Packet;
25  import org.codehaus.activemq.message.WireFormat;
26  import org.codehaus.activemq.service.MessageIdentity;
27  import org.codehaus.activemq.service.QueueMessageContainer;
28  import org.codehaus.activemq.store.MessageStore;
29  import org.codehaus.activemq.util.Callback;
30  import org.codehaus.activemq.util.JMSExceptionHelper;
31  import org.codehaus.activemq.util.TransactionTemplate;
32  import org.objectweb.howl.log.LogConfigurationException;
33  import org.objectweb.howl.log.LogException;
34  import org.objectweb.howl.log.LogRecord;
35  import org.objectweb.howl.log.Logger;
36  import org.objectweb.howl.log.ReplayListener;
37  
38  import javax.jms.JMSException;
39  import java.io.IOException;
40  import java.util.LinkedHashMap;
41  import java.util.Map;
42  
43  /***
44   * An implementation of {@link MessageStore} designed for
45   * optimal use with <a href="http://howl.objectweb.org/">Howl</a>
46   * as the transaction log and then checkpointing asynchronously
47   * on a timeout with some other persistent storage.
48   *
49   * @version $Revision: 1.3 $
50   */
51  public class HowlMessageStore implements MessageStore {
52  
53      private static final int DEFAULT_RECORD_SIZE = 64 * 1024;
54      private static final Log log = LogFactory.getLog(HowlMessageStore.class);
55  
56      private HowlPersistenceAdapter longTermPersistence;
57      private MessageStore longTermStore;
58      private Logger transactionLog;
59      private WireFormat wireFormat;
60      private TransactionTemplate transactionTemplate;
61      private int maximumCacheSize = 100;
62      private Map map = new LinkedHashMap();
63      private boolean sync = true;
64      private long lastLogMark;
65      private Exception firstException;
66  
67      public HowlMessageStore(HowlPersistenceAdapter adapter, MessageStore checkpointStore, Logger transactionLog, WireFormat wireFormat) {
68          this.longTermPersistence = adapter;
69          this.longTermStore = checkpointStore;
70          this.transactionLog = transactionLog;
71          this.wireFormat = wireFormat;
72          this.transactionTemplate = new TransactionTemplate(adapter);
73      }
74  
75  
76      /***
77       * This method is synchronized to ensure that only 1 thread can write to the log and cache
78       * and possibly checkpoint at once, to preserve order across the transaction log,
79       * cache and checkpointStore.
80       */
81      public synchronized MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
82          // write to howl
83          writePacket(message);
84  
85          // can we add it to the cache?
86          if (!addMessageToCache(message)) {
87              log.warn("Not enough RAM to store the active transaction log and so we're having to force" +
88                      "a checkpoint so that we can ensure that reads are efficient and do not have to " +
89                      "replay the transaction log");
90              checkpoint(message);
91  
92              // now lets add the current message to the checkpoint store
93              longTermStore.addMessage(message);
94          }
95          return message.getJMSMessageIdentity();
96      }
97  
98      /***
99       * Lets ensure that readers don't block writers so there only synchronization on
100      * the cache and checkpointStore.
101      */
102     public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
103         ActiveMQMessage answer = null;
104         synchronized (map) {
105             answer = (ActiveMQMessage) map.get(identity.getMessageID());
106         }
107         if (answer == null) {
108             answer = longTermStore.getMessage(identity);
109         }
110         return answer;
111     }
112 
113     /***
114      * Removes can be done in any order so we only synchronize on the cache and
115      * checkpointStore
116      */
117     public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
118         // write to howl
119         writePacket(ack);
120 
121         synchronized (map) {
122             map.remove(identity.getMessageID());
123         }
124         longTermPersistence.onMessageRemove(this);
125     }
126 
127     /***
128      * Replays the checkpointStore first as those messages are the oldest ones,
129      * then messages are replayed from the transaction log and then
130      * the cache is updated.
131      *
132      * @param container
133      * @throws JMSException
134      */
135     public synchronized void recover(final QueueMessageContainer container) throws JMSException {
136         longTermStore.recover(container);
137 
138         // replay the transaction log, updating the cache and adding any messages to be dispatched
139         // to the container
140         firstException = null;
141         try {
142             transactionLog.replay(new ReplayListener() {
143                 LogRecord record = new LogRecord(DEFAULT_RECORD_SIZE);
144 
145                 public void onRecord(LogRecord logRecord) {
146                     readPacket(logRecord, container);
147                 }
148 
149                 public void onError(LogException e) {
150                     log.error("Error while recovering Howl transaction log: " + e, e);
151                 }
152 
153                 public LogRecord getLogRecord() {
154                     return record;
155                 }
156             });
157         }
158         catch (LogConfigurationException e) {
159             throw createRecoveryFailedException(e);
160         }
161         if (firstException != null) {
162             if (firstException instanceof JMSException) {
163                 throw (JMSException) firstException;
164             }
165             else {
166                 throw createRecoveryFailedException(firstException);
167             }
168         }
169     }
170 
171     public synchronized void start() throws JMSException {
172         longTermStore.start();
173     }
174 
175     public synchronized void stop() throws JMSException {
176         longTermStore.stop();
177     }
178 
179     /***
180      * Writes the current RAM cache to the long term, checkpoint store so that the
181      * transaction log can be truncated.
182      */
183     public synchronized void checkpoint() throws JMSException {
184         checkpoint(null);
185     }
186 
187 
188     // Properties
189     //-------------------------------------------------------------------------
190     public int getMaximumCacheSize() {
191         return maximumCacheSize;
192     }
193 
194     public void setMaximumCacheSize(int maximumCacheSize) {
195         this.maximumCacheSize = maximumCacheSize;
196     }
197 
198     // Implementation methods
199     //-------------------------------------------------------------------------
200 
201     /***
202      * Writes the current RAM image of the transaction log to stable, checkpoint store
203      *
204      * @param message is an optional message. This is null for timer based
205      *                checkpoints or is the message which cannot fit into the cache if cache-exhaustion
206      *                based checkpoints
207      * @throws JMSException
208      */
209     protected void checkpoint(final ActiveMQMessage message) throws JMSException {
210         // lets create a copy of the collection to avoid blocking readers
211         ActiveMQMessage[] temp = null;
212         synchronized (map) {
213             temp = new ActiveMQMessage[map.size()];
214             map.values().toArray(temp);
215 
216             // lets clear the map so that its next contents represent
217             // the stuff we need to checkpoint next time around
218             map.clear();
219         }
220 
221         final ActiveMQMessage[] data = temp;
222         transactionTemplate.run(new Callback() {
223             public void execute() throws Throwable {
224                 for (int i = 0, size = data.length; i < size; i++) {
225                     longTermStore.addMessage(data[i]);
226                 }
227                 if (message != null) {
228                     longTermStore.addMessage(message);
229                 }
230             }
231         });
232 
233         try {
234             transactionLog.mark(lastLogMark);
235         }
236         catch (Exception e) {
237             throw JMSExceptionHelper.newJMSException("Failed to checkpoint the Howl transaction log: " + e, e);
238         }
239     }
240 
241     /***
242      * Adds the given message to the cache if there is spare capacity
243      *
244      * @param message
245      * @return true if the message was added to the cache or false
246      */
247     protected boolean addMessageToCache(ActiveMQMessage message) {
248         synchronized (map) {
249             if (map.size() < maximumCacheSize && longTermPersistence.hasCacheCapacity(this)) {
250                 map.put(message.getJMSMessageID(), message);
251                 return true;
252             }
253         }
254         return false;
255     }
256 
257     protected void readPacket(LogRecord logRecord, QueueMessageContainer container) {
258         if (!logRecord.isCTRL() && !logRecord.isEOB() && logRecord.length > 0) {
259             try {
260                 // TODO for some wierd reason we get an unnecessary long which I'm guessing is the size
261                 Packet packet = wireFormat.fromBytes(logRecord.data, 2, logRecord.length - 2);
262                 if (packet instanceof ActiveMQMessage) {
263                     container.addMessage((ActiveMQMessage) packet);
264                 }
265                 else if (packet instanceof MessageAck) {
266                     MessageAck ack = (MessageAck) packet;
267                     container.delete(ack.getMessageIdentity(), ack);
268                 }
269                 else {
270                     log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
271                 }
272             }
273             catch (Exception e) {
274                 if (firstException == null) {
275                     firstException = e;
276                 }
277             }
278         }
279     }
280 
281     /***
282      * Writes a message to the transaction log using the current sync mode
283      */
284     protected synchronized void writePacket(Packet packet) throws JMSException {
285         try {
286             byte[] data = wireFormat.toBytes(packet);
287             lastLogMark = transactionLog.put(data, sync);
288         }
289         catch (IOException e) {
290             throw createWriteException(packet, e);
291         }
292         catch (LogException e) {
293             throw createWriteException(packet, e);
294         }
295         catch (InterruptedException e) {
296             throw createWriteException(packet, e);
297         }
298     }
299 
300 
301     protected JMSException createRecoveryFailedException(Exception e) {
302         return JMSExceptionHelper.newJMSException("Failed to recover from Howl transaction log. Reason: " + e, e);
303     }
304 
305     protected JMSException createWriteException(Packet packet, Exception e) {
306         return JMSExceptionHelper.newJMSException("Failed to write to Howl transaction log for: " + packet + ". Reason: " + e, e);
307     }
308 }