1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.ActiveMQMessage;
23 import org.codehaus.activemq.message.MessageAck;
24 import org.codehaus.activemq.service.MessageIdentity;
25 import org.codehaus.activemq.service.QueueList;
26 import org.codehaus.activemq.service.QueueListEntry;
27 import org.codehaus.activemq.service.QueueMessageContainer;
28 import org.codehaus.activemq.store.MessageStore;
29 import org.codehaus.activemq.store.PersistenceAdapter;
30 import org.codehaus.activemq.util.Callback;
31 import org.codehaus.activemq.util.TransactionTemplate;
32
33 import javax.jms.JMSException;
34
35 /***
36 * A default implemenation of a Durable Queue based
37 * {@link org.codehaus.activemq.service.MessageContainer}
38 * which acts as an adapter between the {@link org.codehaus.activemq.service.MessageContainerManager}
39 * requirements and those of the persistent {@link MessageStore} implementations.
40 *
41 * @version $Revision: 1.19 $
42 */
43 public class DurableQueueMessageContainer implements QueueMessageContainer {
44 private static final Log log = LogFactory.getLog(DurableQueueMessageContainer.class);
45
46 private MessageStore messageStore;
47 private String destinationName;
48
49 /***
50 * messages to be delivered
51 */
52 private QueueList messagesToBeDelivered;
53 /***
54 * messages that have been delivered but not acknowledged
55 */
56 private QueueList deliveredMessages;
57 private PersistenceAdapter persistenceAdapter;
58 private TransactionTemplate transactionTemplate;
59
60 public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName) {
61 this(persistenceAdapter, messageStore, destinationName, new DefaultQueueList(), new DefaultQueueList());
62 }
63
64 public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName, QueueList messagesToBeDelivered, QueueList deliveredMessages) {
65 this.persistenceAdapter = persistenceAdapter;
66 this.messageStore = messageStore;
67 this.destinationName = destinationName;
68 this.messagesToBeDelivered = messagesToBeDelivered;
69 this.deliveredMessages = deliveredMessages;
70 this.transactionTemplate = new TransactionTemplate(persistenceAdapter);
71 }
72
73 public String getDestinationName() {
74 return destinationName;
75 }
76
77 public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
78 MessageIdentity answer = messageStore.addMessage(message);
79 synchronized( this ) {
80 messagesToBeDelivered.add(answer);
81 }
82 return answer;
83
84 }
85
86 public synchronized void delete(MessageIdentity messageID, MessageAck ack) throws JMSException {
87
88
89
90 MessageIdentity storedIdentity=null;
91
92 synchronized( this ) {
93 QueueListEntry entry = deliveredMessages.getFirstEntry();
94 while (entry != null) {
95 MessageIdentity identity = (MessageIdentity) entry.getElement();
96 if (messageID.equals(identity)) {
97 deliveredMessages.remove(entry);
98 storedIdentity=identity;
99 break;
100 }
101 entry = deliveredMessages.getNextEntry(entry);
102 }
103
104 if (storedIdentity==null) {
105
106
107 entry = messagesToBeDelivered.getFirstEntry();
108 while (entry != null) {
109 MessageIdentity identity = (MessageIdentity) entry.getElement();
110 if (messageID.equals(identity)) {
111 messagesToBeDelivered.remove(entry);
112 storedIdentity=identity;
113 break;
114 }
115 entry = messagesToBeDelivered.getNextEntry(entry);
116 }
117 }
118 }
119
120 if (storedIdentity==null) {
121 log.error("Attempt to acknowledge unknown messageID: " + messageID);
122 } else {
123 messageStore.removeMessage(storedIdentity, ack);
124 }
125
126 }
127
128 public ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException {
129 return messageStore.getMessage(messageID);
130 }
131
132
133 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
134 /*** TODO: make more optimal implementation */
135 return getMessage(messageIdentity) != null;
136 }
137
138 /***
139 * Does nothing since when we receive an acknowledgement on a queue
140 * we can delete the message
141 *
142 * @param messageIdentity
143 */
144 public void registerMessageInterest(MessageIdentity messageIdentity) {
145 }
146
147 /***
148 * Does nothing since when we receive an acknowledgement on a queue
149 * we can delete the message
150 *
151 * @param messageIdentity
152 * @param ack
153 */
154 public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck ack) {
155 }
156
157 public ActiveMQMessage poll() throws JMSException {
158 ActiveMQMessage message = null;
159 MessageIdentity messageIdentity=null;
160 synchronized( this ) {
161 messageIdentity = (MessageIdentity) messagesToBeDelivered.removeFirst();
162 if (messageIdentity != null) {
163 deliveredMessages.add(messageIdentity);
164 }
165 }
166 if (messageIdentity != null) {
167 message = messageStore.getMessage(messageIdentity);
168 }
169 return message;
170 }
171
172 public ActiveMQMessage peekNext(MessageIdentity messageID) throws JMSException {
173 ActiveMQMessage answer = null;
174 MessageIdentity identity = null;
175 synchronized( this ) {
176 if (messageID == null) {
177 identity = (MessageIdentity) messagesToBeDelivered.getFirst();
178 }
179 else {
180 int index = messagesToBeDelivered.indexOf(messageID);
181 if (index >= 0 && (index + 1) < messagesToBeDelivered.size()) {
182 identity = (MessageIdentity) messagesToBeDelivered.get(index + 1);
183 }
184 }
185
186 }
187 if (identity != null) {
188 answer = messageStore.getMessage(identity);
189 }
190 return answer;
191 }
192
193
194 public synchronized void returnMessage(MessageIdentity messageIdentity) throws JMSException {
195 boolean result = deliveredMessages.remove(messageIdentity);
196 messagesToBeDelivered.addFirst(messageIdentity);
197 }
198
199 /***
200 * called to reset dispatch pointers if a new Message Consumer joins
201 *
202 * @throws javax.jms.JMSException
203 */
204 public synchronized void reset() throws JMSException {
205
206 int count = 0;
207 MessageIdentity messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
208 while (messageIdentity != null) {
209 messagesToBeDelivered.add(count++, messageIdentity);
210 messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
211 }
212 }
213
214 public synchronized void start() throws JMSException {
215 final QueueMessageContainer container = this;
216 transactionTemplate.run(new Callback() {
217 public void execute() throws Throwable {
218 messageStore.start();
219 messageStore.recover(container);
220 }
221 });
222
223 }
224
225 public synchronized void recoverMessageToBeDelivered(MessageIdentity messageIdentity) throws JMSException {
226 messagesToBeDelivered.add(messageIdentity);
227 }
228
229 public void stop() throws JMSException {
230 messageStore.stop();
231 }
232 }