1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.message.util;
20 import java.util.ArrayList;
21 import java.util.List;
22 import junit.framework.Test;
23 import junit.framework.TestCase;
24 import junit.framework.TestSuite;
25 import org.codehaus.activemq.capacity.CapacityMonitorEvent;
26 import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
27 import org.codehaus.activemq.message.Packet;
28 import org.codehaus.activemq.message.Receipt;
29 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
30 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
31
32 /***
33 * MemoryBoundedQueueTest
34 *
35 * @version $Revision: 1.2 $
36 */
37 public class MemoryBoundedQueueTest extends TestCase {
38 private static final int TEST_INSTANCE_SIZE = 2048;
39 private static final int TEST_ENQUEUE_SIZE = TEST_INSTANCE_SIZE / 2;
40 private static final String QUEUE_NAME = "TestQueue";
41 private final int TOTAL_LOAD = 50000;
42 private final int NUMBER_CONSUMERS = 10;
43 private SynchronizedInt count = new SynchronizedInt(0);
44 private SynchronizedInt stoppedCount = new SynchronizedInt(0);
45 private final MemoryBoundedQueueManager queueManager = new MemoryBoundedQueueManager("testmanager", 1024 * 1024);
46 private class Dequeue implements Runnable {
47 private MemoryBoundedQueue queue;
48 private Object mutex;
49 private int num = 0;
50 private int internalCount = 0;
51 private int localCount;
52
53 Dequeue(MemoryBoundedQueue q, int num, Object mutex, int localCount) {
54 this.queue = q;
55 this.num = num;
56 this.mutex = mutex;
57 this.localCount = localCount;
58 }
59
60 public void run() {
61 while (internalCount < localCount) {
62 try {
63 Packet obj = queue.dequeue();
64 if (obj != null) {
65 count.increment();
66 internalCount++;
67 if (count.get() == TOTAL_LOAD) {
68 synchronized (mutex) {
69 queue.stop();
70 mutex.notify();
71 }
72 }
73 }
74 else {
75 break;
76 }
77 }
78 catch (InterruptedException ie) {
79 ie.printStackTrace();
80 }
81 Thread.yield();
82 }
83 stoppedCount.increment();
84 }
85
86 public String toString() {
87 String result = "Dequeue(" + num + ") count = " + internalCount;
88 return result;
89 }
90 }
91
92 public MemoryBoundedQueueTest(String s) {
93 super(s);
94
95
96
97
98
99
100
101
102
103
104 }
105
106 protected void setUp() {
107 }
108
109 protected void tearDown() {
110 }
111
112 public void testLoad() throws Exception {
113 Object mutex = new Object();
114 final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
115 queueManager.setValueLimit(TEST_INSTANCE_SIZE * 100);
116 final List list = new ArrayList(NUMBER_CONSUMERS);
117 int numberOfMessages = TOTAL_LOAD / NUMBER_CONSUMERS;
118 for (int i = 0;i < NUMBER_CONSUMERS;i++) {
119 Dequeue dq = new Dequeue(queue, i, mutex, numberOfMessages);
120 list.add(dq);
121 Thread t = new Thread(dq);
122 t.setPriority(Thread.NORM_PRIORITY - 1);
123 t.start();
124 }
125 Thread t = new Thread(new Runnable() {
126 public void run() {
127 try {
128 while (count.get() < TOTAL_LOAD) {
129 Thread.sleep(250);
130
131 }
132 }
133 catch (Throwable e) {
134 e.printStackTrace();
135 }
136 }
137 });
138 t.setPriority(Thread.MAX_PRIORITY);
139 t.start();
140 for (int i = 0;i < TOTAL_LOAD;i++) {
141 Receipt rec = new Receipt();
142 rec.setMemoryUsage(TEST_INSTANCE_SIZE);
143 queue.enqueue(rec);
144 }
145 try {
146 synchronized (mutex) {
147 while (count.get() < TOTAL_LOAD) {
148 mutex.wait(250);
149 }
150 }
151 }
152 catch (InterruptedException ie) {
153 ie.printStackTrace();
154 }
155
156 Thread.sleep(250);
157 assertTrue(stoppedCount.get() == NUMBER_CONSUMERS);
158
159 assertTrue(queueManager.getTotalMemoryUsedSize() == 0);
160 queue.close();
161 }
162
163 public void testClear() {
164 final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
165 queueManager.setValueLimit(TEST_INSTANCE_SIZE);
166 Receipt obj = new Receipt();
167 queue.enqueue(obj);
168 queue.clear();
169 assertTrue(queue.size() == 0);
170 queue.close();
171 }
172
173 public void testDequeue() throws Exception {
174 final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
175 queueManager.setValueLimit(TEST_INSTANCE_SIZE * 100);
176 Receipt obj = new Receipt();
177 queue.enqueue(obj);
178 Object result = queue.dequeue();
179 assertTrue(result == obj);
180 queue.close();
181 }
182
183 public void testClose() {
184 /*** @todo: Insert test code here. Use assertEquals(), for example. */
185 final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
186 queueManager.setValueLimit(TEST_ENQUEUE_SIZE);
187 final SynchronizedBoolean success = new SynchronizedBoolean(false);
188 final MemoryBoundedQueue q1 = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
189 assertTrue(queue == q1);
190 Thread t = new Thread(new Runnable() {
191 public void run() {
192 try {
193 Thread.sleep(250);
194 queue.dequeue();
195 }
196 catch (Exception e) {
197 e.printStackTrace();
198 }
199 synchronized (success) {
200 success.set(true);
201 success.notify();
202 }
203 }
204 });
205 t.start();
206 queue.close();
207 try {
208 synchronized (success) {
209 if (!success.get()) {
210 success.wait(2000);
211 }
212 }
213 }
214 catch (Throwable e) {
215 e.printStackTrace();
216 }
217 assertTrue(success.get());
218
219 MemoryBoundedQueue q2 = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
220 assertTrue(queue != q2);
221 }
222
223 public void testDequeueNoWait() throws Exception {
224 final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
225 Object obj = queue.dequeueNoWait();
226 assertTrue(obj == null);
227 queue.close();
228 }
229
230 public void testEnqueueFirst() throws Exception {
231 final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
232 assertTrue(queueManager.getTotalMemoryUsedSize() == 0);
233 Object mutex = new Object();
234 queueManager.setValueLimit(TEST_INSTANCE_SIZE * 100);
235 for (int i = 0;i < 10;i++) {
236 queue.enqueue(new Receipt());
237 }
238 Receipt test = new Receipt();
239 test.setId("FIRST");
240 queue.enqueueFirst(test);
241 Object obj = queue.dequeue();
242 assertTrue(obj == test);
243 queue.close();
244 }
245
246 public void testEnqueueNoBlock() {
247 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
248 queueManager.setValueLimit(TEST_ENQUEUE_SIZE);
249 Receipt test = new Receipt();
250 queue.enqueueNoBlock(test);
251 assertTrue(true);
252 queue.close();
253 }
254
255 public void testIsEmpty() {
256 int size = 10;
257 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
258 for (int i = 0;i < size;i++) {
259 queue.enqueue(new Receipt());
260 }
261 queue.clear();
262 assertTrue(queue.isEmpty());
263 queue.close();
264 }
265
266 public void testRemove() {
267 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
268 Receipt test = new Receipt();
269 queue.enqueue(test);
270 assertTrue(queue.remove(test));
271 queue.close();
272 }
273
274 public void testSize() {
275 int size = 10;
276 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
277 for (int i = 0;i < size;i++) {
278 queue.enqueue(new Receipt());
279 }
280 assertTrue(queue.size() == size);
281 queue.close();
282 }
283
284 public void testRemovePacket(){
285 int size = 100;
286 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
287 List list = new ArrayList(size);
288 for (int i = 0;i < size;i++) {
289 Packet p = new Receipt();
290 p.setId(""+i);
291 list.add(p);
292 queue.enqueue(p);
293 }
294 for (int i =0; i < size; i++){
295 queue.remove((Packet)list.get(i));
296 }
297 assertTrue(queue.size() == 0);
298 queue.close();
299 }
300
301 public void testRemovePacketById(){
302 int size = 100;
303 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
304 List list = new ArrayList(size);
305 for (int i = 0;i < size;i++) {
306 Packet p = new Receipt();
307 p.setId(""+i);
308 list.add(p);
309 queue.enqueue(p);
310 }
311 for (int i =0; i < size; i++){
312 Packet p = (Packet)list.get(i);
313 Packet removed = queue.remove(p.getId());
314 assertTrue(removed != null);
315 assertTrue(removed == p);
316 }
317 assertTrue(queue.size() == 0);
318 queue.close();
319 }
320
321 public static Test suite() {
322 return new TestSuite(MemoryBoundedQueueTest.class);
323 }
324
325 public static void main(String[] args) {
326 MemoryBoundedQueueTest test = new MemoryBoundedQueueTest("test");
327 test.setUp();
328 test.testClose();
329 }
330 }