1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.usecases;
20  import javax.jms.Connection;
21  import javax.jms.DeliveryMode;
22  import javax.jms.Destination;
23  import javax.jms.JMSException;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageListener;
27  import javax.jms.MessageProducer;
28  import javax.jms.Session;
29  import javax.jms.TextMessage;
30  import junit.framework.TestCase;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.codehaus.activemq.ActiveMQConnectionFactory;
34  import org.codehaus.activemq.broker.BrokerContainer;
35  import org.codehaus.activemq.broker.impl.BrokerContainerImpl;
36  import org.codehaus.activemq.message.ActiveMQQueue;
37  import org.codehaus.activemq.message.ActiveMQTextMessage;
38  import org.codehaus.activemq.message.ActiveMQTopic;
39  import org.codehaus.activemq.transport.DiscoveryNetworkConnector;
40  import org.codehaus.activemq.transport.zeroconf.ZeroconfDiscoveryAgent;
41  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
42  
43  /***
44   * @version $Revision: 1.3 $
45   */
46  public class TopicClusterTest extends TestCase implements MessageListener {
47      protected Log log = LogFactory.getLog(getClass());
48      protected Destination destination;
49      protected boolean topic = true;
50      protected SynchronizedInt receivedMessageCount = new SynchronizedInt(0);
51      protected static int MESSAGE_COUNT = 50;
52      protected static int NUMBER_IN_CLUSTER = 3;
53      protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
54      protected MessageProducer[] producers;
55      protected Connection[] connections;
56  
57      protected void setUp() throws Exception {
58          connections = new Connection[NUMBER_IN_CLUSTER];
59          producers = new MessageProducer[NUMBER_IN_CLUSTER];
60          Destination destination = createDestination();
61          int portStart = 50000;
62          for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
63              connections[i] = createConnection("broker(" + i + ")");
64              connections[i].setClientID("ClusterTest" + i);
65              connections[i].start();
66              Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
67              producers[i] = session.createProducer(destination);
68              producers[i].setDeliveryMode(deliveryMode);
69              MessageConsumer consumer = createMessageConsumer(session,destination);
70              consumer.setMessageListener(this);
71              
72          }
73          System.out.println("Sleeping to ensure cluster is fully connected");
74          Thread.sleep(5000);
75      }
76  
77      protected void tearDown() throws Exception {
78          if (connections != null) {
79              for (int i = 0;i < connections.length;i++) {
80                  connections[i].close();
81              }
82          }
83      }
84      
85      protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException{
86          return session.createConsumer(destination);
87      }
88  
89      protected ActiveMQConnectionFactory createGenericClusterFactory(String brokerName) throws JMSException {
90          BrokerContainer container = new BrokerContainerImpl(brokerName);
91          ZeroconfDiscoveryAgent agent = new ZeroconfDiscoveryAgent();
92          agent.setType(getClass().getName() + ".");
93          container.setDiscoveryAgent(agent);
94          String url = "tcp://localhost:0";
95          container.addConnector(url);
96          container.addNetworkConnector(new DiscoveryNetworkConnector(container));
97          container.start();
98          //embedded brokers are resolved by url - so make url unique
99          //this confused me tests for a while :-)
100         return new ActiveMQConnectionFactory(container,"vm://"+brokerName);
101     }
102 
103     protected int expectedReceiveCount() {
104         return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
105     }
106 
107     protected Connection createConnection(String name) throws JMSException {
108         return createGenericClusterFactory(name).createConnection();
109     }
110 
111     protected Destination createDestination() {
112         return createDestination(getClass().getName());
113     }
114 
115     protected Destination createDestination(String name) {
116         if (topic) {
117             return new ActiveMQTopic(name);
118         }
119         else {
120             return new ActiveMQQueue(name);
121         }
122     }
123 
124 
125     /***
126      * @param msg
127      */
128     public void onMessage(Message msg) {
129         //System.out.println("GOT: " + msg);
130         receivedMessageCount.increment();
131         synchronized (receivedMessageCount) {
132             if (receivedMessageCount.get() >= expectedReceiveCount()) {
133                 receivedMessageCount.notify();
134             }
135         }
136     }
137 
138     /***
139      * @throws Exception
140      */
141     public void testSendReceive() throws Exception {
142         for (int i = 0;i < MESSAGE_COUNT;i++) {
143             TextMessage textMessage = new ActiveMQTextMessage();
144             textMessage.setText("MSG-NO:" + i);
145             for (int x = 0;x < producers.length;x++) {
146                 producers[x].send(textMessage);
147             }
148         }
149         synchronized (receivedMessageCount) {
150             if (receivedMessageCount.get() < expectedReceiveCount()) {
151                 receivedMessageCount.wait(20000);
152             }
153         }
154         //sleep a little - to check we don't get too many messages
155         Thread.sleep(2000);
156         System.err.println("GOT: " + receivedMessageCount.get());
157         assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
158     }
159 
160 }