1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.benchmark;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
21  import org.codehaus.activemq.ActiveMQConnectionFactory;
22  import org.codehaus.activemq.util.IdGenerator;
23  
24  import javax.jms.Connection;
25  import javax.jms.Destination;
26  import javax.jms.JMSException;
27  import javax.jms.Session;
28  import java.text.NumberFormat;
29  import java.util.ArrayList;
30  import java.util.List;
31  
32  /***
33   * Abstract base class for some simple benchmark tools
34   *
35   * @author James Strachan
36   * @version $Revision: 1.16 $
37   */
38  public class BenchmarkSupport {
39  
40      protected int connectionCount = 1;
41      protected int batch = 1000;
42      protected Destination destination;
43      protected boolean embeddedBroker = false;
44      private boolean topic = true;
45      private boolean durable = false;
46  
47      private ActiveMQConnectionFactory factory;
48      private String url;
49      protected String[] subjects;
50      private long time = System.currentTimeMillis();
51      private int counter;
52      private List resources = new ArrayList();
53      private NumberFormat formatter = NumberFormat.getInstance();
54      private SynchronizedInt connectionCounter = new SynchronizedInt(0);
55      private IdGenerator idGenerator = new IdGenerator();
56  
57      public BenchmarkSupport() {
58      }
59  
60      public void start() {
61          System.out.println("Using: " + connectionCount + " connection(s)");
62          subjects = new String[connectionCount];
63          for (int i = 0; i < connectionCount; i++) {
64              subjects[i] = "BENCHMARK.FEED" + i;
65          }
66          if (useTimerLoop()) {
67              Thread timer = new Thread() {
68                  public void run() {
69                      timerLoop();
70                  }
71              };
72              timer.start();
73          }
74      }
75  
76      public String getUrl() {
77          return url;
78      }
79  
80      public void setUrl(String url) {
81          this.url = url;
82      }
83  
84      public boolean isTopic() {
85          return topic;
86      }
87  
88      public void setTopic(boolean topic) {
89          this.topic = topic;
90      }
91  
92      public ActiveMQConnectionFactory getFactory() {
93          return factory;
94      }
95  
96      public void setFactory(ActiveMQConnectionFactory factory) {
97          this.factory = factory;
98      }
99  
100     public void setSubject(String subject) {
101         connectionCount = 1;
102         subjects = new String[]{subject};
103     }
104 
105     public boolean isDurable() {
106         return durable;
107     }
108 
109     public void setDurable(boolean durable) {
110         this.durable = durable;
111     }
112 
113     public boolean isEmbeddedBroker() {
114         return embeddedBroker;
115     }
116 
117     public void setEmbeddedBroker(boolean embeddedBroker) {
118         this.embeddedBroker = embeddedBroker;
119     }
120 
121     public int getConnectionCount() {
122         return connectionCount;
123     }
124 
125     public void setConnectionCount(int connectionCount) {
126         this.connectionCount = connectionCount;
127     }
128 
129     protected Session createSession() throws JMSException {
130         if (factory == null) {
131             factory = createFactory();
132         }
133         Connection connection = factory.createConnection();
134         int value = connectionCounter.increment();
135         System.out.println("Created connection: " + value + " = " + connection);
136         if (durable) {
137             connection.setClientID(idGenerator.generateId());
138         }
139         addResource(connection);
140         connection.start();
141 
142         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
143         addResource(session);
144         return session;
145     }
146 
147     protected ActiveMQConnectionFactory createFactory() {
148         ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(getUrl());
149         if (embeddedBroker) {
150             answer.setUseEmbeddedBroker(true);
151         }
152         return answer;
153     }
154 
155     protected synchronized void count(int count) {
156         counter += count;
157         /*
158         if (counter > batch) {
159             counter = 0;
160             long current = System.currentTimeMillis();
161             double end = current - time;
162             end /= 1000;
163             time = current;
164 
165             System.out.println("Processed " + batch + " messages in " + end + " (secs)");
166         }
167         */
168     }
169 
170     protected synchronized int resetCount() {
171         int answer = counter;
172         counter = 0;
173         return answer;
174     }
175 
176 
177     protected void timerLoop() {
178         int times = 0;
179         int total = 0;
180         int dumpVmStatsFrequency = 10;
181         Runtime runtime = Runtime.getRuntime();
182 
183         while (true) {
184             try {
185                 Thread.sleep(1000);
186             }
187             catch (InterruptedException e) {
188                 e.printStackTrace();
189             }
190             int processed = resetCount();
191             double average = 0;
192             if (processed > 0) {
193                 total += processed;
194                 times++;
195             }
196             if (times > 0) {
197                 average = total / times;
198             }
199 
200             long oldtime = time;
201             time = System.currentTimeMillis();
202 
203             double diff = time - oldtime;
204 
205             System.out.println(getClass().getName() + " Processed: " + processed + " messages this second. Average: " + average);
206 
207             if ((times % dumpVmStatsFrequency) == 0 && times != 0) {
208                 System.out.println("Used memory: " + asMemoryString(runtime.totalMemory() - runtime.freeMemory())
209                         + " Free memory: " + asMemoryString(runtime.freeMemory())
210                         + " Total memory: " + asMemoryString(runtime.totalMemory())
211                         + " Max memory: " + asMemoryString(runtime.maxMemory()));
212             }
213 
214         }
215     }
216 
217     protected String asMemoryString(long value) {
218         return formatter.format(value / 1024) + " K";
219     }
220 
221     protected boolean useTimerLoop() {
222         return true;
223     }
224 
225     protected Destination createDestination(Session session, String subject) throws JMSException {
226         if (topic) {
227             return session.createTopic(subject);
228         }
229         else {
230             return session.createQueue(subject);
231         }
232     }
233 
234     protected void addResource(Object resource) {
235         resources.add(resource);
236     }
237 
238     protected static boolean parseBoolean(String text) {
239         return text.equalsIgnoreCase("true");
240     }
241 }