1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.journal;
19
20 import java.io.File;
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.PrintWriter;
24 import java.util.Random;
25
26 /***
27 * Provides the base class uses to run performance tests against a Journal.
28 * Should be subclassed to customize for specific journal implementation.
29 *
30 * @version $Revision: 1.6 $
31 */
32 abstract public class JournalPerfToolSupport implements JournalEventListener {
33
34 private JournalStatsFilter journal;
35 private Random random = new Random();
36 private byte data[];
37 private int workerCount=0;
38 private PrintWriter statWriter;
39
40
41
42 protected File journalDirectory = new File("journal-logs");
43 protected File statCSVFile = new File("stats.csv");;
44
45
46 protected int workerIncrement=20;
47 protected long incrementDelay=1000*20;
48 protected boolean verbose=true;
49
50
51 protected int recordSize=1024;
52 protected int syncFrequency=15;
53 protected int workerThinkTime=100;
54
55 private final class Worker implements Runnable {
56 public void run() {
57 int i=random.nextInt()%syncFrequency;
58 while(true) {
59 boolean sync=false;
60
61 if( syncFrequency>=0 && (i%syncFrequency)==0 ) {
62 sync=true;
63 }
64 try {
65 journal.write(data, sync);
66 Thread.sleep(workerThinkTime);
67 } catch (Exception e) {
68 e.printStackTrace();
69 return;
70 }
71 i++;
72 }
73 }
74 }
75
76 /***
77 * @throws IOException
78 *
79 */
80 protected void exec() throws Exception {
81
82 System.out.println("Client threads write records using: Record Size: "+recordSize+", Sync Frequency: "+syncFrequency+", Worker Think Time: "+workerThinkTime);
83
84
85 data = new byte[recordSize];
86 for (int i = 0; i < data.length; i++) {
87 data[i] = (byte)i;
88 }
89
90 if( statCSVFile!=null ) {
91 statWriter = new PrintWriter(new FileOutputStream(statCSVFile));
92 statWriter.println("Threads,Throughput (k/s),Forcd write latency (ms),Throughput (records/s)");
93 }
94
95 if( journalDirectory.exists() ) {
96 deleteDir(journalDirectory);
97 }
98 journal = new JournalStatsFilter(createJournal()).enableDetailedStats(verbose);
99 journal.setJournalEventListener(this);
100
101 try {
102
103
104
105 while(true) {
106 System.out.println("Starting "+workerIncrement+" Workers...");
107 for(int i=0;i <workerIncrement;i++) {
108 new Thread(new Worker()).start();
109 workerCount++;
110 }
111
112
113 System.out.println("Waiting "+(incrementDelay/1000)+" seconds before next Stat sample.");
114 Thread.sleep(incrementDelay);
115 displayStats();
116 journal.reset();
117 }
118
119
120 } finally {
121 journal.close();
122 }
123 }
124
125 private void displayStats() {
126 System.out.println("Stats at "+workerCount+" workers.");
127 System.out.println(journal);
128 if( statWriter!= null ) {
129 statWriter.println(""+workerCount+","+journal.getThroughputKps()+","+journal.getAvgSyncedLatencyMs()+","+journal.getThroughputRps());
130 statWriter.flush();
131 }
132 }
133
134 /***
135 * @return
136 */
137 abstract public Journal createJournal() throws Exception;
138
139 static private void deleteDir(File f) {
140 File[] files = f.listFiles();
141 for (int i = 0; i < files.length; i++) {
142 File file = files[i];
143 file.delete();
144 }
145 f.delete();
146 }
147
148
149 public void overflowNotification(RecordLocation safeLocation) {
150 try {
151 System.out.println("Mark set: "+safeLocation);
152 journal.setMark(safeLocation, false);
153 } catch (InvalidRecordLocationException e) {
154 e.printStackTrace();
155 } catch (IOException e) {
156 e.printStackTrace();
157 }
158 }
159 }