001    /**
002    The contents of this file are subject to the Mozilla Public License Version 1.1 
003    (the "License"); you may not use this file except in compliance with the License. 
004    You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005    Software distributed under the License is distributed on an "AS IS" basis, 
006    WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007    specific language governing rights and limitations under the License. 
008    
009    The Original Code is "ConnectionHub.java".  Description: 
010    "Provides access to shared HL7 Connections" 
011    
012    The Initial Developer of the Original Code is University Health Network. Copyright (C) 
013    2001.  All Rights Reserved. 
014    
015    Contributor(s): ______________________________________. 
016    
017    Alternatively, the contents of this file may be used under the terms of the 
018    GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
019    applicable instead of those above.  If you wish to allow use of your version of this 
020    file only under the terms of the GPL and not to allow others to use your version 
021    of this file under the MPL, indicate your decision by deleting  the provisions above 
022    and replace  them with the notice and other provisions required by the GPL License.  
023    If you do not delete the provisions above, a recipient may use your version of 
024    this file under either the MPL or the GPL. 
025     */
026    
027    package ca.uhn.hl7v2.app;
028    
029    import java.util.Collections;
030    import java.util.Map;
031    import java.util.Set;
032    import java.util.concurrent.ConcurrentHashMap;
033    import java.util.concurrent.ConcurrentMap;
034    import java.util.concurrent.ExecutorService;
035    
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    import ca.uhn.hl7v2.HL7Exception;
040    import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
041    import ca.uhn.hl7v2.llp.LowerLayerProtocol;
042    import ca.uhn.hl7v2.parser.Parser;
043    
044    /**
045     * <p>
046     * Provides access to shared HL7 Connections. The ConnectionHub has at most one
047     * connection to any given address at any time.
048     * </p>
049     * <p>
050     * <b>Synchronization Note:</b> This class should be safe to use in a
051     * multithreaded environment. A synchronization mutex is maintained for any
052     * given target host and port, so that if two threads are trying to connect to
053     * two separate destinations neither will block, but if two threads are trying
054     * to connect to the same destination, one will block until the other has
055     * finished trying. Use caution if this class is to be used in an environment
056     * where a very large (over 1000) number of target host/port destinations will
057     * be accessed at the same time.
058     * </p>
059     * 
060     * @author Bryan Tripp
061     */
062    public class ConnectionHub {
063    
064            /**
065             * Set a system property with this key to a string containing an integer
066             * larger than the default ("1000") if you need to connect to a very large
067             * number of targets at the same time in a multithreaded environment.
068             */
069            public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class
070                            .getName() + ".maxSize";
071            private static final Logger log = LoggerFactory
072                            .getLogger(ConnectionHub.class);
073            private static ConnectionHub instance = null;
074            private final CountingMap<ConnectionData, Connection> connections;
075            private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<String, String>();
076            private final ExecutorService executorService;
077            
078            /** Creates a new instance of ConnectionHub */
079            private ConnectionHub(ExecutorService executorService) {
080                    this.executorService = executorService;
081                    connections = new CountingMap<ConnectionData, Connection>() {
082    
083                            @Override
084                            protected Connection open(ConnectionData connectionData)
085                                            throws Exception {
086                                    return ConnectionFactory.open(connectionData,
087                                                    ConnectionHub.this.executorService);
088                            }
089    
090                            @Override
091                            protected void dispose(Connection connection) {
092                                    connection.close();
093                            }
094    
095                    };
096            }
097    
098            /** Returns the singleton instance of ConnectionHub */
099            public static ConnectionHub getInstance() {
100                    return getInstance(DefaultExecutorService.getDefaultService());
101            }
102    
103            public static void shutdown() {
104                    ConnectionHub hub = getInstance();
105                    if (DefaultExecutorService.isDefaultService(hub.executorService)) {
106                            hub.executorService.shutdown();
107                            instance = null;
108                    }
109            }
110    
111            /** Returns the singleton instance of ConnectionHub. If called */
112            public synchronized static ConnectionHub getInstance(ExecutorService service) {
113                    if (instance == null || service.isShutdown()) {
114                            instance = new ConnectionHub(service);
115                    }
116                    return instance;
117            }
118    
119            /**
120             * Returns a Connection to the given address, opening this Connection if
121             * necessary. The given Parser will only be used if a new Connection is
122             * opened, so there is no guarantee that the Connection returned will be
123             * using the Parser you provide. If you need explicit access to the Parser
124             * the Connection is using, call <code>Connection.getParser()</code>.
125             */
126            public Connection attach(String host, int port, Parser parser,
127                            Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
128                    return attach(host, port, parser, llpClass, false);
129            }
130    
131            public Connection attach(String host, int port, Parser parser,
132                            Class<? extends LowerLayerProtocol> llpClass, boolean tls)
133                            throws HL7Exception {
134                    return attach(host, port, 0, parser, llpClass, tls);
135            }
136    
137            public Connection attach(String host, int port, Parser parser,
138                            LowerLayerProtocol llp, boolean tls)
139                            throws HL7Exception {
140                    return attach(host, port, 0, parser, llp, tls);
141            }
142    
143            public Connection attach(String host, int outboundPort, int inboundPort,
144                            Parser parser, Class<? extends LowerLayerProtocol> llpClass)
145                            throws HL7Exception {
146                    return attach(host, outboundPort, inboundPort, parser, llpClass, false);
147            }
148    
149            public Connection attach(String host, int outboundPort, int inboundPort,
150                            Parser parser, Class<? extends LowerLayerProtocol> llpClass,
151                            boolean tls) throws HL7Exception {
152                    try {
153                            LowerLayerProtocol llp = llpClass.newInstance();
154                            return attach(host, outboundPort, inboundPort, parser, llp, tls);
155                    } catch (InstantiationException e) {
156                            throw new HL7Exception("Cannot open connection to " + host + ":"
157                                            + outboundPort, e);
158                    } catch (IllegalAccessException e) {
159                            throw new HL7Exception("Cannot open connection to " + host + ":"
160                                            + outboundPort, e);
161                    }
162            }
163    
164            public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp, boolean tls) throws HL7Exception {
165                    return attach(new ConnectionData(host, outboundPort, inboundPort,
166                                    parser, llp, tls));
167            }
168    
169            public Connection attach(ConnectionData data) throws HL7Exception {
170                    try {
171                            Connection conn = null;
172                            // Disallow establishing same connection targets concurrently
173                            connectionMutexes.putIfAbsent(data.toString(), data.toString());
174                            String mutex = connectionMutexes.get(data.toString());
175                            synchronized (mutex) {
176                                    discardConnectionIfStale(connections.get(data));
177                                    // Create connection or increase counter
178                                    conn = connections.put(data);
179                            }
180                            return conn;
181                    } catch (Exception e) {
182                            throw new HL7Exception("Cannot open connection to "
183                                            + data.getHost() + ":" + data.getPort() + "/"
184                                            + data.getPort2(), e);
185                    }
186            }
187    
188            private void discardConnectionIfStale(Connection conn) {
189                    if (conn != null && !conn.isOpen()) {
190                            log.info(
191                                            "Discarding connection which appears to be closed. Remote addr: {}",
192                                            conn.getRemoteAddress());
193                            discard(conn);
194                            conn = null;
195                    }
196            }
197    
198            /**
199             * Informs the ConnectionHub that you are done with the given Connection -
200             * if no other code is using it, it will be closed, so you should not
201             * attempt to use a Connection after detaching from it. If the connection is
202             * not enlisted, this method does nothing.
203             */
204            public void detach(Connection c) {
205                    ConnectionData cd = connections.find(c);
206                    if (cd != null)
207                            connections.remove(cd);
208            }
209    
210            /**
211             * Closes and discards the given Connection so that it can not be returned
212             * in subsequent calls to attach(). This method is to be used when there is
213             * a problem with a Connection, e.g. socket connection closed by remote
214             * host.
215             */
216            public void discard(Connection c) {
217                    ConnectionData cd = connections.find(c);
218                    if (cd != null)
219                            connections.removeAllOf(cd);
220            }
221    
222            public void discardAll() {
223                    for (ConnectionData cd : allConnections()) {
224                            connections.removeAllOf(cd);
225                    }
226            }
227    
228            public Set<? extends ConnectionData> allConnections() {
229                    return connections.keySet();
230            }
231    
232            public Connection getKnownConnection(ConnectionData key) {
233                    return connections.get(key);
234            }
235    
236            public boolean isOpen(ConnectionData key) {
237                    return getKnownConnection(key).isOpen();
238            }
239    
240    
241            /**
242             * Helper class that implements a map that increases/decreases a counter
243             * when an entry is added/removed. It is furthermore intended that an
244             * entry's value is derived from its key.
245             * 
246             * @param <K>
247             *            key class
248             * @param <D>
249             *            managed value class
250             */
251            private abstract class CountingMap<K, D> {
252                    private Map<K, Count> content;
253    
254                    protected abstract D open(K key) throws Exception;
255    
256                    protected abstract void dispose(D value);
257    
258                    public CountingMap() {
259                            super();
260                            content = new ConcurrentHashMap<K, Count>();
261                    }
262    
263                    /**
264                     * If the key exists, the counter is increased. Otherwise, a value is
265                     * created, and the key/value pair is added to the map.
266                     */
267                    public D put(K key) throws Exception {
268                            if (content.containsKey(key)) {
269                                    return content.put(key, content.get(key).increase()).getValue();
270                            } else {
271                                    Count c = new Count(open(key));
272                                    content.put(key, c);
273                                    return c.getValue();
274                            }
275                    }
276    
277                    public Set<K> keySet() {
278                            return Collections.unmodifiableSet(content.keySet());
279                    }
280    
281                    public D get(K key) {
282                            return content.containsKey(key) ? content.get(key).getValue()
283                                            : null;
284                    }
285    
286                    public K find(D value) {
287                            for (Map.Entry<K, Count> entry : content.entrySet()) {
288                                    if (entry.getValue().getValue().equals(value)) {
289                                            return entry.getKey();
290                                    }
291                            }
292                            return null;
293                    }
294    
295                    /**
296                     * If the counter of the key/value is greater than one, the counter is
297                     * decreased. Otherwise, the entry is removed and the value is cleaned
298                     * up.
299                     */
300                    public D remove(K key) {
301                            Count pair = content.get(key);
302                            if (pair == null)
303                                    return null;
304                            if (pair.isLast()) {
305                                    return removeAllOf(key);
306                            }
307                            return content.put(key, content.get(key).decrease()).getValue();
308                    }
309    
310                    /**
311                     * The key/value entry is removed and the value is cleaned up.
312                     */
313                    public D removeAllOf(K key) {
314                            D removed = content.remove(key).value;
315                            dispose(removed);
316                            return removed;
317                    }
318    
319                    private class Count {
320                            private D value;
321                            private int count;
322    
323                            public Count(D value) {
324                                    this(value, 1);
325                            }
326    
327                            private Count(D value, int number) {
328                                    this.value = value;
329                                    this.count = number;
330                            }
331    
332                            public D getValue() {
333                                    return value;
334                            }
335    
336                            Count increase() {
337                                    return new Count(value, count + 1);
338                            }
339    
340                            boolean isLast() {
341                                    return count == 1;
342                            }
343    
344                            Count decrease() {
345                                    return !isLast() ? new Count(value, count - 1) : null;
346                            }
347    
348                    }
349    
350            }
351    
352    }