1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.usecases;
19  
20  import java.util.HashMap;
21  import javax.jms.Connection;
22  import javax.jms.DeliveryMode;
23  import javax.jms.Destination;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageProducer;
27  import javax.jms.ObjectMessage;
28  import javax.jms.Queue;
29  import javax.jms.QueueConnection;
30  import javax.jms.QueueReceiver;
31  import javax.jms.QueueSender;
32  import javax.jms.QueueSession;
33  import javax.jms.Session;
34  import javax.jms.TextMessage;
35  import javax.jms.Topic;
36  import org.codehaus.activemq.TestSupport;
37  import org.codehaus.activemq.util.IdGenerator;
38  
39  /***
40   * @version $Revision: 1.3 $
41   */
42  public class TopicRedeliverTest extends TestSupport {
43      
44      private static final int RECEIVE_TIMEOUT = 10000;
45      private IdGenerator idGen = new IdGenerator();
46      protected int deliveryMode = DeliveryMode.PERSISTENT;
47      public TopicRedeliverTest(){       
48      }
49      
50      public TopicRedeliverTest(String n){
51          super(n);
52      }
53      
54      protected void setup() throws Exception{
55          super.setUp();
56          topic = true;
57      }
58      
59      
60      /***
61       * test messages are acknowledged and recovered properly
62       * @throws Exception
63       */
64      public void testClientAcknowledge() throws Exception {
65          Destination destination = createDestination(getClass().getName());
66          Connection connection = createConnection();
67          connection.setClientID(idGen.generateId());
68          connection.start();
69          Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
70          MessageConsumer consumer = consumerSession.createConsumer(destination);
71          Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
72          MessageProducer producer = producerSession.createProducer(destination);
73          producer.setDeliveryMode(deliveryMode);
74          
75          //send some messages
76          
77          TextMessage sent1 = producerSession.createTextMessage();
78          sent1.setText("msg1");
79          producer.send(sent1);
80          
81          TextMessage sent2 = producerSession.createTextMessage();
82          sent1.setText("msg2");
83          producer.send(sent2);
84          
85          TextMessage sent3 = producerSession.createTextMessage();
86          sent1.setText("msg3");
87          producer.send(sent3);
88          
89          Message rec1 = consumer.receive(RECEIVE_TIMEOUT);
90          Message rec2 = consumer.receive(RECEIVE_TIMEOUT);
91          Message rec3 = consumer.receive(RECEIVE_TIMEOUT);
92          
93          //ack rec2
94          rec2.acknowledge();
95          
96          TextMessage sent4 = producerSession.createTextMessage();
97          sent4.setText("msg4");
98          producer.send(sent4);
99          
100         Message rec4 = consumer.receive(RECEIVE_TIMEOUT);
101         assertTrue(rec4.equals(sent4));
102         consumerSession.recover();
103         rec4 = consumer.receive(RECEIVE_TIMEOUT);
104         assertTrue(rec4.equals(sent4));
105         assertTrue(rec4.getJMSRedelivered());
106         rec4.acknowledge();
107         connection.close();
108         
109     }
110     
111     /***
112      * Test redelivered flag is set on rollbacked transactions
113      * @throws Exception
114      */
115     public void testRedilveredFlagSetOnRollback() throws Exception {
116         Destination destination = createDestination(getClass().getName());
117         Connection connection = createConnection();
118         connection.setClientID(idGen.generateId());
119         connection.start();
120         Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
121         MessageConsumer consumer = null;
122         if (topic){
123             consumer = consumerSession.createDurableSubscriber((Topic)destination, "TESTRED");
124         }else{
125         consumer = consumerSession.createConsumer(destination);
126         }
127         Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
128         MessageProducer producer = producerSession.createProducer(destination);
129         producer.setDeliveryMode(deliveryMode);
130         
131         TextMessage sentMsg = producerSession.createTextMessage();
132         sentMsg.setText("msg1");
133         producer.send(sentMsg);
134         producerSession.commit();
135         
136         Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
137         assertTrue(recMsg.getJMSRedelivered() == false);
138         recMsg = consumer.receive(RECEIVE_TIMEOUT);
139         consumerSession.rollback();
140         recMsg = consumer.receive(RECEIVE_TIMEOUT);
141         assertTrue(recMsg.getJMSRedelivered());
142         consumerSession.commit();
143         assertTrue(recMsg.equals(sentMsg));
144         assertTrue(recMsg.getJMSRedelivered());
145         connection.close();
146     }
147 
148 
149     /***
150      * Check a session is rollbacked on a Session close();
151      * @throws Exception
152      */
153 
154     public void XtestTransactionRollbackOnSessionClose() throws Exception {
155         Destination destination = createDestination(getClass().getName());
156         Connection connection = createConnection();
157         connection.setClientID(idGen.generateId());
158         connection.start();
159         Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
160         MessageConsumer consumer = null;
161         if (topic){
162             consumer = consumerSession.createDurableSubscriber((Topic)destination, "TESTRED");
163         }else{
164         consumer = consumerSession.createConsumer(destination);
165         }
166         Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
167         MessageProducer producer = producerSession.createProducer(destination);
168         producer.setDeliveryMode(deliveryMode);
169         
170         TextMessage sentMsg = producerSession.createTextMessage();
171         sentMsg.setText("msg1");
172         producer.send(sentMsg);
173       
174         producerSession.commit();
175         
176         Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
177         assertTrue(recMsg.getJMSRedelivered() == false);
178         consumerSession.close();
179         consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
180         consumer = consumerSession.createConsumer(destination);
181         
182         recMsg = consumer.receive(RECEIVE_TIMEOUT);
183         consumerSession.commit();
184         assertTrue(recMsg.equals(sentMsg));
185         connection.close();
186     }
187 
188     /***
189      * check messages are actuallly sent on a tx rollback
190      * @throws Exception
191      */
192 
193     public void testTransactionRollbackOnSend() throws Exception {
194         Destination destination = createDestination(getClass().getName());
195         Connection connection = createConnection();
196         connection.setClientID(idGen.generateId());
197         connection.start();
198         Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
199         MessageConsumer consumer = consumerSession.createConsumer(destination);
200         Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
201         MessageProducer producer = producerSession.createProducer(destination);
202         producer.setDeliveryMode(deliveryMode);
203         
204         TextMessage sentMsg = producerSession.createTextMessage();
205         sentMsg.setText("msg1");
206         producer.send(sentMsg);
207         producerSession.commit();
208         
209         Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
210         consumerSession.commit();
211         assertTrue(recMsg.equals(sentMsg));
212         
213         sentMsg = producerSession.createTextMessage();
214         sentMsg.setText("msg2");
215         producer.send(sentMsg);
216         producerSession.rollback();
217         
218         sentMsg = producerSession.createTextMessage();
219         sentMsg.setText("msg3");
220         producer.send(sentMsg);
221         producerSession.commit();
222         
223         recMsg = consumer.receive(RECEIVE_TIMEOUT);
224         assertTrue(recMsg.equals(sentMsg));
225         consumerSession.commit();
226         
227         connection.close();
228     }
229    
230 }