View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * Copyright 2004 Protique Ltd
5    * 
6    * Licensed under the Apache License, Version 2.0 (the "License"); 
7    * you may not use this file except in compliance with the License. 
8    * You may obtain a copy of the License at 
9    * 
10   * http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS, 
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
15   * See the License for the specific language governing permissions and 
16   * limitations under the License. 
17   * 
18   **/
19  package org.codehaus.activemq.store.cache;
20  
21  import javax.jms.JMSException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.codehaus.activemq.message.ActiveMQMessage;
26  import org.codehaus.activemq.message.MessageAck;
27  import org.codehaus.activemq.service.MessageIdentity;
28  import org.codehaus.activemq.service.QueueMessageContainer;
29  import org.codehaus.activemq.store.MessageStore;
30  
31  /***
32   * A MessageStore that uses an in memory cache to speed up getMessage() method calls.
33   * 
34   * @version $Revision: 1.2 $
35   */
36  public class CacheMessageStore implements MessageStore, CacheMessageStoreAware {
37  
38  	private static final Log log = LogFactory.getLog(CacheMessageStore.class);
39  	
40  	private final CachePersistenceAdapter peristenceAdapter;
41  	private final MessageStore longTermStore;	
42  	private final MessageCache cache;
43  
44  	public CacheMessageStore(CachePersistenceAdapter adapter, MessageStore longTermStore, MessageCache cache) {
45  		this.peristenceAdapter = adapter;
46  		this.longTermStore = longTermStore;
47  		this.cache = cache;
48  		
49  		// Make any downstream CacheMessageStoreAware objects aware of us.
50  		setCacheMessageStore(this);
51  	}
52  
53  	/***
54  	 * Add the meessage to the long term store and cache it.
55  	 */
56  	public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {		
57  		MessageIdentity messageIdentity = longTermStore.addMessage(message);
58  		cache.put(messageIdentity.getMessageID(), message);		
59  		return messageIdentity;
60  	}
61  	
62  	/***
63  	 * Remove the meessage to the long term store and remove it from the cache.
64  	 */
65  	public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {		
66  		longTermStore.removeMessage( identity, ack);
67  		cache.remove(identity.getMessageID());		
68  	}
69  
70  	/***
71  	 * Return the message from the cache or go to the longTermStore if it is not 
72  	 * in there. 
73  	 */
74  	public ActiveMQMessage getMessage(MessageIdentity identity)	throws JMSException {
75  		ActiveMQMessage answer=null;
76  		answer = cache.get(identity.getMessageID());
77  		if( answer!=null )
78  			return answer;
79  		
80  		answer = longTermStore.getMessage(identity);
81  		cache.put(identity.getMessageID(), answer);
82  		return answer;
83  	}
84  
85  	/***
86  	 * Replays the checkpointStore first as those messages are the oldest ones,
87  	 * then messages are replayed from the transaction log and then the cache is
88  	 * updated.
89  	 * 
90  	 * @param container
91  	 * @throws JMSException
92  	 */
93  	public synchronized void recover(final QueueMessageContainer container)
94  			throws JMSException {
95  		longTermStore.recover(container);
96  	}
97  
98  	public void start() throws JMSException {
99  		longTermStore.start();
100 	}
101 
102 	public void stop() throws JMSException {
103 		longTermStore.stop();
104 	}
105 
106 	/***
107 	 * @return Returns the longTermStore.
108 	 */
109 	public MessageStore getLongTermStore() {
110 		return longTermStore;
111 	}
112 
113 	/***
114 	 * @see org.codehaus.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.codehaus.activemq.store.cache.CacheMessageStore)
115 	 */
116 	public void setCacheMessageStore(CacheMessageStore store) {
117 		// Make any downstream CacheMessageStoreAware objects aware of us.
118 		// This cache implementation could be cached by another cache.
119 		if( longTermStore instanceof CacheMessageStoreAware ) {
120 			((CacheMessageStoreAware)longTermStore).setCacheMessageStore(store);
121 		}
122 	}
123 	
124 }