|
|||||||||||||||||||
| 30 day Evaluation Version distributed via the Maven Jar Repository. Clover is not free. You have 30 days to evaluate it. Please visit http://www.thecortex.net/clover to obtain a licensed version of Clover | |||||||||||||||||||
| Source file | Conditionals | Statements | Methods | TOTAL | |||||||||||||||
| JournalImpl.java | 42.1% | 57.6% | 73.1% | 56.6% |
|
||||||||||||||
| 1 |
/**
|
|
| 2 |
*
|
|
| 3 |
* Copyright 2004 Hiram Chirino
|
|
| 4 |
*
|
|
| 5 |
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
| 6 |
* you may not use this file except in compliance with the License.
|
|
| 7 |
* You may obtain a copy of the License at
|
|
| 8 |
*
|
|
| 9 |
* http://www.apache.org/licenses/LICENSE-2.0
|
|
| 10 |
*
|
|
| 11 |
* Unless required by applicable law or agreed to in writing, software
|
|
| 12 |
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
| 13 |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
| 14 |
* See the License for the specific language governing permissions and
|
|
| 15 |
* limitations under the License.
|
|
| 16 |
*
|
|
| 17 |
**/
|
|
| 18 |
package org.activeio.journal.active;
|
|
| 19 |
|
|
| 20 |
import java.io.File;
|
|
| 21 |
import java.io.IOException;
|
|
| 22 |
import java.io.InterruptedIOException;
|
|
| 23 |
import java.lang.reflect.InvocationTargetException;
|
|
| 24 |
|
|
| 25 |
import org.activeio.Disposable;
|
|
| 26 |
import org.activeio.Packet;
|
|
| 27 |
import org.activeio.journal.InvalidRecordLocationException;
|
|
| 28 |
import org.activeio.journal.Journal;
|
|
| 29 |
import org.activeio.journal.JournalEventListener;
|
|
| 30 |
import org.activeio.journal.RecordLocation;
|
|
| 31 |
import org.activeio.packet.ByteArrayPacket;
|
|
| 32 |
import org.activeio.packet.ByteBufferPacketPool;
|
|
| 33 |
|
|
| 34 |
import EDU.oswego.cs.dl.util.concurrent.FutureResult;
|
|
| 35 |
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
|
|
| 36 |
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
|
|
| 37 |
|
|
| 38 |
/**
|
|
| 39 |
* A high speed Journal implementation. Inspired by the ideas of the <a
|
|
| 40 |
* href="http://howl.objectweb.org/">Howl </a> project but tailored to the needs
|
|
| 41 |
* of ActiveMQ. <p/>This Journal provides the following features:
|
|
| 42 |
* <ul>
|
|
| 43 |
* <li>Concurrent writes are batched into a single write/force done by a
|
|
| 44 |
* background thread.</li>
|
|
| 45 |
* <li>Uses preallocated logs to avoid disk fragmentation and performance
|
|
| 46 |
* degradation.</li>
|
|
| 47 |
* <li>The number and size of the preallocated logs are configurable.</li>
|
|
| 48 |
* <li>Uses direct ByteBuffers to write data to log files.</li>
|
|
| 49 |
* <li>Allows logs to grow in case of an overflow condition so that overflow
|
|
| 50 |
* exceptions are not thrown. Grown logs that are inactivated (due to a new
|
|
| 51 |
* mark) are resized to their original size.</li>
|
|
| 52 |
* <li>No limit on the size of the record written to the journal</li>
|
|
| 53 |
* <li>Should be possible to extend so that multiple physical disk are used
|
|
| 54 |
* concurrently to increase throughput and decrease latency.</li>
|
|
| 55 |
* </ul>
|
|
| 56 |
* <p/>
|
|
| 57 |
*
|
|
| 58 |
* @version $Revision: 1.1 $
|
|
| 59 |
*/
|
|
| 60 |
final public class JournalImpl implements Journal, Disposable { |
|
| 61 |
|
|
| 62 |
public static final int DEFAULT_POOL_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPoolSize", ""+(5))); |
|
| 63 |
public static final int DEFAULT_PACKET_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPacketSize", ""+(1024*1024*4))); |
|
| 64 |
|
|
| 65 |
static final private int OVERFLOW_RENOTIFICATION_DELAY = 500; |
|
| 66 |
|
|
| 67 |
private boolean disposed = false; |
|
| 68 |
|
|
| 69 |
// The id of the current log file that is being filled.
|
|
| 70 |
private int appendLogFileId = 0; |
|
| 71 |
|
|
| 72 |
// The offset in the current log file that is being filled.
|
|
| 73 |
private int appendLogFileOffset = 0; |
|
| 74 |
|
|
| 75 |
// Used to batch writes together.
|
|
| 76 |
private BatchedWrite pendingBatchWrite;
|
|
| 77 |
|
|
| 78 |
private Location lastMarkedLocation;
|
|
| 79 |
private LogFileManager file;
|
|
| 80 |
private QueuedExecutor executor;
|
|
| 81 |
private int rolloverFence; |
|
| 82 |
private JournalEventListener eventListener;
|
|
| 83 |
private ByteBufferPacketPool packetPool;
|
|
| 84 |
private long overflowNotificationTime = System.currentTimeMillis(); |
|
| 85 |
private Packet markPacket = new ByteArrayPacket(new byte[Location.SERIALIZED_SIZE]); |
|
| 86 |
|
|
| 87 | 0 |
public JournalImpl(File logDirectory) throws IOException { |
| 88 | 0 |
this(new LogFileManager(logDirectory)); |
| 89 |
} |
|
| 90 |
|
|
| 91 | 4 |
public JournalImpl(File logDirectory, int logFileCount, int logFileSize) throws IOException { |
| 92 | 4 |
this(new LogFileManager(logDirectory, logFileCount, logFileSize)); |
| 93 |
} |
|
| 94 |
|
|
| 95 | 4 |
public JournalImpl(LogFileManager logFile) {
|
| 96 | 4 |
this.file = logFile;
|
| 97 | 4 |
this.packetPool = new ByteBufferPacketPool(DEFAULT_POOL_SIZE, DEFAULT_PACKET_SIZE); |
| 98 | 4 |
this.executor = new QueuedExecutor(); |
| 99 | 4 |
this.executor.setThreadFactory(new ThreadFactory() { |
| 100 | 2 |
public Thread newThread(Runnable runnable) {
|
| 101 | 2 |
Thread answer = new Thread(runnable, "Journal Writter"); |
| 102 | 2 |
answer.setPriority(Thread.MAX_PRIORITY); |
| 103 | 2 |
answer.setDaemon(true);
|
| 104 | 2 |
return answer;
|
| 105 |
} |
|
| 106 |
}); |
|
| 107 |
|
|
| 108 | 4 |
lastMarkedLocation = file.getLastMarkedRecordLocation(); |
| 109 | 4 |
Location nextAppendLocation = file.getNextAppendLocation(); |
| 110 | 4 |
appendLogFileId = nextAppendLocation.getLogFileId(); |
| 111 | 4 |
appendLogFileOffset = nextAppendLocation.getLogFileOffset(); |
| 112 |
|
|
| 113 | 4 |
rolloverFence = (file.getInitialLogFileSize() / 10) * 9; |
| 114 |
} |
|
| 115 |
|
|
| 116 | 6 |
public RecordLocation write(Packet data, boolean sync) throws IOException { |
| 117 | 6 |
return write(LogFileManager.DATA_RECORD_TYPE, data, sync, null); |
| 118 |
} |
|
| 119 |
|
|
| 120 | 6 |
private Location write(byte recordType, Packet data, boolean sync, Location mark) throws IOException { |
| 121 | 6 |
try {
|
| 122 | 6 |
Location location; |
| 123 | 6 |
BatchedWrite writeCommand; |
| 124 |
|
|
| 125 | 6 |
Record record = new Record(recordType, data, mark);
|
| 126 |
|
|
| 127 |
// The following synchronized block is the bottle neck of the journal. Make this
|
|
| 128 |
// code faster and the journal should speed up.
|
|
| 129 | 6 |
synchronized (this) { |
| 130 | 6 |
if (disposed) {
|
| 131 | 0 |
throw new IOException("Journal has been closed."); |
| 132 |
} |
|
| 133 |
|
|
| 134 |
// Create our record
|
|
| 135 | 6 |
location = new Location(appendLogFileId, appendLogFileOffset);
|
| 136 | 6 |
record.setLocation(location); |
| 137 |
|
|
| 138 |
// Piggy back the packet on the pending write batch.
|
|
| 139 | 6 |
writeCommand = addToPendingWriteBatch(record, mark, sync); |
| 140 |
|
|
| 141 |
// Update where the next record will land.
|
|
| 142 | 6 |
appendLogFileOffset += data.limit() + Record.RECORD_BASE_SIZE; |
| 143 | 6 |
rolloverCheck(); |
| 144 |
} |
|
| 145 |
|
|
| 146 | 6 |
if (sync) {
|
| 147 | 0 |
writeCommand.waitForForce(); |
| 148 |
} |
|
| 149 |
|
|
| 150 | 6 |
return location;
|
| 151 |
} catch (IOException e) {
|
|
| 152 | 0 |
throw e;
|
| 153 |
} catch (InterruptedException e) {
|
|
| 154 | 0 |
throw (IOException) new InterruptedIOException().initCause(e); |
| 155 |
} catch (Throwable e) {
|
|
| 156 | 0 |
throw (IOException) new IOException("Write failed: " + e).initCause(e); |
| 157 |
} |
|
| 158 |
} |
|
| 159 |
|
|
| 160 |
/**
|
|
| 161 |
* @param record
|
|
| 162 |
* @return
|
|
| 163 |
* @throws InterruptedException
|
|
| 164 |
*/
|
|
| 165 | 6 |
private BatchedWrite addToPendingWriteBatch(Record record, Location mark, boolean force) throws InterruptedException { |
| 166 |
|
|
| 167 |
// Load the write batch up with data from our record.
|
|
| 168 |
// it may take more than one write batch if the record is large.
|
|
| 169 | 6 |
BatchedWrite answer = null;
|
| 170 | 6 |
while (record.hasRemaining()) {
|
| 171 |
|
|
| 172 |
// Do we need another BatchWrite?
|
|
| 173 | 8 |
boolean queueTheWrite=false; |
| 174 | 8 |
if (pendingBatchWrite == null) { |
| 175 | 4 |
pendingBatchWrite = new BatchedWrite(packetPool.getPacket());
|
| 176 | 4 |
queueTheWrite = true;
|
| 177 |
} |
|
| 178 | 8 |
answer = pendingBatchWrite; |
| 179 |
|
|
| 180 |
// Can we continue to use the pendingBatchWrite?
|
|
| 181 | 8 |
boolean full = !pendingBatchWrite.append(record, mark, force);
|
| 182 |
|
|
| 183 | 8 |
if( queueTheWrite ) {
|
| 184 | 4 |
final BatchedWrite queuedWrite = pendingBatchWrite; |
| 185 | 4 |
executor.execute(new Runnable() {
|
| 186 | 4 |
public void run() { |
| 187 | 4 |
try {
|
| 188 | 4 |
queuedWrite(queuedWrite); |
| 189 |
} catch (InterruptedException e) {
|
|
| 190 |
} |
|
| 191 |
} |
|
| 192 |
}); |
|
| 193 |
} |
|
| 194 |
|
|
| 195 | 8 |
if( full )
|
| 196 | 2 |
pendingBatchWrite = null;
|
| 197 |
} |
|
| 198 | 6 |
return answer;
|
| 199 |
|
|
| 200 |
} |
|
| 201 |
|
|
| 202 |
/**
|
|
| 203 |
* This is a blocking call
|
|
| 204 |
*
|
|
| 205 |
* @param write
|
|
| 206 |
* @throws InterruptedException
|
|
| 207 |
*/
|
|
| 208 | 4 |
private void queuedWrite(BatchedWrite write) throws InterruptedException { |
| 209 |
|
|
| 210 |
// Stop other threads from appending more pendingBatchWrite.
|
|
| 211 | 4 |
write.flip(); |
| 212 |
|
|
| 213 |
// Do the write.
|
|
| 214 | 4 |
try {
|
| 215 | 4 |
file.append(write); |
| 216 | 4 |
write.forced(); |
| 217 |
} catch (Throwable e) {
|
|
| 218 | 0 |
write.writeFailed(e); |
| 219 |
} finally {
|
|
| 220 | 4 |
write.getPacket().dispose(); |
| 221 |
} |
|
| 222 |
} |
|
| 223 |
|
|
| 224 |
/**
|
|
| 225 |
*
|
|
| 226 |
*/
|
|
| 227 | 6 |
private void rolloverCheck() throws IOException { |
| 228 |
|
|
| 229 |
// See if we need to issue an overflow notification.
|
|
| 230 | 6 |
if (eventListener != null && file.isPastHalfActive() |
| 231 |
&& overflowNotificationTime + OVERFLOW_RENOTIFICATION_DELAY < System.currentTimeMillis()) {
|
|
| 232 |
|
|
| 233 |
// We need to send an overflow notification to free up
|
|
| 234 |
// some logFiles.
|
|
| 235 | 0 |
Location safeSpot = file.getFirstRecordLocationOfSecondActiveLogFile(); |
| 236 | 0 |
eventListener.overflowNotification(safeSpot); |
| 237 | 0 |
overflowNotificationTime = System.currentTimeMillis(); |
| 238 |
} |
|
| 239 |
|
|
| 240 |
// Is it time to roll over?
|
|
| 241 | 6 |
if (appendLogFileOffset > rolloverFence ) {
|
| 242 |
|
|
| 243 |
// Can we roll over?
|
|
| 244 | 0 |
if ( !file.canActivateNextLogFile() ) {
|
| 245 |
// don't delay the next overflow notification.
|
|
| 246 | 0 |
overflowNotificationTime -= OVERFLOW_RENOTIFICATION_DELAY; |
| 247 |
|
|
| 248 |
} else {
|
|
| 249 |
|
|
| 250 | 0 |
final FutureResult result = new FutureResult();
|
| 251 | 0 |
try {
|
| 252 | 0 |
executor.execute(new Runnable() {
|
| 253 | 0 |
public void run() { |
| 254 | 0 |
try {
|
| 255 | 0 |
result.set(queuedActivateNextLogFile()); |
| 256 |
} catch (Throwable e) {
|
|
| 257 | 0 |
result.setException(e); |
| 258 |
} |
|
| 259 |
} |
|
| 260 |
}); |
|
| 261 |
|
|
| 262 | 0 |
Location location = (Location) result.get(); |
| 263 | 0 |
appendLogFileId = location.getLogFileId(); |
| 264 | 0 |
appendLogFileOffset = location.getLogFileOffset(); |
| 265 |
|
|
| 266 |
} catch (InterruptedException e) {
|
|
| 267 | 0 |
throw (IOException) new IOException("Interrupted.").initCause(e); |
| 268 |
} catch (InvocationTargetException e) {
|
|
| 269 | 0 |
if (e.getTargetException() instanceof IOException) |
| 270 | 0 |
throw (IOException) new IOException(e.getTargetException().getMessage()).initCause(e |
| 271 |
.getTargetException()); |
|
| 272 | 0 |
throw (IOException) new IOException("Unexpected Exception: ").initCause(e.getTargetException()); |
| 273 |
} |
|
| 274 |
} |
|
| 275 |
} |
|
| 276 |
} |
|
| 277 |
|
|
| 278 |
/**
|
|
| 279 |
* This is a blocking call
|
|
| 280 |
*/
|
|
| 281 | 0 |
private Location queuedActivateNextLogFile() throws IOException { |
| 282 | 0 |
file.activateNextLogFile(); |
| 283 | 0 |
return file.getNextAppendLocation();
|
| 284 |
} |
|
| 285 |
|
|
| 286 |
|
|
| 287 |
|
|
| 288 |
/**
|
|
| 289 |
* @param recordLocator
|
|
| 290 |
* @param force
|
|
| 291 |
* @return
|
|
| 292 |
* @throws InvalidRecordLocationException
|
|
| 293 |
* @throws IOException
|
|
| 294 |
* @throws InterruptedException
|
|
| 295 |
*/
|
|
| 296 | 0 |
synchronized public void setMark(RecordLocation l, boolean force) throws InvalidRecordLocationException, |
| 297 |
IOException {
|
|
| 298 |
|
|
| 299 | 0 |
Location location = (Location) l; |
| 300 | 0 |
if (location == null) |
| 301 | 0 |
throw new InvalidRecordLocationException("The location cannot be null."); |
| 302 | 0 |
if (lastMarkedLocation != null && location.compareTo(lastMarkedLocation) < 0) |
| 303 | 0 |
throw new InvalidRecordLocationException("The location is less than the last mark."); |
| 304 |
|
|
| 305 | 0 |
markPacket.clear(); |
| 306 | 0 |
location.writeToPacket(markPacket); |
| 307 | 0 |
markPacket.flip(); |
| 308 | 0 |
write(LogFileManager.MARK_RECORD_TYPE, markPacket, force, location); |
| 309 |
|
|
| 310 | 0 |
lastMarkedLocation = location; |
| 311 |
} |
|
| 312 |
|
|
| 313 |
/**
|
|
| 314 |
* @return
|
|
| 315 |
*/
|
|
| 316 | 2 |
public RecordLocation getMark() {
|
| 317 | 2 |
return lastMarkedLocation;
|
| 318 |
} |
|
| 319 |
|
|
| 320 |
/**
|
|
| 321 |
* @param lastLocation
|
|
| 322 |
* @return
|
|
| 323 |
* @throws IOException
|
|
| 324 |
* @throws InvalidRecordLocationException
|
|
| 325 |
*/
|
|
| 326 | 8 |
public RecordLocation getNextRecordLocation(final RecordLocation lastLocation) throws IOException, |
| 327 |
InvalidRecordLocationException {
|
|
| 328 |
|
|
| 329 | 8 |
if (lastLocation == null) { |
| 330 | 2 |
if (lastMarkedLocation != null) { |
| 331 | 0 |
return lastMarkedLocation;
|
| 332 |
} else {
|
|
| 333 | 2 |
return file.getFirstActiveLogLocation();
|
| 334 |
} |
|
| 335 |
} |
|
| 336 |
|
|
| 337 |
// Run this in the queued executor thread.
|
|
| 338 | 6 |
final FutureResult result = new FutureResult();
|
| 339 | 6 |
try {
|
| 340 | 6 |
executor.execute(new Runnable() {
|
| 341 | 6 |
public void run() { |
| 342 | 6 |
try {
|
| 343 | 6 |
result.set(queuedGetNextRecordLocation((Location) lastLocation)); |
| 344 |
} catch (Throwable e) {
|
|
| 345 | 0 |
result.setException(e); |
| 346 |
} |
|
| 347 |
} |
|
| 348 |
}); |
|
| 349 | 6 |
return (Location) result.get();
|
| 350 |
} catch (InterruptedException e) {
|
|
| 351 | 0 |
throw (IOException) new IOException("Interrupted.").initCause(e); |
| 352 |
} catch (InvocationTargetException e) {
|
|
| 353 | 0 |
return (RecordLocation) unwrapException(e);
|
| 354 |
|
|
| 355 |
} |
|
| 356 |
} |
|
| 357 |
|
|
| 358 | 0 |
private Object unwrapException(InvocationTargetException e) throws InvalidRecordLocationException, IOException { |
| 359 | 0 |
if (e.getTargetException() instanceof InvalidRecordLocationException) |
| 360 | 0 |
throw new InvalidRecordLocationException(e.getTargetException().getMessage(), e.getTargetException()); |
| 361 | 0 |
if (e.getTargetException() instanceof IOException) |
| 362 | 0 |
throw (IOException) new IOException(e.getTargetException().getMessage()).initCause(e |
| 363 |
.getTargetException()); |
|
| 364 | 0 |
throw (IOException) new IOException("Unexpected Exception: ").initCause(e.getTargetException()); |
| 365 |
} |
|
| 366 |
|
|
| 367 | 6 |
private Location queuedGetNextRecordLocation(Location location) throws IOException, InvalidRecordLocationException { |
| 368 | 6 |
return file.getNextDataRecordLocation(location);
|
| 369 |
} |
|
| 370 |
|
|
| 371 |
/**
|
|
| 372 |
* @param location
|
|
| 373 |
* @return
|
|
| 374 |
* @throws InvalidRecordLocationException
|
|
| 375 |
* @throws IOException
|
|
| 376 |
*/
|
|
| 377 | 12 |
public Packet read(final RecordLocation l) throws IOException, InvalidRecordLocationException { |
| 378 | 12 |
final Location location = (Location) l; |
| 379 |
// Run this in the queued executor thread.
|
|
| 380 | 12 |
final FutureResult result = new FutureResult();
|
| 381 | 12 |
try {
|
| 382 | 12 |
executor.execute(new Runnable() {
|
| 383 | 12 |
public void run() { |
| 384 | 12 |
try {
|
| 385 | 12 |
result.set(file.readPacket(location)); |
| 386 |
} catch (Throwable e) {
|
|
| 387 | 0 |
result.setException(e); |
| 388 |
} |
|
| 389 |
} |
|
| 390 |
}); |
|
| 391 | 12 |
return (Packet) result.get();
|
| 392 |
} catch (InterruptedException e) {
|
|
| 393 | 0 |
throw (IOException) new IOException("Interrupted.").initCause(e); |
| 394 |
} catch (InvocationTargetException e) {
|
|
| 395 | 0 |
if (e.getTargetException() instanceof InvalidRecordLocationException) |
| 396 | 0 |
throw new InvalidRecordLocationException(e.getTargetException().getMessage(), e.getTargetException()); |
| 397 | 0 |
if (e.getTargetException() instanceof IOException) |
| 398 | 0 |
throw (IOException) new IOException(e.getTargetException().getMessage()).initCause(e |
| 399 |
.getTargetException()); |
|
| 400 | 0 |
throw (IOException) new IOException("Unexpected Exception: ").initCause(e.getTargetException()); |
| 401 |
} |
|
| 402 |
} |
|
| 403 |
|
|
| 404 | 0 |
public void setJournalEventListener(JournalEventListener eventListener) { |
| 405 | 0 |
this.eventListener = eventListener;
|
| 406 |
} |
|
| 407 |
|
|
| 408 |
/**
|
|
| 409 |
* @deprecated @see #dispose()
|
|
| 410 |
*/
|
|
| 411 | 4 |
public void close() throws IOException { |
| 412 | 4 |
dispose(); |
| 413 |
} |
|
| 414 |
|
|
| 415 |
/**
|
|
| 416 |
*/
|
|
| 417 | 4 |
public void dispose() { |
| 418 | 4 |
if (disposed)
|
| 419 | 0 |
return;
|
| 420 | 4 |
disposed=true;
|
| 421 | 4 |
executor.shutdownAfterProcessingCurrentlyQueuedTasks(); |
| 422 | 4 |
file.dispose(); |
| 423 |
} |
|
| 424 |
|
|
| 425 |
/**
|
|
| 426 |
* @return
|
|
| 427 |
*/
|
|
| 428 | 2 |
public File getLogDirectory() {
|
| 429 | 2 |
return file.getLogDirectory();
|
| 430 |
} |
|
| 431 |
|
|
| 432 | 0 |
public int getInitialLogFileSize() { |
| 433 | 0 |
return file.getInitialLogFileSize();
|
| 434 |
} |
|
| 435 |
|
|
| 436 | 2 |
public String toString() {
|
| 437 | 2 |
return "Active Journal: using "+file.getOnlineLogFileCount()+" x " + (file.getInitialLogFileSize()/(1024*1024f)) + " Megs at: " + getLogDirectory(); |
| 438 |
} |
|
| 439 |
|
|
| 440 |
} |
|
| 441 |
|
|
||||||||||