1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service;
19  
20  import com.sleepycat.je.DatabaseEntry;
21  import junit.framework.TestCase;
22  import org.codehaus.activemq.broker.BrokerClient;
23  import org.codehaus.activemq.broker.impl.BrokerClientImpl;
24  import org.codehaus.activemq.filter.DestinationFilter;
25  import org.codehaus.activemq.filter.FilterFactoryImpl;
26  import org.codehaus.activemq.message.ActiveMQDestination;
27  import org.codehaus.activemq.message.ActiveMQMessage;
28  import org.codehaus.activemq.message.ActiveMQTextMessage;
29  import org.codehaus.activemq.message.ConsumerInfo;
30  import org.codehaus.activemq.service.impl.DispatcherImpl;
31  import org.codehaus.activemq.service.impl.DurableQueueMessageContainerManager;
32  import org.codehaus.activemq.service.impl.DurableTopicMessageContainerManager;
33  import org.codehaus.activemq.service.impl.DurableTopicSubscription;
34  import org.codehaus.activemq.service.impl.DurableTopicSubscriptionContainerImpl;
35  import org.codehaus.activemq.store.PersistenceAdapter;
36  import org.codehaus.activemq.util.Callback;
37  import org.codehaus.activemq.util.IdGenerator;
38  import org.codehaus.activemq.util.TransactionTemplate;
39  
40  import javax.jms.JMSException;
41  import javax.jms.TextMessage;
42  import java.io.IOException;
43  
44  /***
45   * @version $Revision: 1.13 $
46   */
47  public abstract class MessageStoreTestSupport extends TestCase {
48      protected PersistenceAdapter persistenceAapter;
49      protected MessageContainer container;
50      protected Subscription subscription;
51      protected int publishMessageCount = 10;
52      protected int ackCount = 5;
53      protected ActiveMQMessage[] messages;
54      protected ActiveMQDestination destination;
55      protected IdGenerator idGenerator = new IdGenerator();
56      protected MessageContainerManager messageContainerManager;
57      protected BrokerClient client = new BrokerClientImpl();
58      protected TransactionTemplate template;
59  
60      public void testRecovery() throws Exception {
61          System.out.println("Publishing: " + publishMessageCount + " messages");
62  
63          for (int i = 0; i < publishMessageCount; i++) {
64              doAddMessage(i);
65          }
66  
67          dumpMessageIdentities("After add");
68  
69          assertDeliveryList(0, publishMessageCount);
70  
71          // now lets ack messages
72          System.out.println("Acknowledging the first: " + ackCount + " messages");
73          for (int i = 0; i < ackCount; i++) {
74              doAcknowledgeMessage(i);
75          }
76  
77          // no more left to dispatch
78          assertDeliveryList(0, 0);
79  
80          dumpMessageIdentities("After ack of first part");
81  
82          // now lets shut things down and then recover
83          closeAndReopenContainer();
84  
85          assertDeliveryList(ackCount, publishMessageCount);
86  
87          dumpMessageIdentities("About to perform final ack");
88  
89          for (int i = ackCount; i < publishMessageCount; i++) {
90              doAcknowledgeMessage(i);
91          }
92      }
93  
94      public void testRecoveryOfNewConsumerWhichHasYetToAck() throws Exception {
95  
96          assertDeliveryList(0, publishMessageCount);
97  
98          // no more left to dispatch
99          assertDeliveryList(0, 0);
100 
101         // now lets shut things down and then recover
102         closeAndReopenContainer();
103 
104         assertDeliveryList(0, publishMessageCount);
105     }
106 
107     protected abstract void acknowledgeMessage(int i) throws JMSException;
108 
109     protected abstract PersistenceAdapter createPersistenceAdapter() throws IOException, Exception;
110 
111     protected abstract ActiveMQDestination createDestination();
112 
113     protected abstract ActiveMQMessage[] getMessagesToDispatch() throws JMSException;
114 
115 
116     protected void doAcknowledgeMessage(final int i) throws JMSException {
117         template.run(new Callback() {
118             public void execute() throws Throwable {
119                 acknowledgeMessage(i);
120             }
121         });
122     }
123 
124     protected void doAddMessage(int i) throws JMSException {
125         final ActiveMQMessage message = getMessage(i);
126         template.run(new Callback() {
127             public void execute() throws Throwable {
128                 container.addMessage(message);
129             }
130         });
131     }
132 
133     protected void dumpMessageIdentities(String text) throws JMSException {
134         System.out.println("#### Dumping identities at: " + text);
135         for (int i = 0; i < publishMessageCount; i++) {
136             ActiveMQMessage message = getMessage(i);
137             MessageIdentity identity = message.getJMSMessageIdentity();
138             Object sequenceNo = identity.getSequenceNumber();
139             String sequenceText = null;
140             if (sequenceNo != null) {
141                 sequenceText = sequenceNo.toString();
142                 if (sequenceNo instanceof DatabaseEntry) {
143                     DatabaseEntry entry = (DatabaseEntry) sequenceNo;
144                     byte[] data = entry.getData();
145                     sequenceText = asText(data);
146                 }
147             }
148             System.out.println("item: " + i + " is: " + sequenceText);
149         }
150         System.out.println();
151     }
152 
153     protected String asText(byte[] data) {
154         StringBuffer buffer = new StringBuffer("[ ");
155         for (int i = 0; i < data.length; i++) {
156             if (i > 0) {
157                 buffer.append(", ");
158             }
159             buffer.append(Byte.toString(data[i]));
160         }
161         buffer.append(" ]");
162         return buffer.toString();
163     }
164 
165 
166     protected MessageContainer createTopicMessageContainer() throws JMSException {
167         if (destination.isTopic()) {
168             return persistenceAapter.createTopicMessageContainer(destination.toString());
169         }
170         else {
171             return persistenceAapter.createQueueMessageContainer(destination.toString());
172         }
173     }
174 
175     protected Subscription createSubscription() throws JMSException {
176         DestinationFilter filter = DestinationFilter.parseFilter(destination);
177         ConsumerInfo consumerInfo = createConsumerInfo();
178 
179         // lets register the subscription with the manager
180         messageContainerManager.addMessageConsumer(client, consumerInfo);
181 
182         return new DurableTopicSubscription(new DispatcherImpl(), client, consumerInfo, filter, new RedeliveryPolicy());
183     }
184 
185     protected ConsumerInfo createConsumerInfo() {
186         ConsumerInfo answer = new ConsumerInfo();
187         answer.setClientId(getClientID());
188         answer.setConsumerId(idGenerator.generateId());
189         answer.setConsumerName(getConsumerName());
190         answer.setDestination(destination);
191         answer.setPrefetchNumber(100);
192         answer.setSessionId(idGenerator.generateId());
193         answer.setStarted(true);
194         return answer;
195     }
196 
197     protected String getConsumerName() {
198         return getName();
199     }
200 
201     protected String getClientID() {
202         return getClass().getName();
203     }
204 
205     protected void setUp() throws Exception {
206         super.setUp();
207         this.messages = new ActiveMQMessage[publishMessageCount];
208         this.destination = createDestination();
209 
210         this.persistenceAapter = createPersistenceAdapter();
211         persistenceAapter.start();
212 
213         template = new TransactionTemplate(persistenceAapter);
214 
215         this.messageContainerManager = createMessageContainerManager();
216 
217         this.container = messageContainerManager.getContainer(this.destination.getPhysicalName());
218         assertTrue("Should have created a container", container != null);
219 
220         this.subscription = createSubscription();
221 
222     }
223 
224     protected void tearDown() throws Exception {
225         messageContainerManager.stop();
226         persistenceAapter.stop();
227         super.tearDown();
228     }
229 
230     protected MessageContainerManager createMessageContainerManager() {
231         if (destination.isTopic()) {
232             return new DurableTopicMessageContainerManager(persistenceAapter, new DurableTopicSubscriptionContainerImpl(new RedeliveryPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
233         }
234         else {
235             return new DurableQueueMessageContainerManager(persistenceAapter, new DurableTopicSubscriptionContainerImpl(new RedeliveryPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
236         }
237     }
238 
239     protected void assertDeliveryList(final int startIndex, final int lastIndex) throws JMSException {
240         template.run(new Callback() {
241             public void execute() throws Throwable {
242                 ActiveMQMessage[] messagesToDispatch = getMessagesToDispatch();
243                 int count = lastIndex - startIndex;
244                 assertTrue("Not enough messages available to dispatch. Expected: " + count
245                         + " messages but was: " + messagesToDispatch.length, messagesToDispatch.length >= count);
246 
247                 for (int i = 0; i < count; i++) {
248                     ActiveMQMessage expected = getMessage(i + startIndex);
249                     ActiveMQMessage actual = messagesToDispatch[i];
250                     assertMessagesEqual("Dispatched message at index: " + i, expected, actual);
251                 }
252             }
253         });
254     }
255 
256     protected void assertMessagesEqual(String description, ActiveMQMessage expected, ActiveMQMessage actual) throws JMSException {
257         assertEquals("MessageText compare. " + description, ((TextMessage) expected).getText(), ((TextMessage) actual).getText());
258         assertEquals("MessageID compare. " + description + " expected: " + expected + " actual: " + actual, expected.getJMSMessageID(), actual.getJMSMessageID());
259         assertEquals(description, expected, actual);
260     }
261 
262     protected ActiveMQMessage getMessage(int i) throws JMSException {
263         if (messages[i] == null) {
264             messages[i] = createMessage(i);
265         }
266         return messages[i];
267     }
268 
269     protected ActiveMQMessage createMessage(int i) throws JMSException {
270         ActiveMQTextMessage answer = new ActiveMQTextMessage();
271         answer.setJMSMessageID(idGenerator.generateId());
272         answer.setJMSClientID(getClientID());
273         answer.setJMSDestination(destination);
274         answer.setText("message index: " + i);
275         return answer;
276     }
277 
278     protected void closeAndReopenContainer() throws Exception {
279         subscription.clear();
280 
281         messageContainerManager.stop();
282         persistenceAapter.stop();
283 
284         persistenceAapter = createPersistenceAdapter();
285         persistenceAapter.start();
286 
287         template = new TransactionTemplate(persistenceAapter);
288 
289         this.messageContainerManager = createMessageContainerManager();
290 
291         container = messageContainerManager.getContainer(destination.getPhysicalName());
292 
293         this.subscription = createSubscription();
294 
295         template.run(new Callback() {
296             public void execute() throws Throwable {
297                 recover();
298             }
299         });
300     }
301 
302     protected void recover() throws JMSException {
303     }
304 
305     protected String getSubject() {
306         return getClass().getName() + "." + getName();
307     }
308 }