1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import com.sleepycat.util.FastInputStream;
21 import com.sleepycat.util.FastOutputStream;
22 import org.codehaus.activemq.service.QueueList;
23 import org.codehaus.activemq.service.QueueListEntry;
24 import org.codehaus.activemq.util.JMSExceptionHelper;
25
26 import javax.jms.JMSException;
27 import java.io.ByteArrayInputStream;
28 import java.io.ByteArrayOutputStream;
29 import java.io.DataInputStream;
30 import java.io.DataOutputStream;
31 import java.io.IOException;
32 import java.io.Serializable;
33
34 /***
35 * A base class which is useful for implementation inheritance when implementing
36 * a persistent QueueList
37 *
38 * @version $Revision: 1.5 $
39 */
40 public abstract class QueueListSupport implements QueueList {
/**
 * Constant key with the value 0. Within this class 0 is never handed out as a
 * node key: wrapLong() maps a stored 0 to null and createKey() restarts at 1
 * when it wraps around.
 * NOTE(review): presumably the key under which the Header record itself is
 * stored by subclasses -- confirm against the concrete implementations.
 */
41 protected static final Long HEAD_KEY = new Long(0);
42
43 public static class Header implements Serializable {
44 private static final long serialVersionUID = 64734383295040L;
45
46 public Long headKey;
47 public Long tailKey;
48 public long lastKeyValue;
49 public int size;
50
51 public byte[] asBytes() throws IOException {
52 ByteArrayOutputStream buffer = new ByteArrayOutputStream();
53 DataOutputStream out = new DataOutputStream(buffer);
54 out.writeLong(unwrapLong(headKey));
55 out.writeLong(unwrapLong(tailKey));
56 out.writeLong(lastKeyValue);
57 out.writeInt(size);
58 return buffer.toByteArray();
59 }
60
61 public void fromBytes(byte[] data) throws IOException {
62 DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
63 this.headKey = wrapLong(in.readLong());
64 this.tailKey = wrapLong(in.readLong());
65 this.lastKeyValue = in.readLong();
66 this.size = in.readInt();
67 }
68 }
69
70 public static class Node implements Serializable, QueueListEntry {
71 private static final long serialVersionUID = 4609474001468609536L;
72
73 public Long previousKey;
74 public Long nextKey;
75 public Object value;
76
77
78 public transient Long key;
79
80 public Object getElement() {
81 return value;
82 }
83
84 public byte[] asBytes() throws IOException {
85 FastOutputStream buffer = new FastOutputStream();
86 DataOutputStream out = new DataOutputStream(buffer);
87 out.writeLong(unwrapLong(previousKey));
88 out.writeLong(unwrapLong(nextKey));
89 out.writeUTF((String) value);
90 return buffer.toByteArray();
91 }
92
93 public void fromBytes(byte[] data) throws IOException {
94 DataInputStream in = new DataInputStream(new FastInputStream(data));
95 this.previousKey = wrapLong(in.readLong());
96 this.nextKey = wrapLong(in.readLong());
97 this.value = in.readUTF();
98 }
99 }
100
101 public Object getFirst() throws JMSException {
102 try {
103 Long key = getHeader().headKey;
104 if (key != null) {
105 Node node = getNode(key);
106 if (node != null) {
107 return node.getElement();
108 }
109 }
110 return null;
111 }
112 catch (IOException e) {
113 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
114 }
115 }
116
117 public Object getLast() throws JMSException {
118 try {
119 Long key = getHeader().tailKey;
120 if (key != null) {
121 Node node = getNode(key);
122 if (node != null) {
123 return node.getElement();
124 }
125 }
126 return null;
127 }
128 catch (IOException e) {
129 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
130 }
131 }
132
133 public Object removeFirst() throws JMSException {
134 try {
135 Header header = getHeader();
136 Long key = header.headKey;
137 if (key != null) {
138 Node node = getNode(key);
139 if (node != null) {
140 doRemoveNode(node);
141 header.headKey = node.nextKey;
142 --header.size;
143 updateHeader(header);
144 return node.getElement();
145 }
146 }
147 return null;
148 }
149 catch (IOException e) {
150 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
151 }
152 }
153
154 public Object removeLast() throws JMSException {
155 try {
156 Header header = getHeader();
157 Long key = header.tailKey;
158 if (key != null) {
159 Node node = getNode(key);
160 if (node != null) {
161 doRemoveNode(node);
162 header.tailKey = node.previousKey;
163 --header.size;
164 updateHeader(header);
165 return node.getElement();
166 }
167 }
168 return null;
169 }
170 catch (IOException e) {
171 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
172 }
173 }
174
175 public QueueListEntry addFirst(Object value) throws JMSException {
176 try {
177 Header header = getHeader();
178 Node node = createNode();
179 node.value = value;
180 Long nextKey = header.headKey;
181 node.nextKey = nextKey;
182 Long key = createKey(header);
183 node.key = key;
184 updateNode(node);
185 updateNextNode(nextKey, key);
186 header.headKey = key;
187 if (header.tailKey == null) {
188 header.tailKey = key;
189 }
190 header.size++;
191 updateHeader(header);
192 return node;
193 }
194 catch (IOException e) {
195 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
196 }
197 }
198
199 public QueueListEntry addLast(Object value) throws JMSException {
200 try {
201 Header header = getHeader();
202 return doAddLast(value, header);
203 }
204 catch (IOException e) {
205 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
206 }
207 }
208
209 public boolean contains(Object value) throws JMSException {
210 return indexOf(value) != -1;
211 }
212
213 public int size() throws JMSException {
214 try {
215 return getHeader().size;
216 }
217 catch (IOException e) {
218 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
219 }
220 }
221
222 public boolean isEmpty() throws JMSException {
223 int size = size();
224 return size == 0;
225 }
226
227 public QueueListEntry add(Object value) throws JMSException {
228 return addLast(value);
229 }
230
231 public Object get(int index) throws JMSException {
232 try {
233 Node node = getNode(index);
234 if (node != null) {
235 return node.value;
236 }
237 return null;
238 }
239 catch (IOException e) {
240 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
241 }
242 }
243
244 public Object set(int index, Object element) throws JMSException {
245 try {
246 Node node = getNode(index);
247 if (node != null) {
248 Object previous = node.value;
249 node.value = element;
250 updateNode(node);
251 return previous;
252 }
253 return null;
254 }
255 catch (IOException e) {
256 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
257 }
258 }
259
260 public void add(int index, Object element) throws JMSException {
261 if (index == 0) {
262 addFirst(element);
263 }
264 else {
265 try {
266 Header header = getHeader();
267 Node nextNode = getNode(header, index);
268 doAddBefore(header, nextNode, element);
269 }
270 catch (IOException e) {
271 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
272 }
273 }
274 }
275
276 public Object remove(int index) throws JMSException {
277 try {
278 Node node = getNode(index);
279 if (node != null) {
280 removeNode(node);
281 }
282 return null;
283 }
284 catch (IOException e) {
285 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
286 }
287 }
288
289 public int indexOf(Object value) throws JMSException {
290 try {
291 Header header = getHeader();
292 Long key = header.headKey;
293 for (int i = 0; key != null; i++) {
294 Node node = getNode(key);
295 if (node == null) {
296 break;
297 }
298 if (value == node.value || (value != null && value.equals(node.value))) {
299 return i;
300 }
301 key = node.nextKey;
302 }
303 return -1;
304 }
305 catch (IOException e) {
306 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
307 }
308 }
309
310 public int lastIndexOf(Object value) throws JMSException {
311 try {
312 Header header = getHeader();
313 Long key = header.tailKey;
314 for (int i = header.size - 1; key != null; i--) {
315 Node node = getNode(key);
316 if (node == null) {
317 break;
318 }
319 if (value == node.value || (value != null && value.equals(node.value))) {
320 return i;
321 }
322 key = node.previousKey;
323 }
324 return -1;
325 }
326 catch (IOException e) {
327 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
328 }
329 }
330
331 public QueueListEntry getFirstEntry() throws JMSException {
332 try {
333 return getNode(getHeader().headKey);
334 }
335 catch (IOException e) {
336 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
337 }
338 }
339
340 public QueueListEntry getLastEntry() throws JMSException {
341 try {
342 return getNode(getHeader().tailKey);
343 }
344 catch (IOException e) {
345 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
346 }
347 }
348
349 public QueueListEntry getNextEntry(QueueListEntry entry) throws JMSException {
350 Node node = (Node) entry;
351 try {
352 return getNode(node.nextKey);
353 }
354 catch (IOException e) {
355 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
356 }
357 }
358
359 public QueueListEntry getPrevEntry(QueueListEntry entry) throws JMSException {
360 Node node = (Node) entry;
361 try {
362 return getNode(node.previousKey);
363 }
364 catch (IOException e) {
365 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
366 }
367 }
368
369 public QueueListEntry addBefore(Object value, QueueListEntry entry) throws JMSException {
370 try {
371 return doAddBefore(getHeader(), (Node) entry, value);
372 }
373 catch (IOException e) {
374 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
375 }
376 }
377
378 public void remove(QueueListEntry entry) throws JMSException {
379 try {
380 removeNode((Node) entry);
381 }
382 catch (IOException e) {
383 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
384 }
385 }
386
387 public Object[] toArray() throws JMSException {
388 try {
389 Header header = getHeader();
390 int size = header.size;
391 if (size == 0) {
392 return EMPTY_ARRAY;
393 }
394 Long key = header.headKey;
395 Object[] answer = new Object[size];
396 for (int i = 0; i < size && key != null; i++) {
397 Node node = getNode(key);
398 if (node != null) {
399 answer[i] = node.value;
400 key = node.nextKey;
401 }
402 }
403 return answer;
404 }
405 catch (IOException e) {
406 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
407 }
408 }
409
410 public void rotate() throws JMSException {
411
412 Object obj = removeFirst();
413 if (obj != null) {
414 addLast(obj);
415 }
416 }
417
418 protected Long createKey(Header header) throws IOException, JMSException {
419 long value = header.lastKeyValue;
420 while (true) {
421 if (value == Long.MAX_VALUE) {
422 value = 1;
423 }
424 else {
425 value++;
426 }
427 Long key = new Long(value);
428 if (getNode(key) == null) {
429 header.lastKeyValue = value;
430 return key;
431 }
432 }
433 }
434
435 protected boolean removeNode(Node node) throws IOException, JMSException {
436 Header header = null;
437 boolean updatedPrevious = false;
438 if (node.previousKey != null) {
439
440 Node previousNode = getNode(node.previousKey);
441 if (previousNode != null) {
442 previousNode.nextKey = node.nextKey;
443 updateNode(previousNode);
444 updatedPrevious = true;
445 }
446 }
447 if (!updatedPrevious) {
448
449 header = getHeader();
450 header.headKey = node.nextKey;
451 }
452
453 boolean updatedNext = false;
454 if (node.nextKey != null) {
455
456 Node nextNode = getNode(node.nextKey);
457 if (nextNode != null) {
458 nextNode.previousKey = node.previousKey;
459 updateNode(nextNode);
460 updatedNext = true;
461 }
462 }
463 if (!updatedNext) {
464
465 header = getHeader();
466 header.tailKey = node.previousKey;
467 }
468 return true;
469 }
470
471 /***
472 * Looks up the header object, lazily creating one if the current table is empty
 * <p>
 * Callers in this class dereference the result without a null check, so
 * implementations are expected never to return null.
473 *
474 * @return the header holding the head key, tail key, last key value and size
475 * @throws java.io.IOException
476 */
477 protected abstract Header getHeader() throws IOException, JMSException;
478
479 /***
480 * Writes the header back to disk after it has been changed
 * <p>
 * Within this class it is called after the head key, tail key, last key
 * value or size of the header has been modified in memory.
481 *
482 * @param header the header whose current field values should be stored
483 * @throws java.io.IOException
484 */
485 protected abstract void updateHeader(Header header) throws IOException, JMSException;
486
487 /***
488 * Updates the node
 * <p>
 * Within this class it is called both for newly created nodes (after their
 * key field has been assigned) and for existing nodes whose links or value
 * have been changed in memory.
 * NOTE(review): presumably stores the node under node.key -- confirm
 * against the concrete implementations.
489 *
490 * @param node the node whose current state should be stored
491 * @throws java.io.IOException
492 */
493 protected abstract void updateNode(Node node) throws IOException, JMSException;
494
/**
 * Looks up the node for the given key.
 * <p>
 * Callers in this class treat a null result as "no node exists for this
 * key"; createKey() relies on that to find an unused key.
 * NOTE(review): doAddBefore() reads the key field of nodes obtained through
 * this method, so implementations presumably populate the transient
 * Node.key -- confirm against the concrete implementations.
 *
 * @param key the key of the node to look up
 * @return the node, or null if there is none for the key
 * @throws java.io.IOException
 */
495 protected abstract Node getNode(Long key) throws IOException, JMSException;
496
497 protected Node getNode(int index) throws IOException, JMSException {
498 Header header = getHeader();
499 return getNode(header, index);
500 }
501
502 protected Node getNode(Header header, int index) throws IOException, JMSException {
503 Node node = null;
504 int middle = header.size / 2;
505 if (index > middle) {
506
507 Long key = header.tailKey;
508 for (int i = header.size; i > index && key != null; i--) {
509 node = getNode(key);
510 if (node != null) {
511 key = node.previousKey;
512 }
513 }
514 }
515 else {
516 Long key = header.headKey;
517 for (int i = 0; i <= index && key != null; i++) {
518 node = getNode(key);
519 if (node != null) {
520 key = node.nextKey;
521 }
522 }
523 }
524 return node;
525 }
526
527 protected Node doAddLast(Object value, Header header) throws IOException, JMSException {
528 Node node = createNode();
529 Long key = createKey(header);
530 node.key = key;
531 node.value = value;
532 Long previousKey = header.tailKey;
533 node.previousKey = previousKey;
534 updateNode(node);
535 updatePreviousNode(previousKey, key);
536 header.tailKey = key;
537 if (header.headKey == null) {
538 header.headKey = key;
539 }
540 header.size++;
541 updateHeader(header);
542 return node;
543 }
544
545 protected void updateNextNode(Long nextKey, Long key) throws IOException, JMSException {
546 if (nextKey != null) {
547 Node nextNode = getNode(nextKey);
548 if (nextNode == null) {
549 throw new IOException("Missing node for key: " + nextKey);
550 }
551 nextNode.previousKey = key;
552 updateNode(nextNode);
553 }
554 }
555
556 protected void updatePreviousNode(Long previousKey, Long key) throws IOException, JMSException {
557 if (previousKey != null) {
558 Node previousNode = getNode(previousKey);
559 if (previousNode == null) {
560 throw new IOException("Missing previous node for key: " + previousKey);
561 }
562 previousNode.nextKey = key;
563 updateNode(previousNode);
564 }
565 }
566
567 protected Node doAddBefore(Header header, Node nextNode, Object element) throws JMSException, IOException {
568 if (nextNode == null) {
569 return doAddLast(element, header);
570 }
571 else {
572
573 Long key = createKey(header);
574 Node node = createNode();
575 node.value = element;
576 node.key = key;
577 Long previousKey = nextNode.previousKey;
578 node.previousKey = previousKey;
579 node.nextKey = nextNode.key;
580 nextNode.previousKey = key;
581 header.size++;
582
583 updateNode(node);
584 updateNode(nextNode);
585 updatePreviousNode(previousKey, key);
586 updateHeader(header);
587 return node;
588 }
589 }
590
/**
 * Implementation hook called by this class for a node that is being taken
 * out of the list (for example from removeFirst() and removeLast(), which
 * adjust and store the header themselves afterwards).
 * NOTE(review): presumably deletes the node's record from the underlying
 * store without touching neighbouring nodes or the header -- confirm
 * against the concrete implementations.
 *
 * @param node the node being removed
 * @throws java.io.IOException
 */
591 protected abstract void doRemoveNode(Node node) throws IOException, JMSException;
592
593 protected static Long wrapLong(long value) {
594
595 if (value == 0) {
596 return null;
597 }
598
599 return new Long(value);
600 }
601
602 protected static long unwrapLong(Long key) {
603 if (key != null) {
604 return key.longValue();
605 }
606
607 return 0;
608 }
609
610 protected Node createNode() {
611 return new Node();
612 }
613 }