1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.ActiveMQDestination;
23  
24  import javax.jms.Connection;
25  import javax.jms.Destination;
26  import javax.jms.JMSException;
27  import javax.jms.Message;
28  import javax.jms.MessageConsumer;
29  import javax.jms.MessageListener;
30  import javax.jms.MessageProducer;
31  import javax.jms.Session;
32  import javax.jms.TextMessage;
33  import java.util.List;
34  import java.util.Vector;
35  
36  /***
37   * @version $Revision: 1.6 $
38   */
39  public class JmsTopicRequestReplyTest extends TestSupport implements MessageListener {
40      private final Log log = LogFactory.getLog(getClass());
41  
42      private Connection serverConnection;
43      private Connection clientConnection;
44      private MessageProducer replyProducer;
45      private Session serverSession;
46      private Destination requestDestination;
47      private List failures = new Vector();
48      private boolean dynamicallyCreateProducer;
49      protected boolean useAsyncConsume = false;
50      private String clientSideClientID;
51  
52      public void testSendAndReceive() throws Exception {
53          clientConnection = createConnection();
54          clientConnection.setClientID("ClientConnection:" + getSubject());
55  
56          Session session = clientConnection.createSession(false,
57                  Session.AUTO_ACKNOWLEDGE);
58  
59          clientConnection.start();
60  
61          Destination replyDestination = createTemporaryDestination(session);
62  
63  
64          // lets test the destination
65          clientSideClientID = clientConnection.getClientID();
66          String value = ActiveMQDestination.getClientId((ActiveMQDestination) replyDestination);
67          assertEquals("clientID from the temporary destination must be the same", clientSideClientID, value);
68          log.info("Both the clientID and destination clientID match properly: " + clientSideClientID);
69  
70  
71          /* build queues */
72          MessageProducer requestProducer =
73                  session.createProducer(requestDestination);
74          MessageConsumer replyConsumer =
75                  session.createConsumer(replyDestination);
76  
77  
78          /* build requestmessage */
79          TextMessage requestMessage = session.createTextMessage("Olivier");
80          requestMessage.setJMSReplyTo(replyDestination);
81          requestProducer.send(requestMessage);
82  
83          log.info("Sent request.");
84          log.info(requestMessage.toString());
85  
86          Message msg = replyConsumer.receive(4000);
87  
88  
89          if (msg instanceof TextMessage) {
90              TextMessage replyMessage = (TextMessage) msg;
91              log.info("Received reply.");
92              log.info(replyMessage.toString());
93  
94              assertEquals("Wrong message content", "Hello: Olivier", replyMessage.getText());
95          }
96          else {
97              fail("Should have received a reply by now");
98          }
99  
100         assertEquals("Should not have had any failures: " + failures, 0, failures.size());
101     }
102 
103     public void testSendAndReceiveWithDynamicallyCreatedProducer() throws Exception {
104         dynamicallyCreateProducer = true;
105         testSendAndReceive();
106     }
107 
108     /***
109      * Use the asynchronous subscription mechanism
110      */
111     public void onMessage(Message message) {
112         try {
113             TextMessage requestMessage = (TextMessage) message;
114 
115             log.info("Received request.");
116             log.info(requestMessage.toString());
117 
118             Destination replyDestination = requestMessage.getJMSReplyTo();
119 
120             String value = ActiveMQDestination.getClientId((ActiveMQDestination) replyDestination);
121             assertEquals("clientID from the temporary destination must be the same", clientSideClientID, value);
122 
123             TextMessage replyMessage = serverSession.createTextMessage("Hello: " + requestMessage.getText());
124 
125             replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
126 
127             if (dynamicallyCreateProducer) {
128                 replyProducer = serverSession.createProducer(replyDestination);
129                 replyProducer.send(replyMessage);
130             }
131             else {
132                 replyProducer.send(replyDestination, replyMessage);
133             }
134 
135             log.info("Sent reply.");
136             log.info(replyMessage.toString());
137         }
138         catch (JMSException e) {
139             onException(e);
140         }
141     }
142 
143     /***
144      * Use the synchronous subscription mechanism
145      */
146     protected void syncConsumeLoop(MessageConsumer requestConsumer) {
147         try {
148             Message message = requestConsumer.receive(5000);
149             if (message != null) {
150                 onMessage(message);
151             }
152             else {
153                 log.error("No message received");
154             }
155         }
156         catch (JMSException e) {
157             onException(e);
158         }
159     }
160 
161 
162     protected void setUp() throws Exception {
163         super.setUp();
164 
165         serverConnection = createConnection();
166         serverConnection.setClientID("serverConnection:" + getSubject());
167         serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
168 
169         replyProducer = serverSession.createProducer(null);
170 
171         requestDestination = createDestination(serverSession);
172 
173         /* build queues */
174         final MessageConsumer requestConsumer = serverSession.createConsumer(requestDestination);
175         if (useAsyncConsume) {
176             requestConsumer.setMessageListener(this);
177         }
178         else {
179             Thread thread = new Thread(new Runnable() {
180                 public void run() {
181                     syncConsumeLoop(requestConsumer);
182                 }
183             });
184             thread.start();
185         }
186         serverConnection.start();
187     }
188 
189     protected void tearDown() throws Exception {
190         super.tearDown();
191 
192         serverConnection.close();
193         clientConnection.stop();
194         clientConnection.close();
195     }
196 
197     protected void onException(JMSException e) {
198         log.info("Caught: " + e);
199         e.printStackTrace();
200         failures.add(e);
201     }
202 
203     protected Destination createDestination(Session session) throws JMSException {
204         if (topic) {
205             return session.createTopic(getSubject());
206         }
207         return session.createQueue(getSubject());
208     }
209 
210     protected Destination createTemporaryDestination(Session session) throws JMSException {
211         if (topic) {
212             return session.createTemporaryTopic();
213         }
214         return session.createTemporaryQueue();
215     }
216 
217 }