1 /*** 2 * 3 * Copyright 2004 Hiram Chirino 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 package org.codehaus.activemq.journal.impl; 19 20 import org.codehaus.activemq.journal.InvalidRecordLocationException; 21 import org.codehaus.activemq.journal.Journal; 22 import org.codehaus.activemq.journal.JournalEventListener; 23 import org.codehaus.activemq.journal.RecordLocation; 24 import org.codehaus.activemq.util.LongSequenceGenerator; 25 26 import java.io.File; 27 import java.io.IOException; 28 import java.io.InterruptedIOException; 29 30 /*** 31 * A high speed Journal implementation. Inspired by the ideas of the 32 * <a href="http://howl.objectweb.org/">Howl</a> project but tailored to the needs 33 * of ActiveMQ. 34 * <p/> 35 * This Journal provides the following features: 36 * <ul> 37 * <li> Conncurrent writes are batched into a single write/force done by a background thread.</li> 38 * <li> Uses preallocated logs to avoid disk fragmentation and performance degregation. </li> 39 * <li> The number and size of the preallocated logs are configurable. </li> 40 * <li> Uses direct ByteBuffers to write data to log files. </li> 41 * <li> Allows logs to grow in case of an overflow condition so that overflow exceptions are not 42 * not thrown. Grown logs that are inactivated (due to a new mark) are resized to 43 * thier original size.</li> 44 * <li> No limit on the size of the record written to the journal</li> 45 * <li> Should be possible to extend so that multiple physical disk are used concurrently to 46 * increase throughput and decrease latency.</li> 47 * </ul> 48 * <p/> 49 * This class is a thing wrapper around a LogFileManager. It may be possible to extends this class so 50 * that it manages multiple LogFileManager objects to allow logging to multiple physical disks. 51 * 52 * @version $Revision: 1.3 $ 53 */ 54 public class JournalImpl implements Journal { 55 56 LogFileManager manager; 57 LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 58 59 public JournalImpl() throws IOException { 60 this(new File("logs")); 61 } 62 63 public JournalImpl(File logDirectory) throws IOException { 64 manager = new LogFileManager((byte) 0, sequenceGenerator, new LogFile(logDirectory)); 65 initSequenceId(); 66 } 67 68 public JournalImpl(File logDirectory, int segments, int segmentSize) throws IOException { 69 manager = new LogFileManager((byte) 0, sequenceGenerator, new LogFile(logDirectory, segments, segmentSize)); 70 initSequenceId(); 71 } 72 73 74 private void initSequenceId() { 75 long id = manager.getLastSequenceId(); 76 if (id == -1) { 77 id = 0; 78 } 79 sequenceGenerator.setLastSequenceId(id); 80 } 81 82 /*** 83 * @throws IOException 84 * @see org.codehaus.activemq.journal.Journal#write(byte[], boolean) 85 */ 86 public RecordLocation write(byte[] data, boolean sync) throws IOException { 87 return manager.write(data, sync); 88 } 89 90 /*** 91 * @see org.codehaus.activemq.journal.Journal#setMark(org.codehaus.activemq.journal.RecordLocation, 92 * boolean) 93 */ 94 public void setMark(RecordLocation recordLocator, boolean force) 95 throws InvalidRecordLocationException, IOException { 96 RecordLocationImpl rl = (RecordLocationImpl) recordLocator; 97 try { 98 manager.setMark(rl, force); 99 } 100 catch (InterruptedException e) { 101 throw new InterruptedIOException(); 102 } 103 } 104 105 /*** 106 * @see org.codehaus.activemq.journal.Journal#getMark() 107 */ 108 public RecordLocation getMark() { 109 return manager.getMark(); 110 } 111 112 /*** 113 * @see org.codehaus.activemq.journal.Journal#close() 114 */ 115 public void close() throws IOException { 116 manager.close(); 117 } 118 119 /*** 120 * @see org.codehaus.activemq.journal.Journal#setJournalEventListener(org.codehaus.activemq.journal.JournalEventListener) 121 */ 122 public void setJournalEventListener(JournalEventListener eventListener) { 123 manager.setJournalEventListener(eventListener); 124 } 125 126 /*** 127 * @throws InvalidRecordLocationException 128 * @throws IOException 129 * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation) 130 */ 131 public RecordLocation getNextRecordLocation(RecordLocation lastLocation) 132 throws IOException, InvalidRecordLocationException { 133 134 RecordLocationImpl rl = (RecordLocationImpl) lastLocation; 135 return manager.getNextRecordLocation(rl); 136 } 137 138 /*** 139 * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation) 140 */ 141 public byte[] read(RecordLocation location) 142 throws InvalidRecordLocationException, IOException { 143 144 RecordLocationImpl rl = (RecordLocationImpl) location; 145 return manager.read(rl); 146 } 147 148 /*** 149 * @see java.lang.Object#toString() 150 */ 151 public String toString() { 152 return "JournalImpl at '" + manager.getLogDirectory() + "' using " + manager.getTotalSegements() + " x " + ((float) manager.getInitialSegmentSize() / ((float) 1024 * 1024)) + " Meg log files."; 153 } 154 }