1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.tool;
19
20 import javax.jms.Connection;
21 import javax.jms.JMSException;
22 import javax.jms.Message;
23 import javax.jms.MessageConsumer;
24 import javax.jms.MessageListener;
25 import javax.jms.Session;
26 import javax.jms.TextMessage;
27 import javax.jms.Topic;
28 import java.io.IOException;
29
30 /***
31 * A simple tool for consuming messages
32 *
33 * @version $Revision: 1.11 $
34 */
35 public class ConsumerTool extends ToolSupport implements MessageListener {
36
37 protected int count = 0;
38 protected int dumpCount = 10;
39 protected boolean verbose = true;
40 protected int maxiumMessages = 0;
41 private boolean pauseBeforeShutdown;
42
43
44 public static void main(String[] args) {
45 ConsumerTool tool = new ConsumerTool();
46 if (args.length > 0) {
47 tool.url = args[0];
48 }
49 if (args.length > 1) {
50 tool.topic = args[1].equalsIgnoreCase("true");
51 }
52 if (args.length > 2) {
53 tool.subject = args[2];
54 }
55 if (args.length > 3) {
56 tool.durable = args[3].equalsIgnoreCase("true");
57 }
58 if (args.length > 4) {
59 tool.maxiumMessages = Integer.parseInt(args[4]);
60 }
61 tool.run();
62 }
63
64 public void run() {
65 try {
66 System.out.println("Connecting to URL: " + url);
67 System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
68 System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
69
70 Connection connection = createConnection();
71 Session session = createSession(connection);
72 MessageConsumer consumer = null;
73 if (durable && topic) {
74 consumer = session.createDurableSubscriber((Topic) destination, consumerName);
75 }
76 else {
77 consumer = session.createConsumer(destination);
78 }
79 if (maxiumMessages <= 0) {
80 consumer.setMessageListener(this);
81 }
82 connection.start();
83
84 if (maxiumMessages > 0) {
85 consumeMessagesAndClose(connection, session, consumer);
86 }
87 }
88 catch (Exception e) {
89 System.out.println("Caught: " + e);
90 e.printStackTrace();
91 }
92 }
93
94 public void onMessage(Message message) {
95 try {
96 if (message instanceof TextMessage) {
97 TextMessage txtMsg = (TextMessage) message;
98 if (verbose) {
99
100 String msg = txtMsg.getText();
101 if( msg.length() > 50 )
102 msg = msg.substring(0, 50)+"...";
103
104 System.out.println("Received: " + msg);
105 }
106 }
107 else {
108 if (verbose) {
109 System.out.println("Received: " + message);
110 }
111 }
112
113
114
115
116
117 }
118 catch (JMSException e) {
119 System.out.println("Caught: " + e);
120 e.printStackTrace();
121 }
122 }
123
124
125 protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
126 System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
127
128 for (int i = 0; i < maxiumMessages; i++) {
129 Message message = consumer.receive();
130 onMessage(message);
131 }
132 System.out.println("Closing connection");
133 consumer.close();
134 session.close();
135 connection.close();
136 if (pauseBeforeShutdown) {
137 System.out.println("Press return to shut down");
138 System.in.read();
139 }
140 }
141 }