1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.usecases;
19  
20  import org.codehaus.activemq.ActiveMQConnection;
21  import org.codehaus.activemq.ProducerConsumerTestSupport;
22  
23  import javax.jms.Connection;
24  import javax.jms.JMSException;
25  import javax.jms.Message;
26  import javax.jms.TextMessage;
27  
28  /***
29   * @version $Revision: 1.4 $
30   */
31  public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport {
32  
33      protected int prefetchSize = 100;
34      protected String[] messageTexts;
35      protected long consumerTimeout = 10000L;
36  
37      public void testSendPrefetchSize() throws JMSException {
38          testWithMessageCount(prefetchSize);
39      }
40  
41      public void testSendDoublePrefetchSize() throws JMSException {
42          testWithMessageCount(prefetchSize * 2);
43      }
44  
45      public void testSendPrefetchSizePlusOne() throws JMSException {
46          testWithMessageCount(prefetchSize + 1);
47      }
48  
49      protected void testWithMessageCount(int messageCount) throws JMSException {
50          makeMessages(messageCount);
51  
52          System.out.println("About to send and receive: " + messageCount + " on destination: " + destination
53                  + " of type: " + destination.getClass().getName());
54  
55          for (int i = 0; i < messageCount; i++) {
56              Message message = session.createTextMessage(messageTexts[i]);
57              producer.send(message);
58          }
59  
60          // lets consume them in two fetch batches
61          for (int i = 0; i < messageCount; i++) {
62              consumeMessge(i);
63          }
64      }
65  
66      protected Connection createConnection() throws Exception {
67          ActiveMQConnection connection = (ActiveMQConnection) super.createConnection();
68          connection.getPrefetchPolicy().setQueuePrefetch(prefetchSize);
69          connection.getPrefetchPolicy().setTopicPrefetch(prefetchSize);
70          return connection;
71      }
72  
73      protected void consumeMessge(int i) throws JMSException {
74          Message message = consumer.receive(consumerTimeout);
75          assertTrue("Should have received a message by now for message: " + i, message != null);
76          assertTrue("Should be a TextMessage: " + message, message instanceof TextMessage);
77          TextMessage textMessage = (TextMessage) message;
78          assertEquals("Message content", messageTexts[i], textMessage.getText());
79      }
80  
81  
82      protected void makeMessages(int messageCount) {
83          messageTexts = new String[messageCount];
84          for (int i = 0; i < messageCount; i++) {
85              messageTexts[i] = "Message for test: + " + getName() + " = " + i;
86          }
87      }
88  
89  }