1 package org.codehaus.activemq.ra;
2
3 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
4
5 /***
6 */
7 public class CircularQueue {
8
9 private final int size;
10
11 private final SynchronizedBoolean stopping;
12
13
14
15 private final Object[] contents;
16 final private Object mutex = new Object();
17
18 private int start=0;
19
20 private int end=0;
21
22 public CircularQueue(int size, SynchronizedBoolean stopping) {
23 this.size = size;
24 contents = new Object[size];
25 this.stopping = stopping;
26 }
27
28 public Object get() {
29 synchronized(mutex) {
30 while( true ) {
31 Object ew = contents[start];
32 if (ew != null) {
33 start++;
34 if(start == contents.length) {
35 start=0;
36 }
37 return ew;
38 } else {
39 try {
40 mutex.wait();
41 if(stopping.get()) {
42 return null;
43 }
44 } catch (InterruptedException e) {
45 return null;
46 }
47 }
48 }
49 }
50 }
51
52 public void returnObject(Object worker) {
53 synchronized(mutex) {
54 contents[end++] = worker;
55 if( end == contents.length) {
56 end=0;
57 }
58 mutex.notify();
59 }
60 }
61
62 public int size() {
63 return contents.length;
64 }
65
66 public void drain() {
67 int i = 0;
68 while (i < size) {
69 if (get() != null) {
70 i++;
71 }
72 }
73 }
74
75
76 public void notifyWaiting() {
77 synchronized(mutex) {
78 mutex.notifyAll();
79 }
80 }
81
82 }