1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.usecases;
19
20 import org.codehaus.activemq.ActiveMQConnectionFactory;
21 import org.codehaus.activemq.JmsSendReceiveTestSupport;
22 import org.codehaus.activemq.message.ActiveMQTopic;
23
24 import javax.jms.Connection;
25 import javax.jms.Destination;
26 import javax.jms.JMSException;
27 import javax.jms.Message;
28 import javax.jms.MessageConsumer;
29 import javax.jms.MessageListener;
30 import javax.jms.Session;
31 import java.util.List;
32
33 /***
34 * @version $Revision: 1.4 $
35 */
36 public class CompositePublishTest extends JmsSendReceiveTestSupport {
37
38 protected Connection sendConnection;
39 protected Connection receiveConnection;
40 protected Session receiveSession;
41 protected MessageConsumer[] consumers;
42 protected List[] messageLists;
43
44 protected void setUp() throws Exception {
45 super.setUp();
46
47 connectionFactory = createConnectionFactory();
48
49 sendConnection = createConnection();
50 sendConnection.start();
51
52 receiveConnection = createConnection();
53 receiveConnection.start();
54
55 System.out.println("Created sendConnection: " + sendConnection);
56 System.out.println("Created receiveConnection: " + receiveConnection);
57
58 session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
59 receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
60
61 System.out.println("Created sendSession: " + session);
62 System.out.println("Created receiveSession: " + receiveSession);
63
64 producer = session.createProducer(null);
65
66 System.out.println("Created producer: " + producer);
67
68 if (topic) {
69 consumerDestination = session.createTopic(getConsumerSubject());
70 producerDestination = session.createTopic(getProducerSubject());
71 }
72 else {
73 consumerDestination = session.createQueue(getConsumerSubject());
74 producerDestination = session.createQueue(getProducerSubject());
75 }
76
77 System.out.println("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
78 System.out.println("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
79
80 Destination[] destinations = getDestinations();
81 consumers = new MessageConsumer[destinations.length];
82 messageLists = new List[destinations.length];
83 for (int i = 0; i < destinations.length; i++) {
84 Destination dest = destinations[i];
85 messageLists[i] = createConcurrentList();
86 consumers[i] = receiveSession.createConsumer(dest);
87 consumers[i].setMessageListener(createMessageListener(i, messageLists[i]));
88 }
89
90
91 System.out.println("Started connections");
92 }
93
94 protected MessageListener createMessageListener(int i, final List messageList) {
95 return new MessageListener() {
96 public void onMessage(Message message) {
97 consumeMessage(message, messageList);
98 }
99 };
100 }
101
102 /***
103 * Returns the subject on which we publish
104 */
105 protected String getSubject() {
106 return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y";
107 }
108
109 /***
110 * Returns the destinations to which we consume
111 */
112 protected Destination[] getDestinations() {
113 return new Destination[]{new ActiveMQTopic(getPrefix() + "FOO.BAR"), new ActiveMQTopic(getPrefix() + "FOO.*"), new ActiveMQTopic(getPrefix() + "FOO.X.Y")};
114 }
115
116 protected String getPrefix() {
117 return super.getSubject() + ".";
118 }
119
120 protected void assertMessagesAreReceived() throws JMSException {
121 waitForMessagesToBeDelivered();
122
123 for (int i = 0, size = messageLists.length; i < size; i++) {
124 System.out.println("Message list: " + i + " contains: " + messageLists[i].size() + " message(s)");
125 }
126
127 for (int i = 0, size = messageLists.length; i < size; i++) {
128 assertMessagesReceivedAreValid(messageLists[i]);
129 }
130 }
131
132 protected ActiveMQConnectionFactory createConnectionFactory() {
133 return new ActiveMQConnectionFactory("vm://localhost");
134 }
135
136 protected void tearDown() throws Exception {
137 session.close();
138 receiveSession.close();
139
140 sendConnection.close();
141 receiveConnection.close();
142 }
143 }