1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq;
20  
21  import javax.jms.DeliveryMode;
22  import javax.jms.Destination;
23  import javax.jms.JMSException;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageListener;
27  import javax.jms.MessageProducer;
28  import javax.jms.Session;
29  import javax.jms.TextMessage;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Collections;
33  import java.util.Date;
34  import java.util.Iterator;
35  import java.util.List;
36  
37  /***
38   * @version $Revision: 1.27 $
39   */
40  public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener {
41      protected int messageCount = 100;
42      protected String[] data;
43      protected Session session;
44      protected MessageConsumer consumer;
45      protected MessageProducer producer;
46      protected Destination consumerDestination;
47      protected Destination producerDestination;
48      protected List messages = createConcurrentList();
49      protected boolean topic = true;
50      protected boolean durable = false;
51      protected int deliveryMode = DeliveryMode.PERSISTENT;
52      protected final Object lock = new Object();
53      protected boolean verbose = false;
54  
55      public void testSendReceive() throws Exception {
56          messages.clear();
57          for (int i = 0; i < data.length; i++) {
58              Message message = session.createTextMessage(data[i]);
59              if (verbose) {
60                  System.out.println("About to send a message: " + message + " with text: " + data[i]);
61              }
62              producer.send(producerDestination, message);
63          }
64          assertMessagesAreReceived();
65          System.out.println("" + data.length + " messages(s) received, closing down connections");
66      }
67  
68      protected void assertMessagesAreReceived() throws JMSException {
69          waitForMessagesToBeDelivered();
70          assertMessagesReceivedAreValid(messages);
71      }
72  
73      protected void assertMessagesReceivedAreValid(List receivedMessages) throws JMSException {
74          List copyOfMessages = Arrays.asList(receivedMessages.toArray());
75          int counter = 0;
76          if (data.length != copyOfMessages.size()) {
77              for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) {
78                  TextMessage message = (TextMessage) iter.next();
79                  System.out.println("<== " + counter++ + " = " + message);
80              }
81          }
82          assertEquals("Not enough messages received", data.length, receivedMessages.size());
83          for (int i = 0; i < data.length; i++) {
84              TextMessage received = (TextMessage) receivedMessages.get(i);
85              String text = received.getText();
86              if (verbose) {
87                  System.out.println("Received Text: " + text);
88              }
89              assertEquals("Message: " + i, data[i], text);
90          }
91      }
92  
93      protected void waitForMessagesToBeDelivered() {
94          long maxWaitTime = 30000;
95          long waitTime = maxWaitTime;
96          long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
97          synchronized (lock) {
98              while (messages.size() < data.length && waitTime >= 0) {
99                  try {
100                     lock.wait(200);
101                 }
102                 catch (InterruptedException e) {
103                     e.printStackTrace();
104                 }
105                 waitTime = maxWaitTime - (System.currentTimeMillis() - start);
106             }
107         }
108     }
109 
110     private void waitForTimeOrNotify(long timeout) {
111         try {
112             synchronized (lock) {
113                 lock.wait(timeout);
114             }
115         }
116         catch (InterruptedException e) {
117             System.out.println("Caught: " + e);
118         }
119     }
120 
121     protected void setUp() throws Exception {
122         super.setUp();
123         String temp = System.getProperty("messageCount");
124         if (temp != null) {
125             int i = Integer.parseInt(temp);
126             if (i > 0) {
127                 messageCount = i;
128             }
129         }
130         System.out.println("Message count for test case is: " + messageCount);
131         data = new String[messageCount];
132         for (int i = 0; i < messageCount; i++) {
133             data[i] = "Text for message: " + i + " at " + new Date();
134         }
135     }
136 
137     public synchronized void onMessage(Message message) {
138         consumeMessage(message, messages);
139     }
140 
141     protected void consumeMessage(Message message, List messageList) {
142         if (verbose) {
143             System.out.println("Received message: " + message);
144         }
145         messageList.add(message);
146         if (messageList.size() >= data.length) {
147             synchronized (lock) {
148                 lock.notifyAll();
149             }
150         }
151     }
152 
153     protected List createConcurrentList() {
154         return Collections.synchronizedList(new ArrayList());
155     }
156 }