001    package ca.uhn.hl7v2.concurrent;
002    
003    import java.util.Collection;
004    import java.util.Map;
005    import java.util.Set;
006    import java.util.concurrent.Callable;
007    import java.util.concurrent.ConcurrentHashMap;
008    import java.util.concurrent.ConcurrentMap;
009    import java.util.concurrent.CountDownLatch;
010    import java.util.concurrent.ExecutorService;
011    import java.util.concurrent.Executors;
012    import java.util.concurrent.Future;
013    import java.util.concurrent.TimeUnit;
014    
015    /**
016     * Default Implementation of a {@link BlockingMap}.
017     * <p>
018     * Note: While it is not actively prevented that more then one thread waits for
019     * an entry, it is not guaranteed that all waiting threads will receive the
020     * entry once it became available. Other implementations may choose to count the
021     * waiting threads and/or to remove an available value after a grace period.
022     * 
023     * @param <K>
024     * @param <V>
025     */
026    public class BlockingHashMap<K, V> implements BlockingMap<K, V> {
027    
028            private final ConcurrentMap<K, V> map = new ConcurrentHashMap<K, V>();
029            private final ConcurrentMap<K, CountDownLatch> latches = new ConcurrentHashMap<K, CountDownLatch>();
030            private final ExecutorService executor;
031            
032            public BlockingHashMap() {
033                    this(Executors.newCachedThreadPool());
034            }
035            
036            public BlockingHashMap(ExecutorService executor) {
037                    super();
038                    this.executor = executor;
039            }
040    
041            /**
042             * Returns the keys of available entries
043             * 
044             * @see java.util.Map#keySet()
045             */
046            public Set<K> keySet() {
047                    return map.keySet();
048            }
049    
050            /**
051             * Returns an available entry without removing it from the map
052             * 
053             * @see java.util.Map#get(java.lang.Object)
054             */
055            public V get(Object key) {
056                    return map.get(key);
057            }
058    
059            /**
060             * Returns <code>true</code> if an entry with the given key is available
061             * 
062             * @see java.util.Map#containsKey(java.lang.Object)
063             */
064            public boolean containsKey(Object key) {
065                    return map.containsKey(key);
066            }
067    
068            /**
069             * @see java.util.Map#put(java.lang.Object, java.lang.Object)
070             */
071            synchronized public V put(K key, V value) {
072                    V result = map.put(key, value);
073                    latchFor(key).countDown();
074                    return result;
075            }
076    
077            /**
078             * @see ca.uhn.hl7v2.concurrent.BlockingMap#give(java.lang.Object,
079             *      java.lang.Object)
080             */
081            synchronized public boolean give(K key, V value) {
082                    if (!latches.containsKey(key)) {
083                            return false;
084                    }
085                    put(key, value);
086                    return true;
087            }
088    
089            public V take(K key) throws InterruptedException {
090                    latchFor(key).await();
091                    latches.remove(key);
092                    return map.remove(key); // likely to fail there are n > 1 consumers
093            }
094            
095    
096            public Future<V> asyncTake(final K key) throws InterruptedException {
097                    latchFor(key);
098                    return executor.submit(new Callable<V>() {
099    
100                            public V call() throws Exception {
101                                    return take(key);
102                            }
103                    });
104            }
105    
106            public V poll(K key, long timeout, TimeUnit unit)
107                            throws InterruptedException {
108                    if (latchFor(key).await(timeout, unit)) {
109                            latches.remove(key);
110                            return map.remove(key);
111                    }
112                    return null;
113            }
114            
115            public Future<V> asyncPoll(final K key, final long timeout, final TimeUnit unit) {
116                    latchFor(key);
117                    return executor.submit(new Callable<V>() {
118    
119                            public V call() throws Exception {
120                                    return poll(key, timeout, unit);
121                            }
122                    });             
123            }
124            
125    
126            /**
127             * Returns true if no entry is available for consumers
128             * 
129             * @see java.util.Map#isEmpty()
130             */
131            public boolean isEmpty() {
132                    return map.isEmpty();
133            }
134    
135            /**
136             * Returns the number of available values
137             * 
138             * @see java.util.Map#size()
139             */
140            public int size() {
141                    return map.size();
142            }
143    
144            /**
145             * Removes an entry, regardless whether a value has been set or not. Waiting
146             * consumers will receive a null value.
147             * 
148             * @see java.util.Map#remove(java.lang.Object)
149             */
150            synchronized public V remove(Object key) {
151                    V result = map.remove(key);
152                    CountDownLatch latch = latches.remove(key);
153                    if (latch != null)
154                            latch.countDown();
155                    return result;
156            }
157    
158            /**
159             * Clears all existing entries. Waiting consumers will receive a null value
160             * for each removed entry.
161             * 
162             * @see java.util.Map#clear()
163             */
164            public void clear() {
165                    for (K key : latches.keySet()) {
166                            remove(key);
167                    }
168            }
169    
170            public Collection<V> values() {
171                    return map.values();
172            }
173    
174            public Set<java.util.Map.Entry<K, V>> entrySet() {
175                    return map.entrySet();
176            }
177    
178            public void putAll(Map<? extends K, ? extends V> t) {
179                    for (Entry<? extends K, ? extends V> entry : t.entrySet()) {
180                            put(entry.getKey(), entry.getValue());
181                    }
182            }
183    
184            public boolean containsValue(Object value) {
185                    return map.containsValue(value);
186            }
187    
188            private synchronized CountDownLatch latchFor(K key) {
189                    CountDownLatch latch = latches.get(key);
190                    if (latch == null) {
191                            latch = new CountDownLatch(1);
192                            latches.put(key, latch);
193                    }
194                    return latch;
195            }
196    
197    }