1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.usecases;
19  
20  import org.codehaus.activemq.TestSupport;
21  
22  import javax.jms.Connection;
23  import javax.jms.DeliveryMode;
24  import javax.jms.Destination;
25  import javax.jms.JMSException;
26  import javax.jms.Message;
27  import javax.jms.MessageConsumer;
28  import javax.jms.MessageProducer;
29  import javax.jms.Session;
30  import javax.jms.Topic;
31  
32  /***
33   * @version $Revision: 1.7 $
34   */
35  public class DurableConsumerCloseAndReconnectTest extends TestSupport {
36      protected static final long RECEIVE_TIMEOUT = 5000L;
37  
38      private Connection connection;
39      private Session session;
40      private MessageConsumer consumer;
41      private MessageProducer producer;
42      private Destination destination;
43  
44      public void testCreateDurableConsumerCloseThenReconnect() throws Exception {
45          // force the server to stay up across both connection tests
46          Connection dummyConnection = createConnection();
47  
48          consumeMessagesDeliveredWhileConsumerClosed();
49  
50          dummyConnection.close();
51  
52          // now lets try again without one connection open
53          consumeMessagesDeliveredWhileConsumerClosed();
54      }
55  
56      protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {
57          makeConsumer();
58          closeConsumer();
59  
60          publish();
61  
62          // wait a few moments for the close to really occur
63          Thread.sleep(1000);
64  
65          makeConsumer();
66  
67          Message message = consumer.receive(RECEIVE_TIMEOUT);
68          assertTrue("Should have received a message!", message != null);
69  
70          closeConsumer();
71  
72          System.out.println("Now lets create the consumer again and because we didn't ack, we should get it again");
73          makeConsumer();
74  
75          message = consumer.receive(RECEIVE_TIMEOUT);
76          assertTrue("Should have received a message!", message != null);
77          message.acknowledge();
78  
79          closeConsumer();
80  
81          System.out.println("Now lets create the consumer again and because we didn't ack, we should get it again");
82          makeConsumer();
83  
84          message = consumer.receive(2000);
85          assertTrue("Should have no more messages left!", message == null);
86  
87          closeConsumer();
88  
89          System.out.println("Lets publish one more message now");
90          publish();
91  
92          makeConsumer();
93          message = consumer.receive(RECEIVE_TIMEOUT);
94          assertTrue("Should have received a message!", message != null);
95          message.acknowledge();
96  
97          closeConsumer();
98      }
99  
100     protected void publish() throws Exception {
101         connection = createConnection();
102         connection.start();
103 
104         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
105         destination = createDestination();
106 
107         producer = session.createProducer(destination);
108         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
109 
110         producer.send(session.createTextMessage("This is a test"));
111 
112         producer.close();
113         producer = null;
114         closeSession();
115     }
116 
117     protected Destination createDestination() throws JMSException {
118         if (isTopic()) {
119             return session.createTopic(getSubject());
120         }
121         else {
122             return session.createQueue(getSubject());
123         }
124     }
125 
126     protected boolean isTopic() {
127         return true;
128     }
129 
130     protected void closeConsumer() throws JMSException {
131         consumer.close();
132         consumer = null;
133         closeSession();
134     }
135 
136     protected void closeSession() throws JMSException {
137         session.close();
138         session = null;
139         connection.close();
140         connection = null;
141     }
142 
143     protected void makeConsumer() throws Exception {
144         String durableName = getName();
145         String clientID = getSubject();
146         System.out.println("Creating a durable subscribe for clientID: " + clientID + " and durable name: " + durableName);
147         createSession(clientID);
148         consumer = createConsumer(durableName);
149     }
150 
151     private MessageConsumer createConsumer(String durableName) throws JMSException {
152         if (destination instanceof Topic) {
153             return session.createDurableSubscriber((Topic) destination, durableName);
154         }
155         else {
156             return session.createConsumer(destination);
157         }
158     }
159 
160     protected void createSession(String clientID) throws Exception {
161         connection = createConnection();
162         connection.setClientID(clientID);
163         connection.start();
164 
165         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
166         destination = createDestination();
167     }
168 }