001    /**
002    The contents of this file are subject to the Mozilla Public License Version 1.1 
003    (the "License"); you may not use this file except in compliance with the License. 
004    You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005    Software distributed under the License is distributed on an "AS IS" basis, 
006    WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007    specific language governing rights and limitations under the License. 
008    
009    The Original Code is "TwoPortService.java".  Description: 
010    "A TCP/IP-based HL7 Service that uses separate ports for inbound and outbound messages." 
011    
012    The Initial Developer of the Original Code is University Health Network. Copyright (C) 
013    2001.  All Rights Reserved. 
014    
015    Contributor(s): ______________________________________. 
016    
017    Alternatively, the contents of this file may be used under the terms of the 
018    GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
019    applicable instead of those above.  If you wish to allow use of your version of this 
020    file only under the terms of the GPL and not to allow others to use your version 
021    of this file under the MPL, indicate your decision by deleting  the provisions above 
022    and replace  them with the notice and other provisions required by the GPL License.  
023    If you do not delete the provisions above, a recipient may use your version of 
024    this file under either the MPL or the GPL. 
025     */
026    
027    package ca.uhn.hl7v2.app;
028    
029    import java.io.File;
030    import java.io.IOException;
031    import java.net.Socket;
032    import java.net.SocketException;
033    import java.util.HashMap;
034    import java.util.Map;
035    import java.util.concurrent.BlockingQueue;
036    import java.util.concurrent.ExecutorService;
037    import java.util.concurrent.LinkedBlockingQueue;
038    import java.util.concurrent.TimeUnit;
039    
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    import ca.uhn.hl7v2.app.AcceptorThread.AcceptedSocket;
044    import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
045    import ca.uhn.hl7v2.llp.LLPException;
046    import ca.uhn.hl7v2.llp.LowerLayerProtocol;
047    import ca.uhn.hl7v2.parser.Parser;
048    import ca.uhn.hl7v2.parser.PipeParser;
049    
050    /**
051     * A TCP/IP-based HL7 Service that uses separate ports for inbound and outbound
052     * messages. A connection is only activated when the same remote host connects
053     * to both the inbound and outbound ports.
054     * 
055     * @author Bryan Tripp
056     */
057    public class TwoPortService extends HL7Service {
058    
059            private static final Logger log = LoggerFactory
060                            .getLogger(TwoPortService.class);
061    
062            private Map<String, AcceptedSocket> waitingForSecondSocket = new HashMap<String, AcceptedSocket>();
063            private int inboundPort;
064            private int outboundPort;
065            private boolean tls;
066            private BlockingQueue<AcceptedSocket> queue;
067            private AcceptorThread inboundAcceptor, outboundAcceptor;
068    
069            public TwoPortService(int inboundPort, int outboundPort) {
070                    this(new PipeParser(), LowerLayerProtocol.makeLLP(), inboundPort,
071                                    outboundPort, false);
072            }
073    
074            public TwoPortService(int inboundPort, int outboundPort, boolean tls) {
075                    this(new PipeParser(), LowerLayerProtocol.makeLLP(), inboundPort,
076                                    outboundPort, tls);
077            }
078    
079            /** Creates a new instance of TwoPortService */
080            public TwoPortService(Parser parser, LowerLayerProtocol llp,
081                            int inboundPort, int outboundPort, boolean tls) {
082                    this(parser, llp, inboundPort, outboundPort, tls,
083                                    DefaultExecutorService.getDefaultService());
084            }
085    
086            /** Creates a new instance of TwoPortService */
087            public TwoPortService(Parser parser, LowerLayerProtocol llp,
088                            int inboundPort, int outboundPort, boolean tls,
089                            ExecutorService executorService) {
090                    super(parser, llp, executorService);
091                    this.queue = new LinkedBlockingQueue<AcceptedSocket>();
092                    this.inboundPort = inboundPort;
093                    this.outboundPort = outboundPort;
094                    this.tls = tls;
095            }
096    
097            /**
098             * Launches two threads that concurrently listen on the inboundPort and
099             * outboundPort.
100             * 
101             * @see ca.uhn.hl7v2.app.HL7Service#afterStartup()
102             */
103            @Override
104            protected void afterStartup() {
105                    try {
106                            super.afterStartup();
107                            inboundAcceptor = createAcceptThread(inboundPort);
108                            outboundAcceptor = createAcceptThread(outboundPort);
109                            inboundAcceptor.start();
110                            outboundAcceptor.start();
111                            log.info("TwoPortService running on ports {} and {}", inboundPort,
112                                            outboundPort);
113                    } catch (IOException e) {
114                            log.error("Could not run TwoPortService on ports {} and {}",
115                                            inboundPort, outboundPort);
116                            throw new RuntimeException(e);
117                    }
118            }
119    
120            /**
121             * Terminate the two acceptor threads
122             * 
123             * @see ca.uhn.hl7v2.app.HL7Service#afterTermination()
124             */
125            @Override
126            protected void afterTermination() {
127                    super.afterTermination();
128                    inboundAcceptor.stop();
129                    outboundAcceptor.stop();
130            }
131    
132            /**
133             * Polls for accepted sockets
134             */
135            protected void handle() {
136                    if (inboundAcceptor.getServiceExitedWithException() != null) {
137                            setServiceExitedWithException(inboundAcceptor.getServiceExitedWithException());
138                    }
139                    if (outboundAcceptor.getServiceExitedWithException() != null) {
140                            setServiceExitedWithException(outboundAcceptor.getServiceExitedWithException());
141                    }
142                    
143                    try {
144                            Connection conn = acceptConnection(queue.poll(2, TimeUnit.SECONDS));
145                            if (conn != null) {
146                                    log.info("Accepted connection from "
147                                                    + conn.getRemoteAddress().getHostAddress());
148                                    newConnection(conn);
149                            }
150                    } catch (Exception e) {
151                            log.error("Error while accepting connections: ", e);
152                    }
153            }
154    
155            /**
156             * Helper method that checks whether the newSocket completes a two-port
157             * connection or not. If yes, the {@link Connection} object is created and
158             * returned.
159             */
160            private Connection acceptConnection(AcceptedSocket newSocket)
161                            throws LLPException, IOException {
162                    Connection conn = null;
163                    if (newSocket != null) {
164                            String address = newSocket.socket.getInetAddress().getHostAddress();
165                            AcceptedSocket otherSocket = waitingForSecondSocket.remove(address);
166                            if (otherSocket != null && otherSocket.origin != newSocket.origin) {
167                                    log.debug("Socket {} completes a two-port connection",
168                                                    newSocket.socket);
169                                    Socket in = getInboundSocket(newSocket, otherSocket);
170                                    Socket out = getOutboundSocket(newSocket, otherSocket);
171                                    conn = new Connection(parser, llp, in, out,
172                                                    getExecutorService());
173                            } else {
174                                    log.debug(
175                                                    "Registered {} Still waiting for second socket for two-port connection",
176                                                    newSocket.socket);
177                                    waitingForSecondSocket.put(address, newSocket);
178                            }
179                    }
180                    return conn;
181            }
182    
183            private Socket getInboundSocket(AcceptedSocket socket1,
184                            AcceptedSocket socket2) {
185                    return socket1.origin == inboundAcceptor ? socket1.socket
186                                    : socket2.socket;
187            }
188    
189            private Socket getOutboundSocket(AcceptedSocket socket1,
190                            AcceptedSocket socket2) {
191                    return socket1.origin == outboundAcceptor ? socket1.socket
192                                    : socket2.socket;
193            }
194    
195            protected AcceptorThread createAcceptThread(int port)
196                            throws SocketException, IOException {
197                    return new AcceptorThread(port, tls, getExecutorService(), queue);
198            }
199    
200            /**
201             * Run server from command line. Inbound and outbound port numbers should be
202             * provided as arguments, and a file containing a list of Applications to
203             * use can also be specified as an optional argument (as per
204             * <code>super.loadApplicationsFromFile(...)</code>). Uses the default
205             * LowerLayerProtocol.
206             */
207            public static void main(String args[]) {
208                    if (args.length < 2 || args.length > 3) {
209                            System.out
210                                            .println("Usage: ca.uhn.hl7v2.app.TwoPortService inbound_port outbound_port [application_spec_file_name]");
211                            System.exit(1);
212                    }
213    
214                    int inPort = 0;
215                    int outPort = 0;
216                    try {
217                            inPort = Integer.parseInt(args[0]);
218                            outPort = Integer.parseInt(args[1]);
219                    } catch (NumberFormatException e) {
220                            System.err.println("One of the given ports (" + args[0] + " or "
221                                            + args[1] + ") is not an integer.");
222                            System.exit(1);
223                    }
224    
225                    File appFile = null;
226                    if (args.length == 3) {
227                            appFile = new File(args[2]);
228                    }
229    
230                    try {
231                            TwoPortService server = new TwoPortService(inPort, outPort);
232                            if (appFile != null)
233                                    server.loadApplicationsFromFile(appFile);
234                            server.start();
235                    } catch (Exception e) {
236                            e.printStackTrace();
237                    }
238    
239            }
240    
241    }