1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.usecases;
20  import java.util.HashMap;
21  import javax.jms.Connection;
22  import javax.jms.DeliveryMode;
23  import javax.jms.Destination;
24  import javax.jms.JMSException;
25  import javax.jms.Message;
26  import javax.jms.MessageConsumer;
27  import javax.jms.MessageProducer;
28  import javax.jms.ObjectMessage;
29  import javax.jms.Queue;
30  import javax.jms.QueueConnection;
31  import javax.jms.QueueReceiver;
32  import javax.jms.QueueSender;
33  import javax.jms.QueueSession;
34  import javax.jms.Session;
35  import javax.jms.TextMessage;
36  import javax.jms.Topic;
37  import org.codehaus.activemq.ActiveMQConnection;
38  import org.codehaus.activemq.ActiveMQConnectionFactory;
39  import org.codehaus.activemq.TestSupport;
40  import org.codehaus.activemq.broker.BrokerContainer;
41  import org.codehaus.activemq.broker.impl.BrokerContainerImpl;
42  import org.codehaus.activemq.util.IdGenerator;
43  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
44  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
45  
46  /***
47   * @version $Revision: 1.4 $
48   */
49  public class ReliableReconnectTest extends TestSupport {
50      private static final int RECEIVE_TIMEOUT = 10000;
51      protected static final int MESSAGE_COUNT = 100;
52      private IdGenerator idGen = new IdGenerator();
53      protected int deliveryMode = DeliveryMode.PERSISTENT;
54      protected String consumerClientId;
55      protected Destination destination;
56      protected SynchronizedBoolean closeBroker = new SynchronizedBoolean(false);
57      protected SynchronizedInt messagesReceived = new SynchronizedInt(0);
58      protected BrokerContainer brokerContainer;
59      protected int firstBatch = MESSAGE_COUNT/10;
60  
61      public ReliableReconnectTest() {
62      }
63  
64      public ReliableReconnectTest(String n) {
65          super(n);
66      }
67  
68      protected void setUp() throws Exception {
69          consumerClientId = idGen.generateId();
70          super.setUp();
71          topic = true;
72          destination = createDestination(getClass().getName());
73      }
74  
75      public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
76          String url = "reliable:" + ActiveMQConnection.DEFAULT_URL;
77          return new ActiveMQConnectionFactory(url);
78      }
79  
80      protected void startBroker() throws JMSException {
81          brokerContainer = new BrokerContainerImpl();
82          String url = ActiveMQConnection.DEFAULT_URL;
83          brokerContainer.addConnector(url);
84          brokerContainer.start();
85      }
86  
87      protected Connection createConsumerConnection() throws Exception {
88          Connection consumerConnection = getConnectionFactory().createConnection();
89          consumerConnection.setClientID(consumerClientId);
90          consumerConnection.start();
91          return consumerConnection;
92      }
93  
94      protected MessageConsumer createConsumer(Connection con) throws Exception {
95          Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
96          return s.createDurableSubscriber((Topic) destination, "TestFred");
97      }
98  
99      protected void spawnConsumer() {
100         Thread thread = new Thread(new Runnable() {
101             public void run() {
102                 try {
103                     Connection consumerConnection = createConsumerConnection();
104                     MessageConsumer consumer = createConsumer(consumerConnection);
105                     //consume some messages
106                 
107                     for (int i = 0;i < firstBatch;i++) {
108                         Message msg = consumer.receive(RECEIVE_TIMEOUT);
109                         if (msg != null) {
110                             //System.out.println("GOT: " + msg);
111                             messagesReceived.increment();
112                         }
113                     }
114                     synchronized (closeBroker) {
115                         closeBroker.set(true);
116                         closeBroker.notify();
117                     }
118                     Thread.sleep(2000);
119                     for (int i = firstBatch;i < MESSAGE_COUNT;i++) {
120                         Message msg = consumer.receive(RECEIVE_TIMEOUT);
121                         //System.out.println("GOT: " + msg);
122                         if (msg != null) {
123                             messagesReceived.increment();
124                         }
125                     }
126                     consumerConnection.close();
127                     synchronized (messagesReceived) {
128                         messagesReceived.notify();
129                     }
130                 }
131                 catch (Throwable e) {
132                     e.printStackTrace();
133                 }
134             }
135         });
136         thread.start();
137     }
138 
139     public void testReconnect() throws Exception {
140         startBroker();
141         //register an interest as a durable subscriber
142         Connection consumerConnection = createConsumerConnection();
143         createConsumer(consumerConnection);
144         consumerConnection.close();
145         //send some messages ...
146         Connection connection = createConnection();
147         connection.setClientID(idGen.generateId());
148         connection.start();
149         Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
150         MessageProducer producer = producerSession.createProducer(destination);
151         TextMessage msg = producerSession.createTextMessage();
152         for (int i = 0;i < MESSAGE_COUNT;i++) {
153             msg.setText("msg: " + i);
154             producer.send(msg);
155         }
156         connection.close();
157         spawnConsumer();
158         synchronized (closeBroker) {
159             if (!closeBroker.get()) {
160                 closeBroker.wait();
161             }
162         }
163         System.err.println("Stopping broker");
164         brokerContainer.stop();
165         startBroker();
166         System.err.println("Started Broker again");
167         synchronized (messagesReceived) {
168             if (messagesReceived.get() < MESSAGE_COUNT) {
169                 messagesReceived.wait(60000);
170             }
171         }
172         //assertTrue(messagesReceived.get() == MESSAGE_COUNT);
173         int count = messagesReceived.get();
174         assertTrue("Not enough messages received: " + count, count > firstBatch);
175     }
176 }