1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.message.util;
20  import java.util.ArrayList;
21  import java.util.List;
22  import junit.framework.Test;
23  import junit.framework.TestCase;
24  import junit.framework.TestSuite;
25  import org.codehaus.activemq.capacity.CapacityMonitorEvent;
26  import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
27  import org.codehaus.activemq.message.Packet;
28  import org.codehaus.activemq.message.Receipt;
29  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
30  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
31  
32  /***
33   * MemoryBoundedQueueTest
34   * 
35   * @version $Revision: 1.2 $
36   */
37  public class MemoryBoundedQueueTest extends TestCase {
38      private static final int TEST_INSTANCE_SIZE = 2048;
39      private static final int TEST_ENQUEUE_SIZE = TEST_INSTANCE_SIZE / 2;
40      private static final String QUEUE_NAME = "TestQueue";
41      private final int TOTAL_LOAD = 50000;
42      private final int NUMBER_CONSUMERS = 10;
43      private SynchronizedInt count = new SynchronizedInt(0);
44      private SynchronizedInt stoppedCount = new SynchronizedInt(0);
45      private final MemoryBoundedQueueManager queueManager = new MemoryBoundedQueueManager("testmanager", 1024 * 1024);
46      private class Dequeue implements Runnable {
47          private MemoryBoundedQueue queue;
48          private Object mutex;
49          private int num = 0;
50          private int internalCount = 0;
51          private int localCount;
52  
53          Dequeue(MemoryBoundedQueue q, int num, Object mutex, int localCount) {
54              this.queue = q;
55              this.num = num;
56              this.mutex = mutex;
57              this.localCount = localCount;
58          }
59  
60          public void run() {
61              while (internalCount < localCount) {
62                  try {
63                      Packet obj = queue.dequeue();
64                      if (obj != null) {
65                          count.increment();
66                          internalCount++;
67                          if (count.get() == TOTAL_LOAD) {
68                              synchronized (mutex) {
69                                  queue.stop();
70                                  mutex.notify();
71                              }
72                          }
73                      }
74                      else {
75                          break;
76                      }
77                  }
78                  catch (InterruptedException ie) {
79                      ie.printStackTrace();
80                  }
81                  Thread.yield();
82              }
83              stoppedCount.increment();
84          }
85  
86          public String toString() {
87              String result = "Dequeue(" + num + ") count = " + internalCount;
88              return result;
89          }
90      }
91  
92      public MemoryBoundedQueueTest(String s) {
93          super(s);
94          /*
95          queueManager.addCapacityChangedEventListener(new CapacityMonitorEventListener(){
96  
97              public void capacityChanged(CapacityMonitorEvent event) {
98                  System.out.println("Capacity Changed: = " + event);
99                  
100             }
101             
102         });
103         */
104     }
105 
106     protected void setUp() {
107     }
108 
109     protected void tearDown() {
110     }
111 
112     public void testLoad() throws Exception {
113         Object mutex = new Object();
114         final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
115         queueManager.setValueLimit(TEST_INSTANCE_SIZE * 100);
116         final List list = new ArrayList(NUMBER_CONSUMERS);
117         int numberOfMessages = TOTAL_LOAD / NUMBER_CONSUMERS;
118         for (int i = 0;i < NUMBER_CONSUMERS;i++) {
119             Dequeue dq = new Dequeue(queue, i, mutex, numberOfMessages);
120             list.add(dq);
121             Thread t = new Thread(dq);
122             t.setPriority(Thread.NORM_PRIORITY - 1);
123             t.start();
124         }
125         Thread t = new Thread(new Runnable() {
126             public void run() {
127                 try {
128                     while (count.get() < TOTAL_LOAD) {
129                         Thread.sleep(250);
130                         //System.out.println("Count so far = " + count);
131                     }
132                 }
133                 catch (Throwable e) {
134                     e.printStackTrace();
135                 }
136             }
137         });
138         t.setPriority(Thread.MAX_PRIORITY);
139         t.start();
140         for (int i = 0;i < TOTAL_LOAD;i++) {
141             Receipt rec = new Receipt();
142             rec.setMemoryUsage(TEST_INSTANCE_SIZE);
143             queue.enqueue(rec);
144         }
145         try {
146             synchronized (mutex) {
147                 while (count.get() < TOTAL_LOAD) {
148                     mutex.wait(250);
149                 }
150             }
151         }
152         catch (InterruptedException ie) {
153             ie.printStackTrace();
154         }
155         //System.out.println("Finished!");
156         Thread.sleep(250);
157         assertTrue(stoppedCount.get() == NUMBER_CONSUMERS);
158         //System.out.println("total memory left = " + MemoryBoundedQueueManager.instance.getTotalMemoryUsedSize());
159         assertTrue(queueManager.getTotalMemoryUsedSize() == 0);
160         queue.close();
161     }
162 
163     public void testClear() {
164         final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
165         queueManager.setValueLimit(TEST_INSTANCE_SIZE);
166         Receipt obj = new Receipt();
167         queue.enqueue(obj);
168         queue.clear();
169         assertTrue(queue.size() == 0);
170         queue.close();
171     }
172 
173     public void testDequeue() throws Exception {
174         final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
175         queueManager.setValueLimit(TEST_INSTANCE_SIZE * 100);
176         Receipt obj = new Receipt();
177         queue.enqueue(obj);
178         Object result = queue.dequeue();
179         assertTrue(result == obj);
180         queue.close();
181     }
182 
183     public void testClose() {
184         /*** @todo: Insert test code here. Use assertEquals(), for example. */
185         final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
186         queueManager.setValueLimit(TEST_ENQUEUE_SIZE);
187         final SynchronizedBoolean success = new SynchronizedBoolean(false);
188         final MemoryBoundedQueue q1 = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
189         assertTrue(queue == q1);
190         Thread t = new Thread(new Runnable() {
191             public void run() {
192                 try {
193                     Thread.sleep(250);
194                     queue.dequeue();
195                 }
196                 catch (Exception e) {
197                     e.printStackTrace();
198                 }
199                 synchronized (success) {
200                     success.set(true);
201                     success.notify();
202                 }
203             }
204         });
205         t.start();
206         queue.close();
207         try {
208             synchronized (success) {
209                 if (!success.get()) {
210                     success.wait(2000);
211                 }
212             }
213         }
214         catch (Throwable e) {
215             e.printStackTrace();
216         }
217         assertTrue(success.get());
218        
219         MemoryBoundedQueue q2 = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
220         assertTrue(queue != q2);
221     }
222 
223     public void testDequeueNoWait() throws Exception {
224         final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
225         Object obj = queue.dequeueNoWait();
226         assertTrue(obj == null);
227         queue.close();
228     }
229 
230     public void testEnqueueFirst() throws Exception {
231         final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
232         assertTrue(queueManager.getTotalMemoryUsedSize() == 0);
233         Object mutex = new Object();
234         queueManager.setValueLimit(TEST_INSTANCE_SIZE * 100);
235         for (int i = 0;i < 10;i++) {
236             queue.enqueue(new Receipt());
237         }
238         Receipt test = new Receipt();
239         test.setId("FIRST");
240         queue.enqueueFirst(test);
241         Object obj = queue.dequeue();
242         assertTrue(obj == test);
243         queue.close();
244     }
245 
246     public void testEnqueueNoBlock() {
247         MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
248         queueManager.setValueLimit(TEST_ENQUEUE_SIZE);
249         Receipt test = new Receipt();
250         queue.enqueueNoBlock(test);
251         assertTrue(true);
252         queue.close();
253     }
254 
255     public void testIsEmpty() {
256         int size = 10;
257         MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
258         for (int i = 0;i < size;i++) {
259             queue.enqueue(new Receipt());
260         }
261         queue.clear();
262         assertTrue(queue.isEmpty());
263         queue.close();
264     }
265 
266     public void testRemove() {
267         MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
268         Receipt test = new Receipt();
269         queue.enqueue(test);
270         assertTrue(queue.remove(test));
271         queue.close();
272     }
273 
274     public void testSize() {
275         int size = 10;
276         MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
277         for (int i = 0;i < size;i++) {
278             queue.enqueue(new Receipt());
279         }
280         assertTrue(queue.size() == size);
281         queue.close();
282     }
283     
284     public void testRemovePacket(){
285         int size = 100;
286         MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
287         List list = new ArrayList(size);
288         for (int i = 0;i < size;i++) {
289             Packet p = new Receipt();
290             p.setId(""+i);
291             list.add(p);
292             queue.enqueue(p);
293         }
294         for (int i =0; i < size; i++){
295             queue.remove((Packet)list.get(i));
296         }
297         assertTrue(queue.size() == 0);
298         queue.close();
299     }
300     
301     public void testRemovePacketById(){
302         int size = 100;
303         MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
304         List list = new ArrayList(size);
305         for (int i = 0;i < size;i++) {
306             Packet p = new Receipt();
307             p.setId(""+i);
308             list.add(p);
309             queue.enqueue(p);
310         }
311         for (int i =0; i < size; i++){
312             Packet p = (Packet)list.get(i);
313             Packet removed = queue.remove(p.getId());
314             assertTrue(removed != null);
315             assertTrue(removed == p);
316         }
317         assertTrue(queue.size() == 0);
318         queue.close();
319     }
320 
321     public static Test suite() {
322         return new TestSuite(MemoryBoundedQueueTest.class);
323     }
324 
325     public static void main(String[] args) {
326         MemoryBoundedQueueTest test = new MemoryBoundedQueueTest("test");
327         test.setUp();
328         test.testClose();
329     }
330 }