001    /**
002    The contents of this file are subject to the Mozilla Public License Version 1.1 
003    (the "License"); you may not use this file except in compliance with the License. 
004    You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005    Software distributed under the License is distributed on an "AS IS" basis, 
006    WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007    specific language governing rights and limitations under the License. 
008    
009    The Original Code is "Connection.java".  Description: 
010    "A TCP/IP connection to a remote HL7 server." 
011    
012    The Initial Developer of the Original Code is University Health Network. Copyright (C) 
013    2002.  All Rights Reserved. 
014    
015    Contributor(s): ______________________________________. 
016    
017    Alternatively, the contents of this file may be used under the terms of the 
018    GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
019    applicable instead of those above.  If you wish to allow use of your version of this 
020    file only under the terms of the GPL and not to allow others to use your version 
021    of this file under the MPL, indicate your decision by deleting  the provisions above 
022    and replace  them with the notice and other provisions required by the GPL License.  
023    If you do not delete the provisions above, a recipient may use your version of 
024    this file under either the MPL or the GPL. 
025     */
026    
027    package ca.uhn.hl7v2.app;
028    
029    import java.io.IOException;
030    import java.net.InetAddress;
031    import java.net.Socket;
032    import java.util.ArrayList;
033    import java.util.Iterator;
034    import java.util.List;
035    import java.util.concurrent.ExecutorService;
036    import java.util.concurrent.Future;
037    import java.util.concurrent.TimeUnit;
038    
039    import javax.net.ssl.SSLSocket;
040    
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    import ca.uhn.hl7v2.concurrent.BlockingMap;
045    import ca.uhn.hl7v2.concurrent.BlockingHashMap;
046    import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
047    import ca.uhn.hl7v2.llp.HL7Writer;
048    import ca.uhn.hl7v2.llp.LLPException;
049    import ca.uhn.hl7v2.llp.LowerLayerProtocol;
050    import ca.uhn.hl7v2.parser.Parser;
051    
052    /**
053     * A TCP/IP connection to a remote HL7 server.
054     * 
055     * @author Bryan Tripp
056     */
057    public class Connection {
058    
059            private static final Logger log = LoggerFactory.getLogger(Connection.class);
060    
061            private Initiator initiator;
062            private Responder responder;
063            private List<Socket> sockets;
064            private HL7Writer ackWriter;
065            private HL7Writer sendWriter;
066            private Parser parser;
067            private BlockingMap<String, String> responses;
068            private List<Receiver> receivers;
069            private boolean open = true;
070            private ExecutorService executorService;
071            boolean outbound;
072    
073            /**
074             * Creates a new instance of Connection, with inbound and outbound
075             * communication on a single port.
076             */
077            public Connection(Parser parser, LowerLayerProtocol llp,
078                            Socket bidirectional) throws LLPException, IOException {
079                    this(parser, llp, bidirectional, DefaultExecutorService
080                                    .getDefaultService());
081            }
082    
083            public Connection(Parser parser, LowerLayerProtocol llp,
084                            Socket bidirectional, ExecutorService executorService)
085                            throws LLPException, IOException {
086                    init(parser, executorService);
087                    ackWriter = llp.getWriter(bidirectional.getOutputStream());
088                    sendWriter = ackWriter;
089                    this.executorService = executorService;
090                    sockets.add(bidirectional);
091                    receivers.add(new Receiver(this, llp.getReader(bidirectional
092                                    .getInputStream())));
093                    this.initiator = new Initiator(this);
094            }
095    
096            /**
097             * Creates a new instance of Connection, with inbound communication on one
098             * port and outbound on another.
099             */
100            public Connection(Parser parser, LowerLayerProtocol llp, Socket inbound,
101                            Socket outbound) throws LLPException, IOException {
102                    this(parser, llp, inbound, outbound, DefaultExecutorService
103                                    .getDefaultService());
104            }
105    
106            /**
107             * Creates a new instance of Connection, with inbound communication on one
108             * port and outbound on another.
109             */
110            public Connection(Parser parser, LowerLayerProtocol llp, Socket inbound,
111                            Socket outbound, ExecutorService executorService)
112                            throws LLPException, IOException {
113                    init(parser, executorService);
114                    ackWriter = llp.getWriter(inbound.getOutputStream());
115                    sendWriter = llp.getWriter(outbound.getOutputStream());
116                    sockets.add(outbound); // always add outbound first ... see getRemoteAddress()
117                    sockets.add(inbound);
118    
119                    receivers.add(new Receiver(this,
120                                    llp.getReader(inbound.getInputStream())));
121                    receivers.add(new Receiver(this, llp.getReader(outbound
122                                    .getInputStream())));
123                    this.initiator = new Initiator(this);
124            }
125    
126            /** Common initialization tasks */
127            private void init(Parser parser, ExecutorService executorService)
128                            throws LLPException {
129                    this.parser = parser;
130                    this.executorService = executorService;
131                    sockets = new ArrayList<Socket>();
132                    responses = new BlockingHashMap<String, String>(executorService);
133                    receivers = new ArrayList<Receiver>(2);
134                    responder = new Responder(parser);
135            }
136    
137            /**
138             * Start the receiver thread(s)
139             */
140            public void activate() {
141                    if (receivers != null) {
142                            for (Receiver receiver : receivers) {
143                                    receiver.start();
144                            }
145                    }
146            }
147    
148            public ExecutorService getExecutorService() {
149                    return executorService;
150            }
151    
152            /**
153             * Returns the address of the remote host to which this Connection is
154             * connected. If separate inbound and outbound sockets are used, the address
155             * of the outbound socket is returned (the addresses should normally be the
156             * same, but this isn't checked).
157             */
158            public InetAddress getRemoteAddress() {
159                    Socket s = sockets.get(0);
160                    return s.getInetAddress();
161            }
162    
163            /**
164             * Returns the remote port on the remote host to which this Connection is
165             * connected. If separate inbound and outbound sockets are used, the port of
166             * the outbound socket is returned.
167             */
168            public int getRemotePort() {
169                    Socket s = sockets.get(0);
170                    return s.getPort();
171            }
172    
173            /** Returns the Initiator associated with this connection */
174            public Initiator getInitiator() {
175                    return this.initiator;
176            }
177    
178            /** Returns the Responder associated with this connection */
179            public Responder getResponder() {
180                    return this.responder;
181            }
182    
183            public boolean isSecure() {
184                    if (isOpen() && sockets.size() > 0) {
185                            return (sockets.get(0) instanceof SSLSocket);
186                    } else {
187                            throw new IllegalStateException(
188                                            "Can't determine status on closed socket");
189                    }
190            }
191    
192            /**
193             * Returns the HL7Writer through which unsolicited outbound messages should
194             * be sent.
195             */
196            protected HL7Writer getSendWriter() {
197                    return this.sendWriter;
198            }
199    
200            /**
201             * Returns the HL7Writer through which responses to inbound messages should
202             * be sent.
203             */
204            protected HL7Writer getAckWriter() {
205                    return this.ackWriter;
206            }
207    
208            public Parser getParser() {
209                    return this.parser;
210            }
211    
212            public String toString() {
213                    StringBuffer buf = new StringBuffer();
214                    buf.append(getRemoteAddress().getHostName());
215                    buf.append(":");
216                    for (Iterator<Socket> iter = sockets.iterator(); iter.hasNext();) {
217                            Socket socket = iter.next();
218                            buf.append(socket.getPort());
219                            if (iter.hasNext())
220                                    buf.append(",");
221                    }
222                    return buf.toString();
223            }
224    
225            /**
226             * Reserves a future incoming message by ack ID. When the incoming message
227             * with the given ack ID arrives, the message will be returned.
228             */
229            protected Future<String> waitForResponse(final String messageID,
230                            long timeout) throws InterruptedException {
231                    return responses.asyncPoll(messageID, timeout, TimeUnit.MILLISECONDS);
232            }
233    
234            /**
235             * Given the ack ID (MSA-2) of a message, notifies a waiting consumer thread
236             * about a received response.
237             */
238            protected boolean isRecipientWaiting(String ackID, String message) {
239                    return responses.give(ackID, message);
240            }
241    
242            /** Stops running Receiver threads and closes open sockets */
243            public void close() {
244                    // Mark all running receiver threads to be stopped
245                    for (Receiver receiver : receivers) {
246                            if (receiver.isRunning())
247                                    receiver.stop();
248                    }
249                    // Forces open sockets to be closed. This causes the Receiver threads to
250                    // eventually terminate
251                    for (Socket socket : sockets) {
252                            try {
253                                    if (!socket.isClosed())
254                                            socket.close();
255                            } catch (Exception e) {
256                                    log.error("Error while stopping threads and closing sockets", e);
257                            }
258                    }
259    
260                    open = false;
261            }
262    
263            public boolean isOpen() {
264                    return open;
265            }
266    
267    }