1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.journal.impl;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.RandomAccessFile;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel;
27
28 /***
29 * Allows read/append access to a segment of a LogFile.
30 *
31 * @version $Revision: 1.2 $
32 */
33 public class Segment {
34 static final public int SEGMENT_HEADER_SIZE=0x0100;
35
36 /*** The File that holds the log data. */
37 private File file;
38 /*** Prefered size. The size that the segment file is set to when initilaized. */
39 private final int initialSize;
40 /*** The index of this Segment */
41 private final byte index;
42 /*** The id of the log file. */
43 private int id;
44 /*** The sequenceId of the first record */
45 private long firstSequenceId;
46 /*** The sequenceId of the last record */
47 private long lastSequenceId;
48 /*** Does it have live records in it? */
49 private boolean active=false;
50 /*** The location of the mark */
51 private final Mark lastMark = new Mark();
52 /*** The next free offset in the file. */
53 private int appendOffset=0;
54 /*** Used to access the file */
55 private RandomAccessFile randomAccessFile;
56 /*** Used to know where we seeked to last */
57
58 /*** Is the segment in readonly mode */
59 private boolean readOnly;
60 private FileChannel fileChannel;
61
62
63 public Segment(File file, int initialSize, byte index) throws IOException {
64 this.file = file;
65 this.initialSize = initialSize;
66 this.index = index;
67 boolean initializationNeeeded = !file.exists();
68 randomAccessFile = new RandomAccessFile(file, "rw");
69 randomAccessFile.seek(0);
70 fileChannel = randomAccessFile.getChannel();
71 if( initializationNeeeded ) {
72 reinitialize();
73 }
74 loadState();
75 }
76
77 public void seek( int newPos ) throws IOException {
78 randomAccessFile.seek(newPos);
79 }
80
81 private void loadState() throws IOException {
82 seek(0);
83 try {
84
85
86 read(randomAccessFile);
87 int firstCode = hashCode();
88 read(randomAccessFile);
89
90 if( firstCode==hashCode() && ( readOnly || !active ) ) {
91
92
93
94 return;
95 }
96 } catch (IOException e) {
97 }
98
99
100 firstSequenceId=-1;
101 lastSequenceId=-1;
102 readOnly=false;
103 active=true;
104
105 int offset=SEGMENT_HEADER_SIZE;
106 long fileSize = randomAccessFile.length();
107 RecordHeader header = new RecordHeader();
108 RecordFooter footer = new RecordFooter();
109 while( (offset+Record.RECORD_BASE_SIZE) < fileSize) {
110
111
112 seek(offset);
113 try {
114 header.readRecordHeader(randomAccessFile);
115 } catch (IOException e) {
116 break;
117 }
118
119
120 if( !header.isValid() )
121 break;
122 if( firstSequenceId==-1 ) {
123 firstSequenceId = header.sequenceId;
124 } else {
125
126 if( lastSequenceId >= header.sequenceId ) {
127 break;
128 }
129 }
130
131 if( offset+header.length+Record.RECORD_BASE_SIZE > fileSize) {
132 break;
133 }
134
135
136 byte data[]=null;
137 if( RecordFooter.isChecksumingEnabled() || header.recordType==LogFile.MARK_RECORD_TYPE ) {
138 data = new byte[header.length];
139 randomAccessFile.readFully(data);
140 }
141
142
143 seek(offset+header.length+RecordHeader.RECORD_HEADER_SIZE);
144 try {
145 footer.readRecordFooter(randomAccessFile);
146 } catch (IOException e) {
147 break;
148 }
149
150
151 if( !footer.matches(header) )
152 break;
153 if( RecordFooter.isChecksumingEnabled() ) {
154 if( !footer.matches(data) )
155 break;
156 }
157
158
159 if(header.recordType==LogFile.MARK_RECORD_TYPE) {
160 lastMark.readExternal(data);
161 }
162
163 lastSequenceId = header.sequenceId;
164 offset += header.length+Record.RECORD_BASE_SIZE;
165 appendOffset = offset;
166 }
167
168
169
170 storeState();
171 }
172
173 private void resize() throws IOException {
174 randomAccessFile.setLength(initialSize);
175 }
176
177 private void storeState() throws IOException {
178 int restorePos=getSeekPos();
179
180
181
182 seek(0);
183 writeHeader(randomAccessFile);
184 writeHeader(randomAccessFile);
185 force();
186 seek(restorePos);
187 }
188
189 private int getSeekPos() throws IOException {
190 return (int) randomAccessFile.getFilePointer();
191 }
192
193 public void force() throws IOException {
194 fileChannel.force(false);
195 }
196
197 public void reinitialize() throws IOException {
198 this.readOnly=true;
199 this.active=false;
200 this.firstSequenceId=-1;
201 this.appendOffset=-1;
202 this.id=-1;
203 this.lastSequenceId=-1;
204 this.lastMark.offsetId=-1;
205 this.lastMark.sequenceId=-1;
206 this.appendOffset=SEGMENT_HEADER_SIZE;
207 resize();
208 storeState();
209 }
210
211 public void setReadOnly(boolean enable) throws IOException {
212 if( !active )
213 throw new IllegalStateException("Not Active.");
214
215 this.readOnly=enable;
216 storeState();
217 }
218
219 public void activate(int id) throws IOException {
220 if( active )
221 throw new IllegalStateException("Allready Active.");
222
223 this.id=id;
224 this.readOnly=false;
225 this.active=true;
226 this.appendOffset=SEGMENT_HEADER_SIZE;
227 lastSequenceId=firstSequenceId=-1;
228 storeState();
229 seek(appendOffset);
230 }
231
232 public int hashCode() {
233 int rc = id;
234 rc ^= appendOffset;
235 rc ^= (int) (0xFFFFFFFF & firstSequenceId);
236 rc ^= (int) (0xFFFFFFFF & (firstSequenceId>>4) );
237 rc ^= (int) (0xFFFFFFFF & lastSequenceId);
238 rc ^= (int) (0xFFFFFFFF & (lastSequenceId>>4) );
239 if( active )
240 rc ^= 0xABD01212;
241 if( readOnly )
242 rc ^= 0x1F8ADC21;
243 return rc;
244 }
245
246 private void writeHeader(DataOutput f) throws IOException {
247 f.writeInt(id);
248 f.writeLong(firstSequenceId);
249 f.writeLong(lastSequenceId);
250 f.writeBoolean(active);
251 f.writeBoolean(readOnly);
252 f.writeLong(lastMark.sequenceId);
253 f.writeInt(lastMark.offsetId);
254 f.writeInt(appendOffset);
255 f.writeInt(hashCode());
256 }
257
258 private void read(DataInput f) throws IOException {
259 id = f.readInt();
260 firstSequenceId = f.readLong();
261 lastSequenceId = f.readLong();
262 active = f.readBoolean();
263 readOnly = f.readBoolean();
264 lastMark.sequenceId = f.readLong();
265 lastMark.offsetId = f.readInt();
266 appendOffset = f.readInt();
267
268
269 if( f.readInt()!=hashCode())
270 throw new IOException();
271 }
272
273 public int getId() {
274 return id;
275 }
276 public boolean isActive() {
277 return active;
278 }
279 public long getFirstSequenceId() {
280 return firstSequenceId;
281 }
282 public Mark getLastMark() {
283 return lastMark;
284 }
285 public long getLastSequenceId() {
286 return lastSequenceId;
287 }
288 public int getAppendOffset() {
289 return appendOffset;
290 }
291
292 public boolean isAtAppendOffset() throws IOException {
293 return appendOffset==getSeekPos();
294 }
295
296 /***
297 * @param write
298 * @throws IOException
299 */
300 public void write(BatchedWrite write) throws IOException {
301
302 ByteBuffer byteBuffer = write.getByteBuffer();
303 byteBuffer.flip();
304
305 write(byteBuffer);
306
307 if( firstSequenceId==-1 )
308 firstSequenceId=write.getFirstSequenceId();
309 lastSequenceId=write.getLastSequenceId();
310
311 }
312
313 public void write(ByteBuffer byteBuffer) throws IOException {
314 if( !active )
315 throw new IllegalStateException("Segment is not active. Writes are not allowed");
316 if( readOnly )
317 throw new IllegalStateException("Segment has been marked Read Only. Writes are not allowed");
318 if( !isAtAppendOffset() )
319 throw new IllegalStateException("Segment not at append location.");
320
321 int writeLength = byteBuffer.remaining();
322 while( byteBuffer.remaining() > 0 ) {
323 fileChannel.write(byteBuffer);
324 }
325
326
327 appendOffset+=writeLength;
328 }
329
330 public void readRecordHeader(RecordHeader seekedRecordHeader) throws IOException {
331 seekedRecordHeader.readRecordHeader(randomAccessFile);
332 }
333
334 public void close() throws IOException {
335 this.randomAccessFile.close();
336 }
337
338 public void read(byte[] answer) throws IOException {
339 randomAccessFile.readFully(answer);
340 }
341
342 /***
343 * @return
344 */
345 public boolean isReadOnly() {
346 return readOnly;
347 }
348
349 public byte getIndex() {
350 return index;
351 }
352
353 public String toString() {
354 return "Segment:"+index+":"+id;
355 }
356
357 /***
358 * @return
359 */
360 public RecordLocationImpl getFirstRecordLocation(byte fm) {
361 return new RecordLocationImpl(fm, index, SEGMENT_HEADER_SIZE, firstSequenceId);
362 }
363
364 }