View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.journal.impl;
19  
20  import org.codehaus.activemq.journal.InvalidRecordLocationException;
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.text.NumberFormat;
25  import java.util.Iterator;
26  import java.util.LinkedList;
27  
28  /***
29   * Provides a logical view of many seperate files as one single long
30   * log file.  The seperate files that compose the LogFile are Segements
31   * of the LogFile.
32   * <p/>
33   * This class is not thread safe.
34   *
35   * @version $Revision: 1.7 $
36   */
37  final public class LogFile {
38  
39      static final public byte DATA_RECORD_TYPE = 1;
40      static final public byte MARK_RECORD_TYPE = 2;
41      static final private NumberFormat onlineLogNameFormat = NumberFormat.getNumberInstance();
42  
43      static {
44          onlineLogNameFormat.setMinimumIntegerDigits(3);
45          onlineLogNameFormat.setMaximumIntegerDigits(3);
46          onlineLogNameFormat.setGroupingUsed(false);
47          onlineLogNameFormat.setParseIntegerOnly(true);
48          onlineLogNameFormat.setMaximumFractionDigits(0);
49      }
50  
51      // Configuration variables.
52      private final File logDirectory;
53      private final int initialSegmentSize;
54      private boolean closed;
55  
56      // Keeps track of the online segments.
57      private final Segment segments[];
58      private final LinkedList activeSegments = new LinkedList();
59      private final LinkedList inactiveSegments = new LinkedList();;
60  
61      private byte markSegment = -1;
62      private final Mark lastMark = new Mark();
63      private byte appendSegment = -1;
64      private int lastSegmentId = -1;
65  
66      public LogFile(File logDirectory) throws IOException {
67          this(logDirectory, 4, 1024 * 1024 * 5);
68      }
69  
70      public LogFile(File logDirectory, int onlineSegmentCount, int initialSegmentSize) throws IOException {
71          this.logDirectory = logDirectory;
72          this.segments = new Segment[onlineSegmentCount];
73          this.initialSegmentSize = initialSegmentSize;
74          initialize();
75      }
76  
77      /***
78       * Creates/Loads the log segments.
79       *
80       * @throws IOException
81       */
82      private void initialize() throws IOException {
83  
84  // Create the log directory if it does not exist.
85          if (!logDirectory.exists()) {
86              if (!logDirectory.mkdirs()) {
87                  throw new IOException("Could not create directory: " + logDirectory);
88              }
89          }
90  
91          byte lastIndex = (byte) (segments.length - 1);
92          int lastSegmentId = 0;
93          Mark mark = null;
94          for (byte i = 0; i < segments.length; i++) {
95              // Initialize the segments..
96              segments[i] = new Segment(new File(logDirectory, "log-" + onlineLogNameFormat.format(i) + ".dat"), initialSegmentSize, i);
97              // Find the last segment out of the online segments.
98              if (segments[i].isActive()) {
99                  if (segments[i].getLastMark().sequenceId >= 0) {
100                 	markSegment = i;
101                     mark = segments[i].getLastMark();
102                 }
103                 if (segments[i].getId() > lastSegmentId) {
104                     lastSegmentId = segments[i].getId();
105                     lastIndex = i;
106                 }
107             }
108         }
109 
110 // Add all the segments to the activeSegments list.  Last segment will be
111 // the segment with the largest segment id.
112         byte i = nextSegmentIndex(lastIndex);
113         while (inactiveSegments.size() + activeSegments.size() < segments.length) {
114             Segment segment = segments[i];
115             if (segment.isActive()) {
116                 activeSegments.add(segment);
117             }
118             else {
119                 inactiveSegments.add(segment);
120             }
121             i = nextSegmentIndex(i);
122         }
123         if (mark != null) {
124             setMark(mark);
125         }
126 
127         if (activeSegments.size() == 0) {
128             // At least one segment needs to be active.
129             activateNextSegment();
130         }
131         else {
132 // Now make everything readonly except the last/append segment.
133             Segment lastSegment = (Segment) activeSegments.getLast();
134             lastSegment.setReadOnly(false);
135             appendSegment = lastSegment.getIndex();
136             for (Iterator iter = activeSegments.iterator(); iter.hasNext();) {
137                 Segment s = (Segment) iter.next();
138                 if (s != lastSegment) {
139                     s.setReadOnly(true);
140                 }
141             }
142         }
143         if (mark == null) {
144             // No mark had been set?  Make the mark the first active record.
145             Segment segment = (Segment) activeSegments.getFirst();
146             mark = new Mark();
147             mark.sequenceId = segment.getIndex();
148             mark.offsetId = Segment.SEGMENT_HEADER_SIZE;
149         }
150         lastMark.copy(mark);
151     }
152 
153 
154     /***
155      * @return
156      */
157     private int getNextSegmentId() {
158         return ++lastSegmentId;
159     }
160 
161 
162     public void close() throws IOException {
163         if (closed) {
164             return;
165         }
166         this.closed = true;
167         for (int i = 0; i < segments.length; i++) {
168             segments[i].close();
169         }
170     }
171 
172     private void setMark(Mark mark) throws IOException {
173         lastMark.copy(mark);
174         
175         // Find and deactivate active segments before the mark.
176         for (Iterator i = activeSegments.iterator(); i.hasNext();) {
177             Segment segment = (Segment) i.next();
178             if (segment.getLastSequenceId() < lastMark.sequenceId) {
179                 segment.reinitialize();
180                 i.remove();
181                 inactiveSegments.add(segment);
182             } else {
183             	markSegment = segment.getIndex();
184             	break;
185             }
186         }
187     }
188 
189     /***
190      * @param write
191      * @throws IOException
192      */
193     public void appendAndForce(BatchedWrite write) throws IOException {
194         Segment segment = segments[appendSegment];
195         segment.seek(segment.getAppendOffset());
196         segment.write(write);
197         if (write.getMark() != null) {
198             setMark(write.getMark());
199         }
200         segment.force();
201     }
202 
203 
204     public static class RecordInfo {
205         private final RecordLocationImpl location;
206         private final RecordHeader header;
207 
208         public RecordInfo(RecordLocationImpl location, RecordHeader header) {
209             this.location = location;
210             this.header = header;
211         }
212         
213         int getNextLocation() {
214         	return location.getSegmentOffset()+header.length+Record.RECORD_BASE_SIZE;
215         }
216         
217     }
218 
219     private RecordInfo readRecordInfo(RecordLocationImpl location) throws IOException, InvalidRecordLocationException {
220         if (0 > location.getSegmentIndex() || location.getSegmentIndex() > segments.length) {
221             throw new InvalidRecordLocationException("Invalid segment id.");
222         }
223 
224         Segment segment = segments[location.getSegmentIndex()];
225         segment.seek(location.getSegmentOffset());
226         
227         // There can be no record at the append offset.
228         if (segment.isAtAppendOffset()) {
229             throw new InvalidRecordLocationException("No record at end of log.");
230         }
231         
232         // Is there a record header at the seeked location?
233         try {
234             RecordHeader header = new RecordHeader();
235             segment.readRecordHeader(header);
236             return new RecordInfo(location, header);
237         }
238         catch (IOException e) {
239             throw new InvalidRecordLocationException("No record at found.");
240         }
241     }
242 
243     /***
244      * @param location
245      * @return
246      * @throws InvalidRecordLocationException
247      * @throws IOException
248      */
249     public RecordLocationImpl readRecordLocation(RecordLocationImpl location) throws IOException, InvalidRecordLocationException {
250         RecordInfo info = readRecordInfo(location);
251         return info.location.setSequence(info.header.sequenceId);        
252     }
253 
254     /***
255      * @param lastLocation
256      * @return
257      */
258     public RecordLocationImpl getNextDataRecordLocation(RecordLocationImpl lastLocation) throws IOException, InvalidRecordLocationException {
259         RecordInfo ri = readRecordInfo(lastLocation);
260         while (true) {
261             byte segmentIndex = ri.location.getSegmentIndex();
262             int offset = ri.getNextLocation();
263 
264             // Are we overflowing into next segment?
265             if (offset >= segments[segmentIndex].getAppendOffset()) {
266                 segmentIndex = nextActiveSegmentIndex(segmentIndex);
267                 if (segmentIndex < 0) {
268                     return null;
269                 }
270                 offset = Segment.SEGMENT_HEADER_SIZE;
271             }
272 
273             try {
274                 ri = readRecordInfo(ri.location.setSegmentIndexAndOffset(segmentIndex, offset));
275             }
276             catch (InvalidRecordLocationException e) {
277                 return null;
278             }
279 
280             // Is the next record the right record type?
281             if (ri.header.recordType == DATA_RECORD_TYPE) {
282                 return ri.location.setSequence(ri.header.sequenceId);
283             }
284             // No? go onto the next record.
285         }
286     }
287 
288     /***
289      * @param i
290      * @return -1 if no next segment is available.
291      */
292     private byte nextActiveSegmentIndex(byte i) {
293         byte rc = nextSegmentIndex(i);
294         return segments[rc].isActive() ? rc : -1;
295     }
296 
297     private byte nextSegmentIndex(byte i) {
298         i++;
299         if (i < segments.length) {
300             return i;
301         }
302         return 0;
303     }
304 
305     /***
306      * @param segmentIndex
307      * @param segmentOffset
308      * @return
309      * @throws IOException
310      */
311     public byte[] readData(int segmentIndex, int segmentOffset) throws IOException {
312         if (0 > segmentIndex || segmentIndex > segments.length) {
313             return null;
314         }
315 
316         Segment segment = segments[segmentIndex];
317         segment.seek(segmentOffset);
318 
319 // There can be no record at the append offset.
320         if (segment.isAtAppendOffset()) {
321             return null;
322         }
323 
324 // Is there a record header at the seeked location?
325         RecordHeader header = new RecordHeader();
326         segment.readRecordHeader(header);
327         byte data[] = new byte[header.length];
328         segment.read(data);
329 
330         return data;
331     }
332 
333     public int getInitialSegmentSize() {
334         return initialSegmentSize;
335     }
336 
337     public boolean isSegmentIndexActive(byte i) {
338         synchronized (segments[i]) {
339             return segments[i].isActive();
340         }
341     }
342 
343     public long getFirstSequenceIdOfSegementIndex(byte i) {
344         synchronized (segments[i]) {
345             return segments[i].getFirstSequenceId();
346         }
347     }
348 
349     synchronized public boolean canActivateNextSegment() {
350         return inactiveSegments.size() > 0;
351     }
352 
353     public byte getFirstActiveSegmentIndex() {
354         return ((Segment) activeSegments.getFirst()).getIndex();
355     }
356 
357     void activateNextSegment() throws IOException {
358         // The current append segment becomes readonly
359         if (appendSegment >= 0) {
360             segments[appendSegment].setReadOnly(true);
361         }
362         Segment next = (Segment) inactiveSegments.removeFirst();
363         activeSegments.addLast(next);
364         next.activate(getNextSegmentId());
365         appendSegment = next.getIndex();
366     }
367 
368     /***
369      * @return
370      */
371     public byte getAppendSegmentIndex() {
372         return appendSegment;
373     }
374 
375     /***
376      * @return
377      */
378     public int getAppendSegmentOffset() {
379         return segments[appendSegment].getAppendOffset();
380     }
381 
382     int getTotalSegements() {
383         return segments.length;
384     }
385 
386     /***
387      *
388      */
389     public long getLastSequenceId() {
390         return segments[appendSegment].getLastSequenceId();
391     }
392 
393     /***
394      * @return
395      */
396     synchronized public RecordLocationImpl getFirstRecordLocationOfSecondActiveSegment(byte fm) {
397         return ((Segment) activeSegments.get(1)).getFirstRecordLocation(fm);
398     }
399 
400     /***
401      * @return Returns the logDirectory.
402      */
403     public File getLogDirectory() {
404         return logDirectory;
405     }
406 	/***
407 	 * @return Returns the lastMark.
408 	 */
409 	public RecordLocationImpl getLastMarkedRecordLocation(byte fm) {
410 		if( markSegment==-1 )
411 			return null;
412 		return new RecordLocationImpl(fm, markSegment, lastMark.offsetId, lastMark.sequenceId);
413 	}
414 }