1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.tool;
19
20 import javax.jms.JMSException;
21 import javax.jms.Message;
22 import javax.jms.MessageConsumer;
23 import javax.jms.MessageListener;
24 import javax.jms.TextMessage;
25 import javax.jms.Topic;
26 import java.io.IOException;
27
28 /***
29 * A simple tool for consuming messages
30 *
31 * @version $Revision: 1.7 $
32 */
33 public class ConsumerTool extends ToolSupport implements MessageListener {
34
35 protected int count = 0;
36 protected int dumpCount = 10;
37 protected boolean verbose = true;
38 protected int maxiumMessages = 0;
39 private MessageConsumer consumer;
40 private boolean pauseBeforeShutdown;
41
42
43 public static void main(String[] args) {
44 ConsumerTool tool = new ConsumerTool();
45 if (args.length > 0) {
46 tool.url = args[0];
47 }
48 if (args.length > 1) {
49 tool.topic = args[1].equalsIgnoreCase("true");
50 }
51 if (args.length > 2) {
52 tool.subject = args[2];
53 }
54 if (args.length > 3) {
55 tool.durable = args[3].equalsIgnoreCase("true");
56 }
57 if (args.length > 4) {
58 tool.maxiumMessages = Integer.parseInt(args[4]);
59 }
60 tool.run();
61 }
62
63 public void run() {
64 try {
65 System.out.println("Connecting to URL: " + url);
66 System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
67 System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
68
69 createSession();
70 if (durable && topic) {
71 consumer = session.createDurableSubscriber((Topic) destination, consumerName);
72 }
73 else {
74 consumer = session.createConsumer(destination);
75 }
76 if (maxiumMessages <= 0) {
77 consumer.setMessageListener(this);
78 }
79 connection.start();
80
81 if (maxiumMessages > 0) {
82 consumeMessagesAndClose();
83 }
84 }
85 catch (Exception e) {
86 System.out.println("Caught: " + e);
87 e.printStackTrace();
88 }
89 }
90
91 public void onMessage(Message message) {
92 try {
93 if (message instanceof TextMessage) {
94 TextMessage txtMsg = (TextMessage) message;
95 if (verbose) {
96 System.out.println("Received: " + txtMsg.getText());
97 }
98 }
99 else {
100 if (verbose) {
101 System.out.println("Received: " + message);
102 }
103 }
104 if (++count % dumpCount == 0) {
105 dumpStats();
106 }
107 }
108 catch (JMSException e) {
109 System.out.println("Caught: " + e);
110 e.printStackTrace();
111 }
112 }
113
114
115 protected void consumeMessagesAndClose() throws JMSException, IOException {
116 System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
117
118 for (int i = 0; i < maxiumMessages; i++) {
119 Message message = consumer.receive();
120 onMessage(message);
121 }
122 System.out.println("Closing connection");
123 consumer.close();
124 session.close();
125 connection.close();
126 if (pauseBeforeShutdown) {
127 System.out.println("Press return to shut down");
128 System.in.read();
129 }
130 }
131 }