1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service;
19
20 import org.codehaus.activemq.broker.Broker;
21 import org.codehaus.activemq.broker.BrokerClient;
22 import org.codehaus.activemq.broker.BrokerClientStub;
23 import org.codehaus.activemq.message.ActiveMQMessage;
24 import org.codehaus.activemq.message.ConsumerInfo;
25
26 import javax.jms.JMSException;
27
28 /***
29 * @version $Revision: 1.3 $
30 */
31 public class TopicBrokerTest extends BrokerTestSupport {
32
33 protected BrokerClient client;
34 protected Object semaphore = new Object();
35
36 public void testSendingMessagesToVariousDestinations() throws Exception {
37
38 BrokerClientStub queueA = new BrokerClientStub(semaphore);
39 BrokerClientStub queueB = new BrokerClientStub(semaphore);
40 BrokerClientStub queueC = new BrokerClientStub(semaphore);
41 BrokerClientStub queueD = new BrokerClientStub(semaphore);
42
43 addSubscription("FOO.BAR", null, queueA, isTopic());
44 addSubscription("FOO.*", null, queueB, isTopic());
45 addSubscription("BAR.*", "priority = 123", queueC, isTopic());
46 addSubscription("BAR.*", "x = 'ABC'", queueD, isTopic());
47
48 ActiveMQMessage message = dispatchMessage(broker, "FOO.BAR");
49 queueA.waitForMessageToArrive();
50 if (isTopic()) {
51 assertEquals("queueA", true, queueA.flushMessages().contains(message));
52 assertEquals("queueB", true, queueB.flushMessages().contains(message));
53 }
54 else {
55 boolean atA = queueA.flushMessages().contains(message);
56 boolean atB = queueB.flushMessages().contains(message);
57 assertTrue("Sent to queueA or queueB. a: " + atA + " b: " + atB, (atA && !atB) || (!atA && atB));
58 }
59 assertEquals("queueC", false, queueC.flushMessages().contains(message));
60
61 message = dispatchMessage(broker, "FOO.XYZ");
62 queueB.waitForMessageToArrive();
63 assertEquals("queueA", false, queueA.flushMessages().contains(message));
64 assertEquals("queueC", false, queueC.flushMessages().contains(message));
65 assertEquals("queueD", false, queueD.flushMessages().contains(message));
66 assertEquals("queueB", true, queueB.flushMessages().contains(message));
67
68 message = createMessage("BAR.XYZ");
69 message.setIntProperty("priority", 123);
70 broker.sendMessage(client, message);
71 queueC.waitForMessageToArrive();
72
73 assertEquals("queueA", false, queueA.flushMessages().contains(message));
74 assertEquals("queueB", false, queueB.flushMessages().contains(message));
75 assertEquals("queueC", true, queueC.flushMessages().contains(message));
76 assertEquals("queueD", false, queueD.flushMessages().contains(message));
77
78 message = createMessage("BAR.XYZ");
79 message.setStringProperty("x", "ABC");
80 broker.sendMessage(client, message);
81 queueD.waitForMessageToArrive();
82
83 assertEquals("queueA", false, queueA.flushMessages().contains(message));
84 assertEquals("queueB", false, queueB.flushMessages().contains(message));
85 assertEquals("queueC", false, queueC.flushMessages().contains(message));
86 assertEquals("queueD", true, queueD.flushMessages().contains(message));
87 }
88
89 protected void addSubscription(String subject, String selector, BrokerClient client, boolean topic) throws JMSException {
90 ConsumerInfo info = createConsumer(subject, selector);
91 broker.addMessageConsumer(client, info);
92 }
93
94 protected ActiveMQMessage dispatchMessage(Broker broker, String subject) throws JMSException, InterruptedException {
95 ActiveMQMessage message = createMessage(subject);
96 broker.sendMessage(client, message);
97 return message;
98 }
99
100 }