1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.benchmark;
19
20 import javax.jms.DeliveryMode;
21 import javax.jms.Destination;
22 import javax.jms.JMSException;
23 import javax.jms.Message;
24 import javax.jms.MessageProducer;
25 import javax.jms.Session;
26 import java.io.BufferedReader;
27 import java.io.File;
28 import java.io.FileReader;
29 import java.io.IOException;
30
31 /***
32 * @author James Strachan
33 * @version $Revision: 1.11 $
34 */
35 public class Producer extends BenchmarkSupport {
36
37 int loops = -1;
38 int loopSize = 1000;
39 private int messageSize = 1000;
40
41 public static void main(String[] args) {
42 Producer tool = new Producer();
43 if (args.length > 0) {
44 tool.setUrl(args[0]);
45 }
46 if (args.length > 1) {
47 tool.setTopic(parseBoolean(args[1]));
48 }
49 if (args.length > 2) {
50 tool.setSubject(args[2]);
51 }
52 if (args.length > 3) {
53 tool.setDurable(parseBoolean(args[3]));
54 }
55 if (args.length > 4) {
56 tool.setMessageSize(Integer.parseInt(args[4]));
57 }
58 if (args.length > 5) {
59 tool.setConnectionCount(Integer.parseInt(args[5]));
60 }
61 try {
62 tool.run();
63 }
64 catch (Exception e) {
65 System.out.println("Caught: " + e);
66 e.printStackTrace();
67 }
68 }
69
70 public Producer() {
71 }
72
73 public void run() throws Exception {
74 start();
75 publish();
76 }
77
78
79
80 public int getMessageSize() {
81 return messageSize;
82 }
83
84 public void setMessageSize(int messageSize) {
85 this.messageSize = messageSize;
86 }
87
88 public int getLoopSize() {
89 return loopSize;
90 }
91
92 public void setLoopSize(int loopSize) {
93 this.loopSize = loopSize;
94 }
95
96
97
98
99 protected void publish() throws Exception {
100 final String text = getMessage();
101
102 System.out.println("Publishing to: " + subjects.length + " subject(s)");
103
104 for (int i = 0; i < subjects.length; i++) {
105 final String subject = subjects[i];
106 Thread thread = new Thread() {
107 public void run() {
108 try {
109 publish(text, subject);
110 }
111 catch (JMSException e) {
112 System.out.println("Caught: " + e);
113 e.printStackTrace();
114 }
115 }
116 };
117 thread.start();
118 }
119
120 }
121
122 protected String getMessage() {
123 StringBuffer buffer = new StringBuffer();
124 for (int i = 0; i < messageSize; i++) {
125 char ch = 'X';
126 buffer.append(ch);
127 }
128 return buffer.toString();
129 }
130
131 protected void publish(String text, String subject) throws JMSException {
132 Session session = createSession();
133
134 Destination destination = createDestination(session, subject);
135
136 MessageProducer publisher = session.createProducer(destination);
137 if (isDurable()) {
138 publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
139 }
140 else {
141 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
142 }
143
144 System.out.println("Starting publisher on : " + destination + " of type: " + destination.getClass().getName());
145 System.out.println("Message length: " + text.length());
146
147 if (loops <= 0) {
148 while (true) {
149 publishLoop(session, publisher, text);
150 }
151 }
152 else {
153 for (int i = 0; i < loops; i++) {
154 publishLoop(session, publisher, text);
155 }
156 }
157 }
158
159 protected void publishLoop(Session session, MessageProducer publisher, String text) throws JMSException {
160 for (int i = 0; i < loopSize; i++) {
161 Message message = session.createTextMessage(text);
162
163 publisher.send(message);
164 count(1);
165 }
166 }
167
168 protected String loadFile(String file) throws IOException {
169 System.out.println("Loading file: " + file);
170
171 StringBuffer buffer = new StringBuffer();
172 BufferedReader in = new BufferedReader(new FileReader(file));
173 while (true) {
174 String line = in.readLine();
175 if (line == null) {
176 break;
177 }
178 buffer.append(line);
179 buffer.append(File.separator);
180 }
181 return buffer.toString();
182 }
183 }