001 /**
002 The contents of this file are subject to the Mozilla Public License Version 1.1
003 (the "License"); you may not use this file except in compliance with the License.
004 You may obtain a copy of the License at http://www.mozilla.org/MPL/
005 Software distributed under the License is distributed on an "AS IS" basis,
006 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
007 specific language governing rights and limitations under the License.
008
009 The Original Code is "ConnectionHub.java". Description:
010 "Provides access to shared HL7 Connections"
011
012 The Initial Developer of the Original Code is University Health Network. Copyright (C)
013 2001. All Rights Reserved.
014
015 Contributor(s): ______________________________________.
016
017 Alternatively, the contents of this file may be used under the terms of the
018 GNU General Public License (the "GPL"), in which case the provisions of the GPL are
019 applicable instead of those above. If you wish to allow use of your version of this
020 file only under the terms of the GPL and not to allow others to use your version
021 of this file under the MPL, indicate your decision by deleting the provisions above
022 and replace them with the notice and other provisions required by the GPL License.
023 If you do not delete the provisions above, a recipient may use your version of
024 this file under either the MPL or the GPL.
025 */
026
027 package ca.uhn.hl7v2.app;
028
029 import java.util.Collections;
030 import java.util.Map;
031 import java.util.Set;
032 import java.util.concurrent.ConcurrentHashMap;
033 import java.util.concurrent.ConcurrentMap;
034 import java.util.concurrent.ExecutorService;
035
036 import org.slf4j.Logger;
037 import org.slf4j.LoggerFactory;
038
039 import ca.uhn.hl7v2.HL7Exception;
040 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
041 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
042 import ca.uhn.hl7v2.parser.Parser;
043
044 /**
045 * <p>
046 * Provides access to shared HL7 Connections. The ConnectionHub has at most one
047 * connection to any given address at any time.
048 * </p>
049 * <p>
050 * <b>Synchronization Note:</b> This class should be safe to use in a
051 * multithreaded environment. A synchronization mutex is maintained for any
052 * given target host and port, so that if two threads are trying to connect to
053 * two separate destinations neither will block, but if two threads are trying
054 * to connect to the same destination, one will block until the other has
055 * finished trying. Use caution if this class is to be used in an environment
056 * where a very large (over 1000) number of target host/port destinations will
057 * be accessed at the same time.
058 * </p>
059 *
060 * @author Bryan Tripp
061 */
062 public class ConnectionHub {
063
064 /**
065 * Set a system property with this key to a string containing an integer
066 * larger than the default ("1000") if you need to connect to a very large
067 * number of targets at the same time in a multithreaded environment.
068 */
069 public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class
070 .getName() + ".maxSize";
071 private static final Logger log = LoggerFactory
072 .getLogger(ConnectionHub.class);
073 private static ConnectionHub instance = null;
074 private final CountingMap<ConnectionData, Connection> connections;
075 private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<String, String>();
076 private final ExecutorService executorService;
077
078 /** Creates a new instance of ConnectionHub */
079 private ConnectionHub(ExecutorService executorService) {
080 this.executorService = executorService;
081 connections = new CountingMap<ConnectionData, Connection>() {
082
083 @Override
084 protected Connection open(ConnectionData connectionData)
085 throws Exception {
086 return ConnectionFactory.open(connectionData,
087 ConnectionHub.this.executorService);
088 }
089
090 @Override
091 protected void dispose(Connection connection) {
092 connection.close();
093 }
094
095 };
096 }
097
098 /** Returns the singleton instance of ConnectionHub */
099 public static ConnectionHub getInstance() {
100 return getInstance(DefaultExecutorService.getDefaultService());
101 }
102
103 public static void shutdown() {
104 ConnectionHub hub = getInstance();
105 if (DefaultExecutorService.isDefaultService(hub.executorService)) {
106 hub.executorService.shutdown();
107 instance = null;
108 }
109 }
110
111 /** Returns the singleton instance of ConnectionHub. If called */
112 public synchronized static ConnectionHub getInstance(ExecutorService service) {
113 if (instance == null || service.isShutdown()) {
114 instance = new ConnectionHub(service);
115 }
116 return instance;
117 }
118
119 /**
120 * Returns a Connection to the given address, opening this Connection if
121 * necessary. The given Parser will only be used if a new Connection is
122 * opened, so there is no guarantee that the Connection returned will be
123 * using the Parser you provide. If you need explicit access to the Parser
124 * the Connection is using, call <code>Connection.getParser()</code>.
125 */
126 public Connection attach(String host, int port, Parser parser,
127 Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
128 return attach(host, port, parser, llpClass, false);
129 }
130
131 public Connection attach(String host, int port, Parser parser,
132 Class<? extends LowerLayerProtocol> llpClass, boolean tls)
133 throws HL7Exception {
134 return attach(host, port, 0, parser, llpClass, tls);
135 }
136
137 public Connection attach(String host, int port, Parser parser,
138 LowerLayerProtocol llp, boolean tls)
139 throws HL7Exception {
140 return attach(host, port, 0, parser, llp, tls);
141 }
142
143 public Connection attach(String host, int outboundPort, int inboundPort,
144 Parser parser, Class<? extends LowerLayerProtocol> llpClass)
145 throws HL7Exception {
146 return attach(host, outboundPort, inboundPort, parser, llpClass, false);
147 }
148
149 public Connection attach(String host, int outboundPort, int inboundPort,
150 Parser parser, Class<? extends LowerLayerProtocol> llpClass,
151 boolean tls) throws HL7Exception {
152 try {
153 LowerLayerProtocol llp = llpClass.newInstance();
154 return attach(host, outboundPort, inboundPort, parser, llp, tls);
155 } catch (InstantiationException e) {
156 throw new HL7Exception("Cannot open connection to " + host + ":"
157 + outboundPort, e);
158 } catch (IllegalAccessException e) {
159 throw new HL7Exception("Cannot open connection to " + host + ":"
160 + outboundPort, e);
161 }
162 }
163
164 public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp, boolean tls) throws HL7Exception {
165 return attach(new ConnectionData(host, outboundPort, inboundPort,
166 parser, llp, tls));
167 }
168
169 public Connection attach(ConnectionData data) throws HL7Exception {
170 try {
171 Connection conn = null;
172 // Disallow establishing same connection targets concurrently
173 connectionMutexes.putIfAbsent(data.toString(), data.toString());
174 String mutex = connectionMutexes.get(data.toString());
175 synchronized (mutex) {
176 discardConnectionIfStale(connections.get(data));
177 // Create connection or increase counter
178 conn = connections.put(data);
179 }
180 return conn;
181 } catch (Exception e) {
182 throw new HL7Exception("Cannot open connection to "
183 + data.getHost() + ":" + data.getPort() + "/"
184 + data.getPort2(), e);
185 }
186 }
187
188 private void discardConnectionIfStale(Connection conn) {
189 if (conn != null && !conn.isOpen()) {
190 log.info(
191 "Discarding connection which appears to be closed. Remote addr: {}",
192 conn.getRemoteAddress());
193 discard(conn);
194 conn = null;
195 }
196 }
197
198 /**
199 * Informs the ConnectionHub that you are done with the given Connection -
200 * if no other code is using it, it will be closed, so you should not
201 * attempt to use a Connection after detaching from it. If the connection is
202 * not enlisted, this method does nothing.
203 */
204 public void detach(Connection c) {
205 ConnectionData cd = connections.find(c);
206 if (cd != null)
207 connections.remove(cd);
208 }
209
210 /**
211 * Closes and discards the given Connection so that it can not be returned
212 * in subsequent calls to attach(). This method is to be used when there is
213 * a problem with a Connection, e.g. socket connection closed by remote
214 * host.
215 */
216 public void discard(Connection c) {
217 ConnectionData cd = connections.find(c);
218 if (cd != null)
219 connections.removeAllOf(cd);
220 }
221
222 public void discardAll() {
223 for (ConnectionData cd : allConnections()) {
224 connections.removeAllOf(cd);
225 }
226 }
227
228 public Set<? extends ConnectionData> allConnections() {
229 return connections.keySet();
230 }
231
232 public Connection getKnownConnection(ConnectionData key) {
233 return connections.get(key);
234 }
235
236 public boolean isOpen(ConnectionData key) {
237 return getKnownConnection(key).isOpen();
238 }
239
240
241 /**
242 * Helper class that implements a map that increases/decreases a counter
243 * when an entry is added/removed. It is furthermore intended that an
244 * entry's value is derived from its key.
245 *
246 * @param <K>
247 * key class
248 * @param <D>
249 * managed value class
250 */
251 private abstract class CountingMap<K, D> {
252 private Map<K, Count> content;
253
254 protected abstract D open(K key) throws Exception;
255
256 protected abstract void dispose(D value);
257
258 public CountingMap() {
259 super();
260 content = new ConcurrentHashMap<K, Count>();
261 }
262
263 /**
264 * If the key exists, the counter is increased. Otherwise, a value is
265 * created, and the key/value pair is added to the map.
266 */
267 public D put(K key) throws Exception {
268 if (content.containsKey(key)) {
269 return content.put(key, content.get(key).increase()).getValue();
270 } else {
271 Count c = new Count(open(key));
272 content.put(key, c);
273 return c.getValue();
274 }
275 }
276
277 public Set<K> keySet() {
278 return Collections.unmodifiableSet(content.keySet());
279 }
280
281 public D get(K key) {
282 return content.containsKey(key) ? content.get(key).getValue()
283 : null;
284 }
285
286 public K find(D value) {
287 for (Map.Entry<K, Count> entry : content.entrySet()) {
288 if (entry.getValue().getValue().equals(value)) {
289 return entry.getKey();
290 }
291 }
292 return null;
293 }
294
295 /**
296 * If the counter of the key/value is greater than one, the counter is
297 * decreased. Otherwise, the entry is removed and the value is cleaned
298 * up.
299 */
300 public D remove(K key) {
301 Count pair = content.get(key);
302 if (pair == null)
303 return null;
304 if (pair.isLast()) {
305 return removeAllOf(key);
306 }
307 return content.put(key, content.get(key).decrease()).getValue();
308 }
309
310 /**
311 * The key/value entry is removed and the value is cleaned up.
312 */
313 public D removeAllOf(K key) {
314 D removed = content.remove(key).value;
315 dispose(removed);
316 return removed;
317 }
318
319 private class Count {
320 private D value;
321 private int count;
322
323 public Count(D value) {
324 this(value, 1);
325 }
326
327 private Count(D value, int number) {
328 this.value = value;
329 this.count = number;
330 }
331
332 public D getValue() {
333 return value;
334 }
335
336 Count increase() {
337 return new Count(value, count + 1);
338 }
339
340 boolean isLast() {
341 return count == 1;
342 }
343
344 Count decrease() {
345 return !isLast() ? new Count(value, count - 1) : null;
346 }
347
348 }
349
350 }
351
352 }