1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 package org.codehaus.activemq.service; 19 20 import org.codehaus.activemq.broker.Broker; 21 import org.codehaus.activemq.broker.BrokerClient; 22 import org.codehaus.activemq.broker.BrokerClientStub; 23 import org.codehaus.activemq.message.ActiveMQMessage; 24 import org.codehaus.activemq.message.ConsumerInfo; 25 26 import javax.jms.JMSException; 27 28 /*** 29 * @version $Revision: 1.3 $ 30 */ 31 public class TopicBrokerTest extends BrokerTestSupport { 32 33 protected BrokerClient client; 34 protected Object semaphore = new Object(); 35 36 public void testSendingMessagesToVariousDestinations() throws Exception { 37 38 BrokerClientStub queueA = new BrokerClientStub(semaphore); 39 BrokerClientStub queueB = new BrokerClientStub(semaphore); 40 BrokerClientStub queueC = new BrokerClientStub(semaphore); 41 BrokerClientStub queueD = new BrokerClientStub(semaphore); 42 43 addSubscription("FOO.BAR", null, queueA, isTopic()); 44 addSubscription("FOO.*", null, queueB, isTopic()); 45 addSubscription("BAR.*", "priority = 123", queueC, isTopic()); 46 addSubscription("BAR.*", "x = 'ABC'", queueD, isTopic()); 47 48 ActiveMQMessage message = dispatchMessage(broker, "FOO.BAR"); 49 queueA.waitForMessageToArrive(); 50 if (isTopic()) { 51 assertEquals("queueA", true, queueA.flushMessages().contains(message)); 52 assertEquals("queueB", true, queueB.flushMessages().contains(message)); 53 } 54 else { 55 boolean atA = queueA.flushMessages().contains(message); 56 boolean atB = queueB.flushMessages().contains(message); 57 assertTrue("Sent to queueA or queueB. a: " + atA + " b: " + atB, (atA && !atB) || (!atA && atB)); 58 } 59 assertEquals("queueC", false, queueC.flushMessages().contains(message)); 60 61 message = dispatchMessage(broker, "FOO.XYZ"); 62 queueB.waitForMessageToArrive(); 63 assertEquals("queueA", false, queueA.flushMessages().contains(message)); 64 assertEquals("queueC", false, queueC.flushMessages().contains(message)); 65 assertEquals("queueD", false, queueD.flushMessages().contains(message)); 66 assertEquals("queueB", true, queueB.flushMessages().contains(message)); 67 68 message = createMessage("BAR.XYZ"); 69 message.setIntProperty("priority", 123); 70 broker.sendMessage(client, message); 71 queueC.waitForMessageToArrive(); 72 73 assertEquals("queueA", false, queueA.flushMessages().contains(message)); 74 assertEquals("queueB", false, queueB.flushMessages().contains(message)); 75 assertEquals("queueC", true, queueC.flushMessages().contains(message)); 76 assertEquals("queueD", false, queueD.flushMessages().contains(message)); 77 78 message = createMessage("BAR.XYZ"); 79 message.setStringProperty("x", "ABC"); 80 broker.sendMessage(client, message); 81 queueD.waitForMessageToArrive(); 82 83 assertEquals("queueA", false, queueA.flushMessages().contains(message)); 84 assertEquals("queueB", false, queueB.flushMessages().contains(message)); 85 assertEquals("queueC", false, queueC.flushMessages().contains(message)); 86 assertEquals("queueD", true, queueD.flushMessages().contains(message)); 87 } 88 89 protected void addSubscription(String subject, String selector, BrokerClient client, boolean topic) throws JMSException { 90 ConsumerInfo info = createConsumer(subject, selector); 91 broker.addMessageConsumer(client, info); 92 } 93 94 protected ActiveMQMessage dispatchMessage(Broker broker, String subject) throws JMSException, InterruptedException { 95 ActiveMQMessage message = createMessage(subject); 96 broker.sendMessage(client, message); 97 return message; 98 } 99 100 }