1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.message.util;
20 import java.util.ArrayList;
21 import java.util.List;
22 import javax.jms.JMSException;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.message.Packet;
26 import org.codehaus.activemq.service.QueueListEntry;
27 import org.codehaus.activemq.service.impl.DefaultQueueList;
28
29 /***
30 * MemoryBoundedQueue is a queue bounded by memory usage for Packets
31 *
32 * @version $Revision: 1.6 $
33 */
34 public class MemoryBoundedQueue implements BoundedPacketQueue {
35 private MemoryBoundedQueueManager queueManager;
36 private String name;
37 private boolean stopped = false;
38 private boolean closed = false;
39 private long memoryUsedByThisQueue;
40 private Object outLock = new Object();
41 private Object inLock = new Object();
42 private DefaultQueueList internalList = new DefaultQueueList();
43 private static final int WAIT_TIMEOUT = 100;
44 private static final Log log = LogFactory.getLog(MemoryBoundedQueueManager.class);
45
46 /***
47 * Constructor
48 *
49 * @param name
50 * @param manager
51 */
52 MemoryBoundedQueue(String name, MemoryBoundedQueueManager manager) {
53 this.name = name;
54 this.queueManager = manager;
55 }
56
57 /***
58 * @return the name of this MemoryBoundedQueue
59 */
60 public String getName() {
61 return name;
62 }
63
64 /***
65 * @return a pretty print of this queue
66 */
67 public String toString() {
68 return "" + name + " , cardinality = " + size() + " memory usage = " + memoryUsedByThisQueue;
69 }
70
71 /***
72 * @return the number of items held by this queue
73 */
74 public int size() {
75 return internalList.size();
76 }
77
78 /***
79 * @return an aproximation the memory used by this queue
80 */
81 public long getLocalMemoryUsedByThisQueue() {
82 return memoryUsedByThisQueue;
83 }
84
85 /***
86 * close and remove this queue from the MemoryBoundedQueueManager
87 */
88 public void close() {
89 try {
90 clear();
91 closed = true;
92 synchronized (outLock) {
93 outLock.notifyAll();
94 }
95 synchronized (inLock) {
96 inLock.notifyAll();
97 }
98 }
99 catch (Throwable e) {
100 e.printStackTrace();
101 }
102 finally {
103 queueManager.removeMemoryBoundedQueue(getName());
104 }
105 }
106
107 /***
108 * Enqueue a Packet without checking memory usage limits
109 *
110 * @param packet
111 */
112 public void enqueueNoBlock(Packet packet) {
113 if (!closed) {
114 internalList.add(packet);
115 incrementMemoryUsed(packet);
116 synchronized (outLock) {
117 outLock.notify();
118 }
119 }
120 }
121
122 /***
123 * Enqueue a Packet to this queue
124 *
125 * @param packet
126 */
127 public void enqueue(Packet packet) {
128 if (!queueManager.isFull()) {
129 enqueueNoBlock(packet);
130 }
131 else {
132 synchronized (inLock) {
133 try {
134 while (queueManager.isFull() && !closed) {
135 inLock.wait(WAIT_TIMEOUT);
136 }
137 }
138 catch (InterruptedException ie) {
139 }
140 }
141 enqueueNoBlock(packet);
142 }
143 }
144
145 /***
146 * Enqueue a packet to the head of the queue with total disregard for memory constraints
147 *
148 * @param packet
149 */
150 public final void enqueueFirstNoBlock(Packet packet) {
151 if (!closed) {
152 internalList.addFirst(packet);
153 incrementMemoryUsed(packet);
154 synchronized (outLock) {
155 outLock.notify();
156 }
157 }
158 }
159
160 /***
161 * Enqueue a Packet to the head of the queue
162 *
163 * @param packet
164 * @throws InterruptedException
165 */
166 public void enqueueFirst(Packet packet) throws InterruptedException {
167 if (!queueManager.isFull()) {
168 enqueueFirstNoBlock(packet);
169 }
170 else {
171 synchronized (inLock) {
172 while (queueManager.isFull() && !closed) {
173 inLock.wait(WAIT_TIMEOUT);
174 }
175 }
176 enqueueFirstNoBlock(packet);
177 }
178 }
179
180 /***
181 * @return the first dequeued Packet or blocks until one is available
182 * @throws InterruptedException
183 */
184 public Packet dequeue() throws InterruptedException {
185 Packet result = null;
186 synchronized (outLock) {
187 while (internalList.isEmpty() && !closed) {
188 outLock.wait(WAIT_TIMEOUT);
189 }
190 result = dequeueNoWait();
191 }
192 return result;
193 }
194
195 /***
196 * Dequeues a Packet from the head of the queue
197 *
198 * @param timeInMillis time to wait for a Packet to be available
199 * @return the first Packet or null if none available within <I>timeInMillis </I>
200 * @throws InterruptedException
201 */
202 public Packet dequeue(long timeInMillis) throws InterruptedException {
203 Packet result = null;
204 if (timeInMillis == 0) {
205 result = dequeue();
206 }
207 else {
208 synchronized (outLock) {
209
210 long waitTime = timeInMillis;
211 long start = (timeInMillis <= 0) ? 0 : System.currentTimeMillis();
212 while (!closed) {
213 result = dequeueNoWait();
214 if (result != null || waitTime <= 0) {
215 break;
216 }
217 else {
218 outLock.wait(waitTime);
219 waitTime = timeInMillis - (System.currentTimeMillis() - start);
220 }
221 }
222 }
223 }
224 return result;
225 }
226
227 /***
228 * dequeues a Packet from the head of the queue
229 *
230 * @return the Packet at the head of the queue or null, if none is available
231 * @throws InterruptedException
232 */
233 public Packet dequeueNoWait() throws InterruptedException {
234 Packet packet = null;
235 if (stopped) {
236 synchronized (outLock) {
237 while (stopped && !closed) {
238 outLock.wait(WAIT_TIMEOUT);
239 }
240 }
241 }
242 packet = (Packet) internalList.removeFirst();
243 decrementMemoryUsed(packet);
244 if (packet != null) {
245 synchronized (inLock) {
246 inLock.notify();
247 }
248 }
249 return packet;
250 }
251
252 /***
253 * @return true if the queue is enabled for dequeing (default = true)
254 */
255 public boolean isStarted() {
256 return stopped == false;
257 }
258
259 /***
260 * disable dequeueing
261 */
262 public void stop() {
263 synchronized (outLock) {
264 stopped = true;
265 }
266 }
267
268 /***
269 * enable dequeueing
270 */
271 public void start() {
272 stopped = false;
273 synchronized (outLock) {
274 outLock.notifyAll();
275 }
276 synchronized (inLock) {
277 inLock.notifyAll();
278 }
279 }
280
281 /***
282 * Remove a packet from the queue
283 *
284 * @param packet
285 * @return true if the packet was found
286 */
287 public boolean remove(Packet packet) {
288 boolean result = false;
289 if (!internalList.isEmpty()) {
290 result = internalList.remove(packet);
291 }
292 if (result) {
293 decrementMemoryUsed(packet);
294 }
295 synchronized (inLock) {
296 inLock.notify();
297 }
298 return result;
299 }
300
301 /***
302 * Remove a Packet by it's id
303 *
304 * @param id
305 * @return
306 */
307 public Packet remove(String id) {
308 Packet result = null;
309 QueueListEntry entry = internalList.getFirstEntry();
310 try {
311 while (entry != null) {
312 Packet p = (Packet) entry.getElement();
313 if (p.getId().equals(id)) {
314 result = p;
315 internalList.remove(entry);
316 break;
317 }
318 entry = internalList.getNextEntry(entry);
319 }
320 }
321 catch (JMSException jmsEx) {
322 jmsEx.printStackTrace();
323 }
324 synchronized (inLock) {
325 inLock.notify();
326 }
327 return result;
328 }
329
330 /***
331 * remove any Packets in the queue
332 */
333 public void clear() {
334 while (!internalList.isEmpty()) {
335 Packet packet = (Packet) internalList.removeFirst();
336 decrementMemoryUsed(packet);
337 }
338 synchronized (inLock) {
339 inLock.notifyAll();
340 }
341 }
342
343 /***
344 * @return true if the queue is empty
345 */
346 public boolean isEmpty() {
347 return internalList.isEmpty();
348 }
349
350 /***
351 * retrieve a Packet at an indexed position in the queue
352 *
353 * @param index
354 * @return
355 */
356 public Packet get(int index) {
357 return (Packet) internalList.get(index);
358 }
359
360 /***
361 * Retrieve a shallow copy of the contents as a list
362 *
363 * @return a list containing the bounded queue contents
364 */
365 public List getContents() {
366 Object[] array = internalList.toArray();
367 List list = new ArrayList();
368 for (int i = 0;i < array.length;i++) {
369 list.add(array[i]);
370 }
371 return list;
372 }
373
374 private synchronized void incrementMemoryUsed(Packet packet) {
375 if (packet != null) {
376 memoryUsedByThisQueue += queueManager.incrementMemoryUsed(packet);
377 }
378 }
379
380 private synchronized void decrementMemoryUsed(Packet packet) {
381 if (packet != null) {
382 memoryUsedByThisQueue -= queueManager.decrementMemoryUsed(packet);
383 }
384 }
385 }