1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.journal.impl;
19
20 import org.codehaus.activemq.journal.InvalidRecordLocationException;
21 import org.codehaus.activemq.journal.Journal;
22 import org.codehaus.activemq.journal.JournalEventListener;
23 import org.codehaus.activemq.journal.RecordLocation;
24 import org.codehaus.activemq.util.LongSequenceGenerator;
25
26 import java.io.File;
27 import java.io.IOException;
28 import java.io.InterruptedIOException;
29
30 /***
31 * A high speed Journal implementation. Inspired by the ideas of the
32 * <a href="http://howl.objectweb.org/">Howl</a> project but tailored to the needs
33 * of ActiveMQ.
34 * <p/>
35 * This Journal provides the following features:
36 * <ul>
37 * <li> Conncurrent writes are batched into a single write/force done by a background thread.</li>
38 * <li> Uses preallocated logs to avoid disk fragmentation and performance degregation. </li>
39 * <li> The number and size of the preallocated logs are configurable. </li>
40 * <li> Uses direct ByteBuffers to write data to log files. </li>
41 * <li> Allows logs to grow in case of an overflow condition so that overflow exceptions are not
42 * not thrown. Grown logs that are inactivated (due to a new mark) are resized to
43 * thier original size.</li>
44 * <li> No limit on the size of the record written to the journal</li>
45 * <li> Should be possible to extend so that multiple physical disk are used concurrently to
46 * increase throughput and decrease latency.</li>
47 * </ul>
48 * <p/>
49 * This class is a thing wrapper around a LogFileManager. It may be possible to extends this class so
50 * that it manages multiple LogFileManager objects to allow logging to multiple physical disks.
51 *
52 * @version $Revision: 1.3 $
53 */
54 public class JournalImpl implements Journal {
55
56 LogFileManager manager;
57 LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
58
59 public JournalImpl() throws IOException {
60 this(new File("logs"));
61 }
62
63 public JournalImpl(File logDirectory) throws IOException {
64 manager = new LogFileManager((byte) 0, sequenceGenerator, new LogFile(logDirectory));
65 initSequenceId();
66 }
67
68 public JournalImpl(File logDirectory, int segments, int segmentSize) throws IOException {
69 manager = new LogFileManager((byte) 0, sequenceGenerator, new LogFile(logDirectory, segments, segmentSize));
70 initSequenceId();
71 }
72
73
74 private void initSequenceId() {
75 long id = manager.getLastSequenceId();
76 if (id == -1) {
77 id = 0;
78 }
79 sequenceGenerator.setLastSequenceId(id);
80 }
81
82 /***
83 * @throws IOException
84 * @see org.codehaus.activemq.journal.Journal#write(byte[], boolean)
85 */
86 public RecordLocation write(byte[] data, boolean sync) throws IOException {
87 return manager.write(data, sync);
88 }
89
90 /***
91 * @see org.codehaus.activemq.journal.Journal#setMark(org.codehaus.activemq.journal.RecordLocation,
92 * boolean)
93 */
94 public void setMark(RecordLocation recordLocator, boolean force)
95 throws InvalidRecordLocationException, IOException {
96 RecordLocationImpl rl = (RecordLocationImpl) recordLocator;
97 try {
98 manager.setMark(rl, force);
99 }
100 catch (InterruptedException e) {
101 throw new InterruptedIOException();
102 }
103 }
104
105 /***
106 * @see org.codehaus.activemq.journal.Journal#getMark()
107 */
108 public RecordLocation getMark() {
109 return manager.getMark();
110 }
111
112 /***
113 * @see org.codehaus.activemq.journal.Journal#close()
114 */
115 public void close() throws IOException {
116 manager.close();
117 }
118
119 /***
120 * @see org.codehaus.activemq.journal.Journal#setJournalEventListener(org.codehaus.activemq.journal.JournalEventListener)
121 */
122 public void setJournalEventListener(JournalEventListener eventListener) {
123 manager.setJournalEventListener(eventListener);
124 }
125
126 /***
127 * @throws InvalidRecordLocationException
128 * @throws IOException
129 * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
130 */
131 public RecordLocation getNextRecordLocation(RecordLocation lastLocation)
132 throws IOException, InvalidRecordLocationException {
133
134 RecordLocationImpl rl = (RecordLocationImpl) lastLocation;
135 return manager.getNextRecordLocation(rl);
136 }
137
138 /***
139 * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
140 */
141 public byte[] read(RecordLocation location)
142 throws InvalidRecordLocationException, IOException {
143
144 RecordLocationImpl rl = (RecordLocationImpl) location;
145 return manager.read(rl);
146 }
147
148 /***
149 * @see java.lang.Object#toString()
150 */
151 public String toString() {
152 return "JournalImpl at '" + manager.getLogDirectory() + "' using " + manager.getTotalSegements() + " x " + ((float) manager.getInitialSegmentSize() / ((float) 1024 * 1024)) + " Meg log files.";
153 }
154 }