001 /**
002 The contents of this file are subject to the Mozilla Public License Version 1.1
003 (the "License"); you may not use this file except in compliance with the License.
004 You may obtain a copy of the License at http://www.mozilla.org/MPL/
005 Software distributed under the License is distributed on an "AS IS" basis,
006 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
007 specific language governing rights and limitations under the License.
008
009 The Original Code is "Connection.java". Description:
010 "A TCP/IP connection to a remote HL7 server."
011
012 The Initial Developer of the Original Code is University Health Network. Copyright (C)
013 2002. All Rights Reserved.
014
015 Contributor(s): ______________________________________.
016
017 Alternatively, the contents of this file may be used under the terms of the
018 GNU General Public License (the "GPL"), in which case the provisions of the GPL are
019 applicable instead of those above. If you wish to allow use of your version of this
020 file only under the terms of the GPL and not to allow others to use your version
021 of this file under the MPL, indicate your decision by deleting the provisions above
022 and replace them with the notice and other provisions required by the GPL License.
023 If you do not delete the provisions above, a recipient may use your version of
024 this file under either the MPL or the GPL.
025 */
026
027 package ca.uhn.hl7v2.app;
028
029 import java.io.IOException;
030 import java.net.InetAddress;
031 import java.net.Socket;
032 import java.util.ArrayList;
033 import java.util.Iterator;
034 import java.util.List;
035 import java.util.concurrent.ExecutorService;
036 import java.util.concurrent.Future;
037 import java.util.concurrent.TimeUnit;
038
039 import javax.net.ssl.SSLSocket;
040
041 import org.slf4j.Logger;
042 import org.slf4j.LoggerFactory;
043
044 import ca.uhn.hl7v2.concurrent.BlockingMap;
045 import ca.uhn.hl7v2.concurrent.BlockingHashMap;
046 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
047 import ca.uhn.hl7v2.llp.HL7Writer;
048 import ca.uhn.hl7v2.llp.LLPException;
049 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
050 import ca.uhn.hl7v2.parser.Parser;
051
052 /**
053 * A TCP/IP connection to a remote HL7 server.
054 *
055 * @author Bryan Tripp
056 */
057 public class Connection {
058
059 private static final Logger log = LoggerFactory.getLogger(Connection.class);
060
061 private Initiator initiator;
062 private Responder responder;
063 private List<Socket> sockets;
064 private HL7Writer ackWriter;
065 private HL7Writer sendWriter;
066 private Parser parser;
067 private BlockingMap<String, String> responses;
068 private List<Receiver> receivers;
069 private boolean open = true;
070 private ExecutorService executorService;
071 boolean outbound;
072
073 /**
074 * Creates a new instance of Connection, with inbound and outbound
075 * communication on a single port.
076 */
077 public Connection(Parser parser, LowerLayerProtocol llp,
078 Socket bidirectional) throws LLPException, IOException {
079 this(parser, llp, bidirectional, DefaultExecutorService
080 .getDefaultService());
081 }
082
083 public Connection(Parser parser, LowerLayerProtocol llp,
084 Socket bidirectional, ExecutorService executorService)
085 throws LLPException, IOException {
086 init(parser, executorService);
087 ackWriter = llp.getWriter(bidirectional.getOutputStream());
088 sendWriter = ackWriter;
089 this.executorService = executorService;
090 sockets.add(bidirectional);
091 receivers.add(new Receiver(this, llp.getReader(bidirectional
092 .getInputStream())));
093 this.initiator = new Initiator(this);
094 }
095
096 /**
097 * Creates a new instance of Connection, with inbound communication on one
098 * port and outbound on another.
099 */
100 public Connection(Parser parser, LowerLayerProtocol llp, Socket inbound,
101 Socket outbound) throws LLPException, IOException {
102 this(parser, llp, inbound, outbound, DefaultExecutorService
103 .getDefaultService());
104 }
105
106 /**
107 * Creates a new instance of Connection, with inbound communication on one
108 * port and outbound on another.
109 */
110 public Connection(Parser parser, LowerLayerProtocol llp, Socket inbound,
111 Socket outbound, ExecutorService executorService)
112 throws LLPException, IOException {
113 init(parser, executorService);
114 ackWriter = llp.getWriter(inbound.getOutputStream());
115 sendWriter = llp.getWriter(outbound.getOutputStream());
116 sockets.add(outbound); // always add outbound first ... see getRemoteAddress()
117 sockets.add(inbound);
118
119 receivers.add(new Receiver(this,
120 llp.getReader(inbound.getInputStream())));
121 receivers.add(new Receiver(this, llp.getReader(outbound
122 .getInputStream())));
123 this.initiator = new Initiator(this);
124 }
125
126 /** Common initialization tasks */
127 private void init(Parser parser, ExecutorService executorService)
128 throws LLPException {
129 this.parser = parser;
130 this.executorService = executorService;
131 sockets = new ArrayList<Socket>();
132 responses = new BlockingHashMap<String, String>(executorService);
133 receivers = new ArrayList<Receiver>(2);
134 responder = new Responder(parser);
135 }
136
137 /**
138 * Start the receiver thread(s)
139 */
140 public void activate() {
141 if (receivers != null) {
142 for (Receiver receiver : receivers) {
143 receiver.start();
144 }
145 }
146 }
147
148 public ExecutorService getExecutorService() {
149 return executorService;
150 }
151
152 /**
153 * Returns the address of the remote host to which this Connection is
154 * connected. If separate inbound and outbound sockets are used, the address
155 * of the outbound socket is returned (the addresses should normally be the
156 * same, but this isn't checked).
157 */
158 public InetAddress getRemoteAddress() {
159 Socket s = sockets.get(0);
160 return s.getInetAddress();
161 }
162
163 /**
164 * Returns the remote port on the remote host to which this Connection is
165 * connected. If separate inbound and outbound sockets are used, the port of
166 * the outbound socket is returned.
167 */
168 public int getRemotePort() {
169 Socket s = sockets.get(0);
170 return s.getPort();
171 }
172
173 /** Returns the Initiator associated with this connection */
174 public Initiator getInitiator() {
175 return this.initiator;
176 }
177
178 /** Returns the Responder associated with this connection */
179 public Responder getResponder() {
180 return this.responder;
181 }
182
183 public boolean isSecure() {
184 if (isOpen() && sockets.size() > 0) {
185 return (sockets.get(0) instanceof SSLSocket);
186 } else {
187 throw new IllegalStateException(
188 "Can't determine status on closed socket");
189 }
190 }
191
192 /**
193 * Returns the HL7Writer through which unsolicited outbound messages should
194 * be sent.
195 */
196 protected HL7Writer getSendWriter() {
197 return this.sendWriter;
198 }
199
200 /**
201 * Returns the HL7Writer through which responses to inbound messages should
202 * be sent.
203 */
204 protected HL7Writer getAckWriter() {
205 return this.ackWriter;
206 }
207
208 public Parser getParser() {
209 return this.parser;
210 }
211
212 public String toString() {
213 StringBuffer buf = new StringBuffer();
214 buf.append(getRemoteAddress().getHostName());
215 buf.append(":");
216 for (Iterator<Socket> iter = sockets.iterator(); iter.hasNext();) {
217 Socket socket = iter.next();
218 buf.append(socket.getPort());
219 if (iter.hasNext())
220 buf.append(",");
221 }
222 return buf.toString();
223 }
224
225 /**
226 * Reserves a future incoming message by ack ID. When the incoming message
227 * with the given ack ID arrives, the message will be returned.
228 */
229 protected Future<String> waitForResponse(final String messageID,
230 long timeout) throws InterruptedException {
231 return responses.asyncPoll(messageID, timeout, TimeUnit.MILLISECONDS);
232 }
233
234 /**
235 * Given the ack ID (MSA-2) of a message, notifies a waiting consumer thread
236 * about a received response.
237 */
238 protected boolean isRecipientWaiting(String ackID, String message) {
239 return responses.give(ackID, message);
240 }
241
242 /** Stops running Receiver threads and closes open sockets */
243 public void close() {
244 // Mark all running receiver threads to be stopped
245 for (Receiver receiver : receivers) {
246 if (receiver.isRunning())
247 receiver.stop();
248 }
249 // Forces open sockets to be closed. This causes the Receiver threads to
250 // eventually terminate
251 for (Socket socket : sockets) {
252 try {
253 if (!socket.isClosed())
254 socket.close();
255 } catch (Exception e) {
256 log.error("Error while stopping threads and closing sockets", e);
257 }
258 }
259
260 open = false;
261 }
262
263 public boolean isOpen() {
264 return open;
265 }
266
267 }