1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq;
20
21 import javax.jms.DeliveryMode;
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageListener;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29 import javax.jms.TextMessage;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collections;
33 import java.util.Date;
34 import java.util.Iterator;
35 import java.util.List;
36
37 /***
38 * @version $Revision: 1.27 $
39 */
40 public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener {
41 protected int messageCount = 100;
42 protected String[] data;
43 protected Session session;
44 protected MessageConsumer consumer;
45 protected MessageProducer producer;
46 protected Destination consumerDestination;
47 protected Destination producerDestination;
48 protected List messages = createConcurrentList();
49 protected boolean topic = true;
50 protected boolean durable = false;
51 protected int deliveryMode = DeliveryMode.PERSISTENT;
52 protected final Object lock = new Object();
53 protected boolean verbose = false;
54
55 public void testSendReceive() throws Exception {
56 messages.clear();
57 for (int i = 0; i < data.length; i++) {
58 Message message = session.createTextMessage(data[i]);
59 if (verbose) {
60 System.out.println("About to send a message: " + message + " with text: " + data[i]);
61 }
62 producer.send(producerDestination, message);
63 }
64 assertMessagesAreReceived();
65 System.out.println("" + data.length + " messages(s) received, closing down connections");
66 }
67
68 protected void assertMessagesAreReceived() throws JMSException {
69 waitForMessagesToBeDelivered();
70 assertMessagesReceivedAreValid(messages);
71 }
72
73 protected void assertMessagesReceivedAreValid(List receivedMessages) throws JMSException {
74 List copyOfMessages = Arrays.asList(receivedMessages.toArray());
75 int counter = 0;
76 if (data.length != copyOfMessages.size()) {
77 for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) {
78 TextMessage message = (TextMessage) iter.next();
79 System.out.println("<== " + counter++ + " = " + message);
80 }
81 }
82 assertEquals("Not enough messages received", data.length, receivedMessages.size());
83 for (int i = 0; i < data.length; i++) {
84 TextMessage received = (TextMessage) receivedMessages.get(i);
85 String text = received.getText();
86 if (verbose) {
87 System.out.println("Received Text: " + text);
88 }
89 assertEquals("Message: " + i, data[i], text);
90 }
91 }
92
93 protected void waitForMessagesToBeDelivered() {
94 long maxWaitTime = 30000;
95 long waitTime = maxWaitTime;
96 long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
97 synchronized (lock) {
98 while (messages.size() < data.length && waitTime >= 0) {
99 try {
100 lock.wait(200);
101 }
102 catch (InterruptedException e) {
103 e.printStackTrace();
104 }
105 waitTime = maxWaitTime - (System.currentTimeMillis() - start);
106 }
107 }
108 }
109
110 private void waitForTimeOrNotify(long timeout) {
111 try {
112 synchronized (lock) {
113 lock.wait(timeout);
114 }
115 }
116 catch (InterruptedException e) {
117 System.out.println("Caught: " + e);
118 }
119 }
120
121 protected void setUp() throws Exception {
122 super.setUp();
123 String temp = System.getProperty("messageCount");
124 if (temp != null) {
125 int i = Integer.parseInt(temp);
126 if (i > 0) {
127 messageCount = i;
128 }
129 }
130 System.out.println("Message count for test case is: " + messageCount);
131 data = new String[messageCount];
132 for (int i = 0; i < messageCount; i++) {
133 data[i] = "Text for message: " + i + " at " + new Date();
134 }
135 }
136
137 public synchronized void onMessage(Message message) {
138 consumeMessage(message, messages);
139 }
140
141 protected void consumeMessage(Message message, List messageList) {
142 if (verbose) {
143 System.out.println("Received message: " + message);
144 }
145 messageList.add(message);
146 if (messageList.size() >= data.length) {
147 synchronized (lock) {
148 lock.notifyAll();
149 }
150 }
151 }
152
153 protected List createConcurrentList() {
154 return Collections.synchronizedList(new ArrayList());
155 }
156 }