1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.journal;
19  
20  import java.io.File;
21  import java.io.FileOutputStream;
22  import java.io.IOException;
23  import java.io.PrintWriter;
24  import java.util.Random;
25  
26  /***
27   * Provides the base class uses to run performance tests against a Journal.
28   * Should be subclassed to customize for specific journal implementation.
29   * 
30   * @version $Revision: 1.6 $
31   */
32  abstract public class JournalPerfToolSupport implements JournalEventListener {
33  
34  	private JournalStatsFilter journal;
35  	private Random random = new Random();
36  	private byte data[];
37  	private int workerCount=0;	
38  	private PrintWriter statWriter;
39  	// Performance test Options
40  	
41  	// The output goes here:
42  	protected File journalDirectory = new File("journal-logs");
43  	protected File statCSVFile = new File("stats.csv");;
44  
45  	// Controls how often we start a new batch of workers.
46  	protected int workerIncrement=20;
47  	protected long incrementDelay=1000*20;
48  	protected boolean verbose=true;
49  
50  	// Worker configuration.
51  	protected int recordSize=1024;
52  	protected int syncFrequency=15;	
53  	protected int workerThinkTime=100;
54  
55      private final class Worker implements Runnable {
56  		public void run() {
57  			int i=random.nextInt()%syncFrequency;
58  			while(true) {
59  				boolean sync=false;
60  				
61  				if( syncFrequency>=0 && (i%syncFrequency)==0 ) {
62  					sync=true;
63  				}				
64  				try {
65  					journal.write(data, sync);
66  					Thread.sleep(workerThinkTime);
67  				} catch (Exception e) {
68  					e.printStackTrace();
69  					return;
70  				}
71  				i++;						
72  			}					
73  		}
74  	}	
75  	
76      /***
77       * @throws IOException
78  	 * 
79  	 */
80  	protected void exec() throws Exception {
81  		
82  		System.out.println("Client threads write records using: Record Size: "+recordSize+", Sync Frequency: "+syncFrequency+", Worker Think Time: "+workerThinkTime);
83  
84  		// Create the record and fill it with some values.
85  		data = new byte[recordSize];
86  		for (int i = 0; i < data.length; i++) {
87  			data[i] = (byte)i;
88  		}
89  		
90  		if( statCSVFile!=null ) {
91  			statWriter = new PrintWriter(new FileOutputStream(statCSVFile));
92  			statWriter.println("Threads,Throughput (k/s),Forcd write latency (ms),Throughput (records/s)");
93  		}
94  		
95          if( journalDirectory.exists() ) {
96          	deleteDir(journalDirectory);
97          }		
98          journal = new JournalStatsFilter(createJournal()).enableDetailedStats(verbose);
99          journal.setJournalEventListener(this);
100 		
101         try {        	
102         	
103         	// Wait a little to see the worker affect the stats.
104         	// Increment the number of workers every few seconds.
105         	while(true) {
106         		System.out.println("Starting "+workerIncrement+" Workers...");
107             	for(int i=0;i <workerIncrement;i++) {
108                 	new Thread(new Worker()).start();
109                 	workerCount++;
110             	}
111             				
112             	// Wait a little to see the worker affect the stats.
113             	System.out.println("Waiting "+(incrementDelay/1000)+" seconds before next Stat sample.");
114             	Thread.sleep(incrementDelay);
115             	displayStats();
116             	journal.reset();
117         	}
118         	
119         	
120         } finally {
121         	journal.close();
122         }
123 	}
124 
125 	private void displayStats() {		
126 		System.out.println("Stats at "+workerCount+" workers.");
127 		System.out.println(journal);        	
128 		if( statWriter!= null ) {
129 			statWriter.println(""+workerCount+","+journal.getThroughputKps()+","+journal.getAvgSyncedLatencyMs()+","+journal.getThroughputRps());
130 			statWriter.flush();
131 		}
132 	}
133 
134 	/***
135 	 * @return
136 	 */
137 	abstract public Journal createJournal() throws Exception;
138 
139 	static private void deleteDir(File f) {
140 		File[] files = f.listFiles();
141 		for (int i = 0; i < files.length; i++) {
142 			File file = files[i];
143 			file.delete();
144 		}
145 		f.delete();
146 	}
147     
148     
149 	public void overflowNotification(RecordLocation safeLocation) {
150 		try {
151 			System.out.println("Mark set: "+safeLocation);
152 			journal.setMark(safeLocation, false);
153 		} catch (InvalidRecordLocationException e) {
154 			e.printStackTrace();
155 		} catch (IOException e) {
156 			e.printStackTrace();
157 		}		
158 	}
159 }