1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 * Copyright 2004 Protique Ltd
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.codehaus.activemq.store.journal;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.Iterator;
24
25 import javax.jms.JMSException;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.codehaus.activemq.journal.RecordLocation;
30 import org.codehaus.activemq.message.ActiveMQMessage;
31 import org.codehaus.activemq.message.MessageAck;
32 import org.codehaus.activemq.service.MessageIdentity;
33 import org.codehaus.activemq.service.QueueMessageContainer;
34 import org.codehaus.activemq.store.MessageStore;
35 import org.codehaus.activemq.store.cache.CacheMessageStore;
36 import org.codehaus.activemq.store.cache.CacheMessageStoreAware;
37 import org.codehaus.activemq.util.Callback;
38 import org.codehaus.activemq.util.TransactionTemplate;
39
40 /***
41 * A MessageStore that uses a Journal to store it's messages.
42 *
43 * @version $Revision: 1.7 $
44 */
45 public class JournalMessageStore implements MessageStore, CacheMessageStoreAware {
46
47 private static final Log log = LogFactory.getLog(JournalMessageStore.class);
48
49 private final static class AckData {
50 private final RecordLocation location;
51 private final MessageAck ack;
52 AckData(MessageAck ack, RecordLocation location) {
53 this.ack = ack;
54 this.location = location;
55 }
56 }
57
58 private final JournalPersistenceAdapter peristenceAdapter;
59 private final MessageStore longTermStore;
60 private final String destinationName;
61 private final TransactionTemplate transactionTemplate;
62
63 private HashMap addedMessageLocations = new HashMap();
64 private ArrayList removedMessageLocations = new ArrayList();
65
66 /*** A MessageStore that we can use to retreive messages quickly. */
67 private MessageStore cacheMessageStore = this;
68
69 private boolean sync = true;
70
71 public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, String destinationName, boolean sync) {
72 this.peristenceAdapter = adapter;
73 this.longTermStore = checkpointStore;
74 this.destinationName = destinationName;
75 this.sync=sync;
76 this.transactionTemplate = new TransactionTemplate(adapter);
77 }
78
79 /***
80 * Not synchronized since the Journal has better throughput if you increase
81 * the number of conncurrent writes that it is doing.
82 */
83 public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
84 boolean sync = message.isReceiptRequired();
85 RecordLocation location = peristenceAdapter.writePacket(destinationName, message, sync);
86 synchronized(this) {
87 addedMessageLocations.put(message.getJMSMessageIdentity(), location);
88 }
89
90
91
92 MessageIdentity messageIdentity = message.getJMSMessageIdentity();
93 messageIdentity.setSequenceNumber(location);
94 return messageIdentity;
95 }
96
97 /***
98 */
99 public void removeMessage(MessageIdentity identity, MessageAck ack)
100 throws JMSException {
101
102 RecordLocation ackLocation = peristenceAdapter.writePacket(destinationName, ack, sync);
103
104 synchronized(this) {
105 RecordLocation addLocation = (RecordLocation) addedMessageLocations.remove(identity);
106 if( addLocation==null ) {
107 removedMessageLocations.add(new AckData(ack, ackLocation));
108 }
109 }
110 }
111
112 /***
113 * @return
114 * @throws JMSException
115 */
116 public RecordLocation checkpoint() throws JMSException {
117 final RecordLocation rc[] = new RecordLocation[]{null};
118
119
120 final ArrayList addedMessageIdentitys;
121 final ArrayList removedMessageLocations;
122 synchronized(this) {
123 addedMessageIdentitys = new ArrayList(this.addedMessageLocations.keySet());
124 removedMessageLocations = this.removedMessageLocations;
125 this.removedMessageLocations = new ArrayList();
126 }
127
128 transactionTemplate.run(new Callback() {
129 public void execute() throws Throwable {
130
131
132 Iterator iterator = addedMessageIdentitys.iterator();
133 while (iterator.hasNext()) {
134 MessageIdentity identity = (MessageIdentity) iterator.next();
135
136 ActiveMQMessage msg = getCacheMessage(identity);
137 longTermStore.addMessage(msg);
138 synchronized(this) {
139 RecordLocation location = (RecordLocation)addedMessageLocations.remove(identity);
140 if( rc[0]==null || rc[0].compareTo(location)<0 ) {
141 rc[0] = location;
142 }
143 }
144
145 }
146
147
148 iterator = removedMessageLocations.iterator();
149 while (iterator.hasNext()) {
150 AckData data = (AckData)iterator.next();
151 longTermStore.removeMessage(data.ack.getMessageIdentity(),data.ack);
152
153 if( rc[0]==null || rc[0].compareTo(data.location)<0 ) {
154 rc[0] = data.location;
155 }
156 }
157
158 }
159
160 });
161
162 return rc[0];
163 }
164
165 private ActiveMQMessage getCacheMessage(MessageIdentity identity) throws JMSException {
166 return cacheMessageStore.getMessage(identity);
167 }
168
169 /***
170 *
171 */
172 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
173 ActiveMQMessage answer=null;
174
175 Object location = identity.getSequenceNumber();
176 if( location==null ) {
177
178 synchronized(this) {
179 location = addedMessageLocations.get(identity);
180 }
181 }
182
183 if(location!=null && location instanceof RecordLocation) {
184 answer = (ActiveMQMessage) peristenceAdapter.readPacket((RecordLocation)location);
185 if( answer !=null )
186 return answer;
187 }
188
189
190 return longTermStore.getMessage(identity);
191 }
192
193 /***
194 * Replays the checkpointStore first as those messages are the oldest ones,
195 * then messages are replayed from the transaction log and then the cache is
196 * updated.
197 *
198 * @param container
199 * @throws JMSException
200 */
201 public synchronized void recover(final QueueMessageContainer container)
202 throws JMSException {
203 longTermStore.recover(container);
204 }
205
206 public void start() throws JMSException {
207 longTermStore.start();
208 }
209
210 public void stop() throws JMSException {
211 longTermStore.stop();
212 }
213
214 /***
215 * @return Returns the longTermStore.
216 */
217 public MessageStore getLongTermStore() {
218 return longTermStore;
219 }
220
221 /***
222 * @see org.codehaus.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.codehaus.activemq.store.cache.CacheMessageStore)
223 */
224 public void setCacheMessageStore(CacheMessageStore store) {
225 cacheMessageStore = store;
226
227 if( longTermStore instanceof CacheMessageStoreAware ) {
228 ((CacheMessageStoreAware)longTermStore).setCacheMessageStore(store);
229 }
230 }
231 }