1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.usecases;
19  
20  import org.codehaus.activemq.ActiveMQConnectionFactory;
21  import org.codehaus.activemq.JmsSendReceiveTestSupport;
22  import org.codehaus.activemq.message.ActiveMQTopic;
23  
24  import javax.jms.Connection;
25  import javax.jms.Destination;
26  import javax.jms.JMSException;
27  import javax.jms.Message;
28  import javax.jms.MessageConsumer;
29  import javax.jms.MessageListener;
30  import javax.jms.Session;
31  import java.util.List;
32  
33  /***
34   * @version $Revision: 1.4 $
35   */
36  public class CompositePublishTest extends JmsSendReceiveTestSupport {
37  
38      protected Connection sendConnection;
39      protected Connection receiveConnection;
40      protected Session receiveSession;
41      protected MessageConsumer[] consumers;
42      protected List[] messageLists;
43  
44      protected void setUp() throws Exception {
45          super.setUp();
46  
47          connectionFactory = createConnectionFactory();
48  
49          sendConnection = createConnection();
50          sendConnection.start();
51  
52          receiveConnection = createConnection();
53          receiveConnection.start();
54  
55          System.out.println("Created sendConnection: " + sendConnection);
56          System.out.println("Created receiveConnection: " + receiveConnection);
57  
58          session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
59          receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
60  
61          System.out.println("Created sendSession: " + session);
62          System.out.println("Created receiveSession: " + receiveSession);
63  
64          producer = session.createProducer(null);
65  
66          System.out.println("Created producer: " + producer);
67  
68          if (topic) {
69              consumerDestination = session.createTopic(getConsumerSubject());
70              producerDestination = session.createTopic(getProducerSubject());
71          }
72          else {
73              consumerDestination = session.createQueue(getConsumerSubject());
74              producerDestination = session.createQueue(getProducerSubject());
75          }
76  
77          System.out.println("Created  consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
78          System.out.println("Created  producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
79  
80          Destination[] destinations = getDestinations();
81          consumers = new MessageConsumer[destinations.length];
82          messageLists = new List[destinations.length];
83          for (int i = 0; i < destinations.length; i++) {
84              Destination dest = destinations[i];
85              messageLists[i] = createConcurrentList();
86              consumers[i] = receiveSession.createConsumer(dest);
87              consumers[i].setMessageListener(createMessageListener(i, messageLists[i]));
88          }
89  
90  
91          System.out.println("Started connections");
92      }
93  
94      protected MessageListener createMessageListener(int i, final List messageList) {
95          return new MessageListener() {
96              public void onMessage(Message message) {
97                  consumeMessage(message, messageList);
98              }
99          };
100     }
101 
102     /***
103      * Returns the subject on which we publish
104      */
105     protected String getSubject() {
106         return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y";
107     }
108 
109     /***
110      * Returns the destinations to which we consume
111      */
112     protected Destination[] getDestinations() {
113         return new Destination[]{new ActiveMQTopic(getPrefix() + "FOO.BAR"), new ActiveMQTopic(getPrefix() + "FOO.*"), new ActiveMQTopic(getPrefix() + "FOO.X.Y")};
114     }
115 
116     protected String getPrefix() {
117         return super.getSubject() + ".";
118     }
119 
120     protected void assertMessagesAreReceived() throws JMSException {
121         waitForMessagesToBeDelivered();
122 
123         for (int i = 0, size = messageLists.length; i < size; i++) {
124             System.out.println("Message list: " + i + " contains: " + messageLists[i].size() + " message(s)");
125         }
126 
127         for (int i = 0, size = messageLists.length; i < size; i++) {
128             assertMessagesReceivedAreValid(messageLists[i]);
129         }
130     }
131 
132     protected ActiveMQConnectionFactory createConnectionFactory() {
133         return new ActiveMQConnectionFactory("vm://localhost");
134     }
135 
136     protected void tearDown() throws Exception {
137         session.close();
138         receiveSession.close();
139 
140         sendConnection.close();
141         receiveConnection.close();
142     }
143 }