View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import com.sleepycat.util.FastInputStream;
21  import com.sleepycat.util.FastOutputStream;
22  import org.codehaus.activemq.service.QueueList;
23  import org.codehaus.activemq.service.QueueListEntry;
24  import org.codehaus.activemq.util.JMSExceptionHelper;
25  
26  import javax.jms.JMSException;
27  import java.io.ByteArrayInputStream;
28  import java.io.ByteArrayOutputStream;
29  import java.io.DataInputStream;
30  import java.io.DataOutputStream;
31  import java.io.IOException;
32  import java.io.Serializable;
33  
34  /***
35   * A base class which is useful for implementation inheritence when implementing
36   * a persistent QueueList
37   *
38   * @version $Revision: 1.5 $
39   */
40  public abstract class QueueListSupport implements QueueList {
41      protected static final Long HEAD_KEY = new Long(0);
42  
43      public static class Header implements Serializable {
44          private static final long serialVersionUID = 64734383295040L;
45  
46          public Long headKey;
47          public Long tailKey;
48          public long lastKeyValue;
49          public int size;
50  
51          public byte[] asBytes() throws IOException {
52              ByteArrayOutputStream buffer = new ByteArrayOutputStream();
53              DataOutputStream out = new DataOutputStream(buffer);
54              out.writeLong(unwrapLong(headKey));
55              out.writeLong(unwrapLong(tailKey));
56              out.writeLong(lastKeyValue);
57              out.writeInt(size);
58              return buffer.toByteArray();
59          }
60  
61          public void fromBytes(byte[] data) throws IOException {
62              DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
63              this.headKey = wrapLong(in.readLong());
64              this.tailKey = wrapLong(in.readLong());
65              this.lastKeyValue = in.readLong();
66              this.size = in.readInt();
67          }
68      }
69  
70      public static class Node implements Serializable, QueueListEntry {
71          private static final long serialVersionUID = 4609474001468609536L;
72  
73          public Long previousKey;
74          public Long nextKey;
75          public Object value;
76  
77          // not stored, but cached when read from the table
78          public transient Long key;
79  
80          public Object getElement() {
81              return value;
82          }
83  
84          public byte[] asBytes() throws IOException {
85              FastOutputStream buffer = new FastOutputStream();
86              DataOutputStream out = new DataOutputStream(buffer);
87              out.writeLong(unwrapLong(previousKey));
88              out.writeLong(unwrapLong(nextKey));
89              out.writeUTF((String) value);
90              return buffer.toByteArray();
91          }
92  
93          public void fromBytes(byte[] data) throws IOException {
94              DataInputStream in = new DataInputStream(new FastInputStream(data));
95              this.previousKey = wrapLong(in.readLong());
96              this.nextKey = wrapLong(in.readLong());
97              this.value = in.readUTF();
98          }
99      }
100 
101     public Object getFirst() throws JMSException {
102         try {
103             Long key = getHeader().headKey;
104             if (key != null) {
105                 Node node = getNode(key);
106                 if (node != null) {
107                     return node.getElement();
108                 }
109             }
110             return null;
111         }
112         catch (IOException e) {
113             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
114         }
115     }
116 
117     public Object getLast() throws JMSException {
118         try {
119             Long key = getHeader().tailKey;
120             if (key != null) {
121                 Node node = getNode(key);
122                 if (node != null) {
123                     return node.getElement();
124                 }
125             }
126             return null;
127         }
128         catch (IOException e) {
129             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
130         }
131     }
132 
133     public Object removeFirst() throws JMSException {
134         try {
135             Header header = getHeader();
136             Long key = header.headKey;
137             if (key != null) {
138                 Node node = getNode(key);
139                 if (node != null) {
140                     doRemoveNode(node);
141                     header.headKey = node.nextKey;
142                     --header.size;
143                     updateHeader(header);
144                     return node.getElement();
145                 }
146             }
147             return null;
148         }
149         catch (IOException e) {
150             throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
151         }
152     }
153 
154     public Object removeLast() throws JMSException {
155         try {
156             Header header = getHeader();
157             Long key = header.tailKey;
158             if (key != null) {
159                 Node node = getNode(key);
160                 if (node != null) {
161                     doRemoveNode(node);
162                     header.tailKey = node.previousKey;
163                     --header.size;
164                     updateHeader(header);
165                     return node.getElement();
166                 }
167             }
168             return null;
169         }
170         catch (IOException e) {
171             throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
172         }
173     }
174 
175     public QueueListEntry addFirst(Object value) throws JMSException {
176         try {
177             Header header = getHeader();
178             Node node = createNode();
179             node.value = value;
180             Long nextKey = header.headKey;
181             node.nextKey = nextKey;
182             Long key = createKey(header);
183             node.key = key;
184             updateNode(node);
185             updateNextNode(nextKey, key);
186             header.headKey = key;
187             if (header.tailKey == null) {
188                 header.tailKey = key;
189             }
190             header.size++;
191             updateHeader(header);
192             return node;
193         }
194         catch (IOException e) {
195             throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
196         }
197     }
198 
199     public QueueListEntry addLast(Object value) throws JMSException {
200         try {
201             Header header = getHeader();
202             return doAddLast(value, header);
203         }
204         catch (IOException e) {
205             throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
206         }
207     }
208 
209     public boolean contains(Object value) throws JMSException {
210         return indexOf(value) != -1;
211     }
212 
213     public int size() throws JMSException {
214         try {
215             return getHeader().size;
216         }
217         catch (IOException e) {
218             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
219         }
220     }
221 
222     public boolean isEmpty() throws JMSException {
223         int size = size();
224         return size == 0;
225     }
226 
227     public QueueListEntry add(Object value) throws JMSException {
228         return addLast(value);
229     }
230 
231     public Object get(int index) throws JMSException {
232         try {
233             Node node = getNode(index);
234             if (node != null) {
235                 return node.value;
236             }
237             return null;
238         }
239         catch (IOException e) {
240             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
241         }
242     }
243 
244     public Object set(int index, Object element) throws JMSException {
245         try {
246             Node node = getNode(index);
247             if (node != null) {
248                 Object previous = node.value;
249                 node.value = element;
250                 updateNode(node);
251                 return previous;
252             }
253             return null;
254         }
255         catch (IOException e) {
256             throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
257         }
258     }
259 
260     public void add(int index, Object element) throws JMSException {
261         if (index == 0) {
262             addFirst(element);
263         }
264         else {
265             try {
266                 Header header = getHeader();
267                 Node nextNode = getNode(header, index);
268                 doAddBefore(header, nextNode, element);
269             }
270             catch (IOException e) {
271                 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
272             }
273         }
274     }
275 
276     public Object remove(int index) throws JMSException {
277         try {
278             Node node = getNode(index);
279             if (node != null) {
280                 removeNode(node);
281             }
282             return null;
283         }
284         catch (IOException e) {
285             throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
286         }
287     }
288 
289     public int indexOf(Object value) throws JMSException {
290         try {
291             Header header = getHeader();
292             Long key = header.headKey;
293             for (int i = 0; key != null; i++) {
294                 Node node = getNode(key);
295                 if (node == null) {
296                     break;
297                 }
298                 if (value == node.value || (value != null && value.equals(node.value))) {
299                     return i;
300                 }
301                 key = node.nextKey;
302             }
303             return -1;
304         }
305         catch (IOException e) {
306             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
307         }
308     }
309 
310     public int lastIndexOf(Object value) throws JMSException {
311         try {
312             Header header = getHeader();
313             Long key = header.tailKey;
314             for (int i = header.size - 1; key != null; i--) {
315                 Node node = getNode(key);
316                 if (node == null) {
317                     break;
318                 }
319                 if (value == node.value || (value != null && value.equals(node.value))) {
320                     return i;
321                 }
322                 key = node.previousKey;
323             }
324             return -1;
325         }
326         catch (IOException e) {
327             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
328         }
329     }
330 
331     public QueueListEntry getFirstEntry() throws JMSException {
332         try {
333             return getNode(getHeader().headKey);
334         }
335         catch (IOException e) {
336             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
337         }
338     }
339 
340     public QueueListEntry getLastEntry() throws JMSException {
341         try {
342             return getNode(getHeader().tailKey);
343         }
344         catch (IOException e) {
345             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
346         }
347     }
348 
349     public QueueListEntry getNextEntry(QueueListEntry entry) throws JMSException {
350         Node node = (Node) entry;
351         try {
352             return getNode(node.nextKey);
353         }
354         catch (IOException e) {
355             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
356         }
357     }
358 
359     public QueueListEntry getPrevEntry(QueueListEntry entry) throws JMSException {
360         Node node = (Node) entry;
361         try {
362             return getNode(node.previousKey);
363         }
364         catch (IOException e) {
365             throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
366         }
367     }
368 
369     public QueueListEntry addBefore(Object value, QueueListEntry entry) throws JMSException {
370         try {
371             return doAddBefore(getHeader(), (Node) entry, value);
372         }
373         catch (IOException e) {
374             throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
375         }
376     }
377 
378     public void remove(QueueListEntry entry) throws JMSException {
379         try {
380             removeNode((Node) entry);
381         }
382         catch (IOException e) {
383             throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
384         }
385     }
386 
387     public Object[] toArray() throws JMSException {
388         try {
389             Header header = getHeader();
390             int size = header.size;
391             if (size == 0) {
392                 return EMPTY_ARRAY;
393             }
394             Long key = header.headKey;
395             Object[] answer = new Object[size];
396             for (int i = 0; i < size && key != null; i++) {
397                 Node node = getNode(key);
398                 if (node != null) {
399                     answer[i] = node.value;
400                     key = node.nextKey;
401                 }
402             }
403             return answer;
404         }
405         catch (IOException e) {
406             throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
407         }
408     }
409 
410     public void rotate() throws JMSException {
411         // TODO could tune this by just swizzling pointers
412         Object obj = removeFirst();
413         if (obj != null) {
414             addLast(obj);
415         }
416     }
417 
418     protected Long createKey(Header header) throws IOException, JMSException {
419         long value = header.lastKeyValue;
420         while (true) {
421             if (value == Long.MAX_VALUE) {
422                 value = 1;
423             }
424             else {
425                 value++;
426             }
427             Long key = new Long(value);
428             if (getNode(key) == null) {
429                 header.lastKeyValue = value;
430                 return key;
431             }
432         }
433     }
434 
435     protected boolean removeNode(Node node) throws IOException, JMSException {
436         Header header = null;
437         boolean updatedPrevious = false;
438         if (node.previousKey != null) {
439             // lets point the previous node to our next node
440             Node previousNode = getNode(node.previousKey);
441             if (previousNode != null) {
442                 previousNode.nextKey = node.nextKey;
443                 updateNode(previousNode);
444                 updatedPrevious = true;
445             }
446         }
447         if (!updatedPrevious) {
448             // lets update the head record
449             header = getHeader();
450             header.headKey = node.nextKey;
451         }
452 
453         boolean updatedNext = false;
454         if (node.nextKey != null) {
455             // lets point the next node to our previous node
456             Node nextNode = getNode(node.nextKey);
457             if (nextNode != null) {
458                 nextNode.previousKey = node.previousKey;
459                 updateNode(nextNode);
460                 updatedNext = true;
461             }
462         }
463         if (!updatedNext) {
464             // lets update the tail record
465             header = getHeader();
466             header.tailKey = node.previousKey;
467         }
468         return true;
469     }
470 
471     /***
472      * Looks up the header object, lazily creating one if the current table is empty
473      *
474      * @return
475      * @throws java.io.IOException
476      */
477     protected abstract Header getHeader() throws IOException, JMSException;
478 
479     /***
480      * Writes the header back to disk after its been changed
481      *
482      * @param header
483      * @throws java.io.IOException
484      */
485     protected abstract void updateHeader(Header header) throws IOException, JMSException;
486 
487     /***
488      * Updates the node
489      *
490      * @param node
491      * @throws java.io.IOException
492      */
493     protected abstract void updateNode(Node node) throws IOException, JMSException;
494 
495     protected abstract Node getNode(Long key) throws IOException, JMSException;
496 
497     protected Node getNode(int index) throws IOException, JMSException {
498         Header header = getHeader();
499         return getNode(header, index);
500     }
501 
502     protected Node getNode(Header header, int index) throws IOException, JMSException {
503         Node node = null;
504         int middle = header.size / 2;
505         if (index > middle) {
506             // lets navigate backwards
507             Long key = header.tailKey;
508             for (int i = header.size; i > index && key != null; i--) {
509                 node = getNode(key);
510                 if (node != null) {
511                     key = node.previousKey;
512                 }
513             }
514         }
515         else {
516             Long key = header.headKey;
517             for (int i = 0; i <= index && key != null; i++) {
518                 node = getNode(key);
519                 if (node != null) {
520                     key = node.nextKey;
521                 }
522             }
523         }
524         return node;
525     }
526 
527     protected Node doAddLast(Object value, Header header) throws IOException, JMSException {
528         Node node = createNode();
529         Long key = createKey(header);
530         node.key = key;
531         node.value = value;
532         Long previousKey = header.tailKey;
533         node.previousKey = previousKey;
534         updateNode(node);
535         updatePreviousNode(previousKey, key);
536         header.tailKey = key;
537         if (header.headKey == null) {
538             header.headKey = key;
539         }
540         header.size++;
541         updateHeader(header);
542         return node;
543     }
544 
545     protected void updateNextNode(Long nextKey, Long key) throws IOException, JMSException {
546         if (nextKey != null) {
547             Node nextNode = getNode(nextKey);
548             if (nextNode == null) {
549                 throw new IOException("Missing node for key: " + nextKey);
550             }
551             nextNode.previousKey = key;
552             updateNode(nextNode);
553         }
554     }
555 
556     protected void updatePreviousNode(Long previousKey, Long key) throws IOException, JMSException {
557         if (previousKey != null) {
558             Node previousNode = getNode(previousKey);
559             if (previousNode == null) {
560                 throw new IOException("Missing previous node for key: " + previousKey);
561             }
562             previousNode.nextKey = key;
563             updateNode(previousNode);
564         }
565     }
566 
567     protected Node doAddBefore(Header header, Node nextNode, Object element) throws JMSException, IOException {
568         if (nextNode == null) {
569             return doAddLast(element, header);
570         }
571         else {
572             // lets add before this nextNode
573             Long key = createKey(header);
574             Node node = createNode();
575             node.value = element;
576             node.key = key;
577             Long previousKey = nextNode.previousKey;
578             node.previousKey = previousKey;
579             node.nextKey = nextNode.key;
580             nextNode.previousKey = key;
581             header.size++;
582 
583             updateNode(node);
584             updateNode(nextNode);
585             updatePreviousNode(previousKey, key);
586             updateHeader(header);
587             return node;
588         }
589     }
590 
591     protected abstract void doRemoveNode(Node node) throws IOException, JMSException;
592 
593     protected static Long wrapLong(long value) {
594         // lets use 0 for null
595         if (value == 0) {
596             return null;
597         }
598         // TODO use a cache?
599         return new Long(value);
600     }
601 
602     protected static long unwrapLong(Long key) {
603         if (key != null) {
604             return key.longValue();
605         }
606         // lets use 0 for null
607         return 0;
608     }
609 
610     protected Node createNode() {
611         return new Node();
612     }
613 }