1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.journal;
19
20 import java.io.IOException;
21 import java.io.PrintWriter;
22 import java.io.StringWriter;
23
24 import org.codehaus.activemq.management.CountStatisticImpl;
25 import org.codehaus.activemq.management.TimeStatisticImpl;
26 import org.codehaus.activemq.util.IndentPrinter;
27
28 /***
29 * A Journal filter that captures performance statistics of the filtered Journal.
30 *
31 * @version $Revision: 1.2 $
32 */
33 public class JournalStatsFilter implements Journal {
34
35 private final TimeStatisticImpl writeLatency = new TimeStatisticImpl("writeLatency", "The amount of time that is spent waiting for a record to be written to the Journal");
36 private final CountStatisticImpl writeRecordsCounter = new CountStatisticImpl("writeRecordsCounter","The number of records that have been written by the Journal");
37 private final CountStatisticImpl writeBytesCounter = new CountStatisticImpl("writeBytesCounter","The number of bytes that have been written by the Journal");
38 private final TimeStatisticImpl synchedWriteLatency = new TimeStatisticImpl(writeLatency, "synchedWriteLatency", "The amount of time that is spent waiting for a synch record to be written to the Journal");
39 private final TimeStatisticImpl unsynchedWriteLatency = new TimeStatisticImpl(writeLatency, "unsynchedWriteLatency", "The amount of time that is spent waiting for a non synch record to be written to the Journal");
40 private final TimeStatisticImpl readLatency = new TimeStatisticImpl("readLatency", "The amount of time that is spent waiting for a record to be read from the Journal");
41 private final CountStatisticImpl readBytesCounter = new CountStatisticImpl("readBytesCounter","The number of bytes that have been read by the Journal");
42
43 private final Journal next;
44 private boolean detailedStats;
45
46
47 /***
48 * Creates a JournalStatsFilter that captures performance information of <code>next</next>.
49 * @param next
50 */
51 public JournalStatsFilter(Journal next) {
52 this.next = next;
53 }
54
55 /***
56 * @see org.codehaus.activemq.journal.Journal#write(byte[], boolean)
57 */
58 public RecordLocation write(byte[] data, boolean sync) throws IOException {
59
60 long start = System.currentTimeMillis();
61 RecordLocation answer = next.write(data, sync);
62 long end = System.currentTimeMillis();
63
64 writeRecordsCounter.increment();
65 writeBytesCounter.add(data.length);
66 if( sync )
67 synchedWriteLatency.addTime(end-start);
68 else
69 unsynchedWriteLatency.addTime(end-start);
70 return answer;
71 }
72
73 /***
74 * @see org.codehaus.activemq.journal.Journal#read(org.codehaus.activemq.journal.RecordLocation)
75 */
76 public byte[] read(RecordLocation location)
77 throws InvalidRecordLocationException, IOException {
78
79 long start = System.currentTimeMillis();
80 byte answer[] = next.read(location);
81 long end = System.currentTimeMillis();
82
83 readBytesCounter.add(answer.length);
84 readLatency.addTime(end-start);
85 return answer;
86 }
87
88 /***
89 * @see org.codehaus.activemq.journal.Journal#setMark(org.codehaus.activemq.journal.RecordLocation, boolean)
90 */
91 public void setMark(RecordLocation recordLocator, boolean force)
92 throws InvalidRecordLocationException, IOException {
93 next.setMark(recordLocator, force);
94 }
95
96 /***
97 * @see org.codehaus.activemq.journal.Journal#getMark()
98 */
99 public RecordLocation getMark() {
100 return next.getMark();
101 }
102
103 /***
104 * @see org.codehaus.activemq.journal.Journal#close()
105 */
106 public void close() throws IOException {
107 next.close();
108 }
109
110 /***
111 * @see org.codehaus.activemq.journal.Journal#setJournalEventListener(org.codehaus.activemq.journal.JournalEventListener)
112 */
113 public void setJournalEventListener(JournalEventListener eventListener) {
114 next.setJournalEventListener(eventListener);
115 }
116
117 /***
118 * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
119 */
120 public RecordLocation getNextRecordLocation(RecordLocation lastLocation)
121 throws IOException, InvalidRecordLocationException {
122 return next.getNextRecordLocation(lastLocation);
123 }
124
125 /***
126 * Writes the gathered statistics to the <code>out</code> object.
127 *
128 * @param out
129 */
130 public void dump(IndentPrinter out) {
131 out.printIndent();
132 out.println("Journal Stats {");
133 out.incrementIndent();
134 out.printIndent();
135 out.println("Throughput : "+ getThroughputKps() +" k/s and " + getThroughputRps() +" records/s" );
136 out.printIndent();
137 out.println("Latency with force : "+ getAvgSyncedLatencyMs() +" ms" );
138 out.printIndent();
139 out.println("Latency without force: "+ getAvgUnSyncedLatencyMs() +" ms" );
140
141 out.printIndent();
142 out.println("Raw Stats {");
143 out.incrementIndent();
144
145 out.printIndent();
146 out.println(writeRecordsCounter);
147 out.printIndent();
148 out.println(writeBytesCounter);
149 out.printIndent();
150 out.println(writeLatency);
151 out.incrementIndent();
152 out.printIndent();
153 out.println(synchedWriteLatency);
154 out.printIndent();
155 out.println(unsynchedWriteLatency);
156 out.decrementIndent();
157
158 out.printIndent();
159 out.println(readBytesCounter);
160
161 out.printIndent();
162 out.println(readLatency);
163 out.decrementIndent();
164 out.printIndent();
165 out.println("}");
166
167 out.decrementIndent();
168 out.printIndent();
169 out.println("}");
170
171 }
172
173 /***
174 * Dumps the stats to a String.
175 *
176 * @see java.lang.Object#toString()
177 */
178 public String toString() {
179 if( detailedStats ) {
180 StringWriter w = new StringWriter();
181 PrintWriter pw = new PrintWriter(w);
182 dump(new IndentPrinter(pw, " "));
183 return w.getBuffer().toString();
184 } else {
185 StringWriter w = new StringWriter();
186 PrintWriter pw = new PrintWriter(w);
187 IndentPrinter out = new IndentPrinter(pw, " ");
188 out.println("Throughput : "+ getThroughputKps() +" k/s and " + getThroughputRps() +" records/s");
189 out.printIndent();
190 out.println("Latency with force : "+getAvgSyncedLatencyMs()+" ms" );
191 out.printIndent();
192 out.println("Latency without force: "+getAvgUnSyncedLatencyMs()+" ms" );
193 return w.getBuffer().toString();
194 }
195 }
196
197 /***
198 * @param detailedStats true if details stats should be displayed by <code>toString()</code> and <code>dump</code>
199 * @return
200 */
201 public JournalStatsFilter enableDetailedStats(boolean detailedStats) {
202 this.detailedStats = detailedStats;
203 return this;
204 }
205
206 /***
207 * Gets the average throughput in k/s.
208 *
209 * @return the average throughput in k/s.
210 */
211 public double getThroughputKps() {
212 long totalTime = writeBytesCounter.getLastSampleTime()-writeBytesCounter.getStartTime();
213 return (((double)writeBytesCounter.getCount()/(double)totalTime)/(double)1024)*1000;
214 }
215
216 /***
217 * Gets the average throughput in records/s.
218 *
219 * @return the average throughput in records/s.
220 */
221 public double getThroughputRps() {
222 long totalTime = writeRecordsCounter.getLastSampleTime()-writeRecordsCounter.getStartTime();
223 return (((double)writeRecordsCounter.getCount()/(double)totalTime))*1000;
224 }
225
226 /***
227 * Gets the average number of writes done per second
228 *
229 * @return the average number of writes in w/s.
230 */
231 public double getWritesPerSecond() {
232 return writeLatency.getAveragePerSecond();
233 }
234
235 /***
236 * Gets the average sync write latency in ms.
237 *
238 * @return the average sync write latency in ms.
239 */
240 public double getAvgSyncedLatencyMs() {
241 return synchedWriteLatency.getAverageTime();
242 }
243
244 /***
245 * Gets the average non sync write latency in ms.
246 *
247 * @return the average non sync write latency in ms.
248 */
249 public double getAvgUnSyncedLatencyMs() {
250 return unsynchedWriteLatency.getAverageTime();
251 }
252
253 /***
254 * Resets the stats sample.
255 */
256 public void reset() {
257 writeLatency.reset();
258 writeBytesCounter.reset();
259 writeRecordsCounter.reset();
260 synchedWriteLatency.reset();
261 unsynchedWriteLatency.reset();
262 readLatency.reset();
263 readBytesCounter.reset();
264 }
265 }