001 package ca.uhn.hl7v2.concurrent;
002
003 import java.util.Collection;
004 import java.util.Map;
005 import java.util.Set;
006 import java.util.concurrent.Callable;
007 import java.util.concurrent.ConcurrentHashMap;
008 import java.util.concurrent.ConcurrentMap;
009 import java.util.concurrent.CountDownLatch;
010 import java.util.concurrent.ExecutorService;
011 import java.util.concurrent.Executors;
012 import java.util.concurrent.Future;
013 import java.util.concurrent.TimeUnit;
014
015 /**
016 * Default Implementation of a {@link BlockingMap}.
017 * <p>
018 * Note: While it is not actively prevented that more then one thread waits for
019 * an entry, it is not guaranteed that all waiting threads will receive the
020 * entry once it became available. Other implementations may choose to count the
021 * waiting threads and/or to remove an available value after a grace period.
022 *
023 * @param <K>
024 * @param <V>
025 */
026 public class BlockingHashMap<K, V> implements BlockingMap<K, V> {
027
028 private final ConcurrentMap<K, V> map = new ConcurrentHashMap<K, V>();
029 private final ConcurrentMap<K, CountDownLatch> latches = new ConcurrentHashMap<K, CountDownLatch>();
030 private final ExecutorService executor;
031
032 public BlockingHashMap() {
033 this(Executors.newCachedThreadPool());
034 }
035
036 public BlockingHashMap(ExecutorService executor) {
037 super();
038 this.executor = executor;
039 }
040
041 /**
042 * Returns the keys of available entries
043 *
044 * @see java.util.Map#keySet()
045 */
046 public Set<K> keySet() {
047 return map.keySet();
048 }
049
050 /**
051 * Returns an available entry without removing it from the map
052 *
053 * @see java.util.Map#get(java.lang.Object)
054 */
055 public V get(Object key) {
056 return map.get(key);
057 }
058
059 /**
060 * Returns <code>true</code> if an entry with the given key is available
061 *
062 * @see java.util.Map#containsKey(java.lang.Object)
063 */
064 public boolean containsKey(Object key) {
065 return map.containsKey(key);
066 }
067
068 /**
069 * @see java.util.Map#put(java.lang.Object, java.lang.Object)
070 */
071 synchronized public V put(K key, V value) {
072 V result = map.put(key, value);
073 latchFor(key).countDown();
074 return result;
075 }
076
077 /**
078 * @see ca.uhn.hl7v2.concurrent.BlockingMap#give(java.lang.Object,
079 * java.lang.Object)
080 */
081 synchronized public boolean give(K key, V value) {
082 if (!latches.containsKey(key)) {
083 return false;
084 }
085 put(key, value);
086 return true;
087 }
088
089 public V take(K key) throws InterruptedException {
090 latchFor(key).await();
091 latches.remove(key);
092 return map.remove(key); // likely to fail there are n > 1 consumers
093 }
094
095
096 public Future<V> asyncTake(final K key) throws InterruptedException {
097 latchFor(key);
098 return executor.submit(new Callable<V>() {
099
100 public V call() throws Exception {
101 return take(key);
102 }
103 });
104 }
105
106 public V poll(K key, long timeout, TimeUnit unit)
107 throws InterruptedException {
108 if (latchFor(key).await(timeout, unit)) {
109 latches.remove(key);
110 return map.remove(key);
111 }
112 return null;
113 }
114
115 public Future<V> asyncPoll(final K key, final long timeout, final TimeUnit unit) {
116 latchFor(key);
117 return executor.submit(new Callable<V>() {
118
119 public V call() throws Exception {
120 return poll(key, timeout, unit);
121 }
122 });
123 }
124
125
126 /**
127 * Returns true if no entry is available for consumers
128 *
129 * @see java.util.Map#isEmpty()
130 */
131 public boolean isEmpty() {
132 return map.isEmpty();
133 }
134
135 /**
136 * Returns the number of available values
137 *
138 * @see java.util.Map#size()
139 */
140 public int size() {
141 return map.size();
142 }
143
144 /**
145 * Removes an entry, regardless whether a value has been set or not. Waiting
146 * consumers will receive a null value.
147 *
148 * @see java.util.Map#remove(java.lang.Object)
149 */
150 synchronized public V remove(Object key) {
151 V result = map.remove(key);
152 CountDownLatch latch = latches.remove(key);
153 if (latch != null)
154 latch.countDown();
155 return result;
156 }
157
158 /**
159 * Clears all existing entries. Waiting consumers will receive a null value
160 * for each removed entry.
161 *
162 * @see java.util.Map#clear()
163 */
164 public void clear() {
165 for (K key : latches.keySet()) {
166 remove(key);
167 }
168 }
169
170 public Collection<V> values() {
171 return map.values();
172 }
173
174 public Set<java.util.Map.Entry<K, V>> entrySet() {
175 return map.entrySet();
176 }
177
178 public void putAll(Map<? extends K, ? extends V> t) {
179 for (Entry<? extends K, ? extends V> entry : t.entrySet()) {
180 put(entry.getKey(), entry.getValue());
181 }
182 }
183
184 public boolean containsValue(Object value) {
185 return map.containsValue(value);
186 }
187
188 private synchronized CountDownLatch latchFor(K key) {
189 CountDownLatch latch = latches.get(key);
190 if (latch == null) {
191 latch = new CountDownLatch(1);
192 latches.put(key, latch);
193 }
194 return latch;
195 }
196
197 }