1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.message.util;
20 import java.util.Iterator;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.capacity.BasicCapacityMonitor;
24 import org.codehaus.activemq.message.Packet;
25 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
26 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
27
28 /***
29 * A factory manager for MemoryBoundedQueue and also ensures that the maximum memory used by all active
30 * MemoryBoundedQueues cewated by this instance stays within the memory usage bounds set.
31 *
32 * @version $Revision: 1.2 $
33 */
34 public class MemoryBoundedQueueManager extends BasicCapacityMonitor {
35 private static final int OBJECT_OVERHEAD = 50;
36 SynchronizedLong totalMemoryUsedSize = new SynchronizedLong(0);
37 private ConcurrentHashMap activeQueues = new ConcurrentHashMap();
38 private static final Log log = LogFactory.getLog(MemoryBoundedQueueManager.class);
39
40 /***
41 * @param name
42 * @param maxSize
43 */
44 public MemoryBoundedQueueManager(String name, long maxSize) {
45 super(name, maxSize);
46 }
47
48 /***
49 * retrieve a named MemoryBoundedQueue or creates one if not found
50 *
51 * @param name
52 * @return an named instance of a MemoryBoundedQueue
53 */
54 public MemoryBoundedQueue getMemoryBoundedQueue(String name) {
55 MemoryBoundedQueue result = (MemoryBoundedQueue) activeQueues.get(name);
56 if (result == null) {
57 result = new MemoryBoundedQueue(name, this);
58 activeQueues.put(name, result);
59 }
60 return result;
61 }
62
63 /***
64 * close this queue manager and all associated MemoryBoundedQueues
65 */
66 public void close() {
67 for (Iterator i = activeQueues.values().iterator();i.hasNext();) {
68 MemoryBoundedQueue mbq = (MemoryBoundedQueue) i.next();
69 mbq.close();
70 }
71 activeQueues.clear();
72 }
73
74 /***
75 * @return the calculated total memory usage assocated with all it's queues
76 */
77 public long getTotalMemoryUsedSize() {
78 return totalMemoryUsedSize.get();
79 }
80
81 /***
82 * @return true if this MemoryBoundedQueueManager has reached it's predefined limit
83 */
84 public boolean isFull() {
85 boolean result = totalMemoryUsedSize.get() >= super.getValueLimit();
86 return result;
87 }
88
89 int incrementMemoryUsed(Packet obj) {
90 int size = OBJECT_OVERHEAD;
91 if (obj != null) {
92 if (obj.getMemoryUsageReferenceCount() == 0) {
93 size += obj.getMemoryUsage();
94 }
95 obj.incrementMemoryReferenceCount();
96 }
97 totalMemoryUsedSize.add(size);
98 super.setCurrentValue(totalMemoryUsedSize.get());
99 return size;
100 }
101
102 int decrementMemoryUsed(Packet obj) {
103 int size = OBJECT_OVERHEAD;
104 if (obj != null) {
105 obj.decrementMemoryReferenceCount();
106 if (obj.getMemoryUsageReferenceCount() == 0) {
107 size += obj.getMemoryUsage();
108 }
109 }
110 totalMemoryUsedSize.subtract(size);
111 super.setCurrentValue(totalMemoryUsedSize.get());
112 return size;
113 }
114
115 protected void finalize() {
116 close();
117 }
118
119 void removeMemoryBoundedQueue(String name) {
120 activeQueues.remove(name);
121 }
122 }