001 /*
002 The contents of this file are subject to the Mozilla Public License Version 1.1
003 (the "License"); you may not use this file except in compliance with the License.
004 You may obtain a copy of the License at http://www.mozilla.org/MPL/
005 Software distributed under the License is distributed on an "AS IS" basis,
006 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
007 specific language governing rights and limitations under the License.
008
009 The Original Code is "HL7Server.java". Description:
010 "A TCP/IP based server."
011
012 The Initial Developer of the Original Code is University Health Network. Copyright (C)
013 2004. All Rights Reserved.
014
015 Contributor(s): ______________________________________.
016
017 Alternatively, the contents of this file may be used under the terms of the
018 GNU General Public License (the "GPL"), in which case the provisions of the GPL are
019 applicable instead of those above. If you wish to allow use of your version of this
020 file only under the terms of the GPL and not to allow others to use your version
021 of this file under the MPL, indicate your decision by deleting the provisions above
022 and replace them with the notice and other provisions required by the GPL License.
023 If you do not delete the provisions above, a recipient may use your version of
024 this file under either the MPL or the GPL.
025 */
026
027 package ca.uhn.hl7v2.protocol.impl;
028
029 import java.io.IOException;
030 import java.net.MalformedURLException;
031 import java.net.ServerSocket;
032 import java.net.URL;
033 import java.util.ArrayList;
034 import java.util.Iterator;
035 import java.util.List;
036 import java.util.StringTokenizer;
037
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040
041 import ca.uhn.hl7v2.HL7Exception;
042 import ca.uhn.hl7v2.protocol.ApplicationRouter;
043 import ca.uhn.hl7v2.protocol.Processor;
044 import ca.uhn.hl7v2.protocol.ProcessorContext;
045 import ca.uhn.hl7v2.protocol.SafeStorage;
046 import ca.uhn.hl7v2.protocol.TransportException;
047 import ca.uhn.hl7v2.protocol.TransportLayer;
048
049 /**
050 * A TCP/IP based server.
051 *
052 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
053 * @version $Revision: 1.2 $ updated on $Date: 2009-06-30 13:30:45 $ by $Author: jamesagnew $
054 */
055 public class HL7Server {
056
057 private static Logger log = LoggerFactory.getLogger(HL7Server.class);
058
059 private final ServerSocket myServerSocket;
060 private ServerSocket myServerSocket2;
061 private final ApplicationRouter myRouter;
062 private final SafeStorage myStorage;
063
064 private boolean myIsRunning = false;
065 private List<Processor> myProcessors;
066
067 /**
068 * @param theServerSocket a ServerSocket on which to listen for connections that will
069 * be used for both locally-driven and remotely-driven message exchanges
070 * @param theRouter used to send incoming messages to appropriate <code>Application</code>s
071 * @param theStorage used to commit incoming messages to safe storage before returning
072 * an accept ACK
073 */
074 public HL7Server(ServerSocket theServerSocket, ApplicationRouter theRouter, SafeStorage theStorage) {
075 myServerSocket = theServerSocket;
076 myRouter = theRouter;
077 myStorage = theStorage;
078 initProcessorList();
079 }
080
081 /**
082 * @param theLocallyDriven a ServerSocket on which to listen for connections that will
083 * be used for locally-initiated message exchanges
084 * @param theRemotelyDriven a ServerSocket on which to listen for connections that will
085 * be used for remotely-initiated message exchanges
086 * @param theRouter used to send incoming messages to appropriate <code>Application</code>s
087 * @param theStorage used to commit incoming messages to safe storage before returning
088 * an accept ACK
089 */
090 public HL7Server(ServerSocket theLocallyDriven, ServerSocket theRemotelyDriven,
091 ApplicationRouter theRouter, SafeStorage theStorage) {
092
093 myServerSocket = theLocallyDriven;
094 myServerSocket2 = theRemotelyDriven;
095 myRouter = theRouter;
096 myStorage = theStorage;
097 initProcessorList();
098 }
099
100 //creates list and starts thread to clean dead processors from it
101 private void initProcessorList() {
102 myProcessors = new ArrayList<Processor>();
103
104 final List<Processor> processors = myProcessors;
105 Thread cleaner = new Thread() {
106 public void run() {
107 try {
108 Thread.sleep(1000);
109 } catch (InterruptedException e) {}
110
111 synchronized (processors) {
112 Iterator<Processor> it = processors.iterator();
113 while (it.hasNext()) {
114 Processor proc = it.next();
115 if (!proc.getContext().getLocallyDrivenTransportLayer().isConnected()
116 || !proc.getContext().getRemotelyDrivenTransportLayer().isConnected()) {
117 it.remove();
118 }
119 }
120 }
121 }
122 };
123 cleaner.start();
124 }
125
126 /**
127 * Accepts a single inbound connection if the same ServerSocket is used for
128 * all message exchanges, or a connection from each if two ServerSockets are
129 * being used.
130 *
131 * @param theAddress the IP address from which to accept connections (null means
132 * accept from any address). Connection attempts from other addresses will
133 * be ignored.
134 * @return a <code>Processor</code> connected to the given address
135 * @throws TransportException
136 */
137 public Processor accept(String theAddress) throws TransportException {
138 TransportLayer transport = getTransport(myServerSocket, theAddress);
139 ProcessorContext context = null;
140
141 if (myServerSocket2 == null) { //we're doing inbound & outbound on the same port
142 transport.connect();
143 context = new ProcessorContextImpl(myRouter, transport, myStorage);
144 } else {
145 TransportLayer transport2 = getTransport(myServerSocket2, theAddress);
146 DualTransportConnector connector = new DualTransportConnector(transport, transport2);
147 connector.connect();
148
149 context = new ProcessorContextImpl(myRouter, transport, transport2, myStorage);
150 }
151 return new ProcessorImpl(context, true);
152 }
153
154 private static TransportLayer getTransport(ServerSocket theServerSocket, String theAddress) throws TransportException {
155 ServerSocketStreamSource ss = new ServerSocketStreamSource(theServerSocket, theAddress);
156 return new MLLPTransport(ss);
157 }
158
159 /**
160 * Starts accepting connections in a new Thread. Note that this can be
161 * called multiple times with separate addresses. The stop() method ends
162 * all Threads started here.
163 *
164 * @param theAddress IP address from which connections are accepted (null
165 * means any address is OK)
166 */
167 public void start(final String theAddress) {
168 final HL7Server server = this;
169 Runnable acceptor = new Runnable() {
170 public void run() {
171 while (server.isRunning()) {
172 try {
173 Processor p = server.accept(theAddress);
174 if (!myIsRunning) {
175 p.stop();
176 } else {
177 server.newProcessor(p);
178 Thread.sleep(1);
179 }
180 } catch (TransportException e) {
181 log.error(e.getMessage(), e);
182 } catch (InterruptedException e) {
183 }
184 }
185 }
186 };
187
188 myIsRunning = true;
189
190 Thread thd = new Thread(acceptor);
191 thd.start();
192 }
193
194 private void newProcessor(Processor theProcessor) {
195 synchronized (myProcessors) {
196 myProcessors.add(theProcessor);
197 }
198 }
199
200 /**
201 * Stops running after the next connection is made.
202 */
203 public void stop() {
204 myIsRunning = false;
205 synchronized (myProcessors) {
206 for (Processor next : myProcessors) {
207 next.stop();
208 }
209 }
210 }
211
212 /**
213 * Returns <code>true</code> between when start() returns and when stop() is called.
214 *
215 * Note that this is not the same as checking whether there are any active connections to
216 * this server. To determine this, call {@link #getProcessors()} and check whether the array
217 * returned is non-empty.
218 *
219 * @return true between when start() returns and when stop() is called.
220 */
221 public boolean isRunning() {
222 return myIsRunning;
223 }
224
225 /**
226 * @return <code>Processor</code>s arising from connections to this server
227 */
228 public Processor[] getProcessors() {
229 synchronized (myProcessors) {
230 return (Processor[]) myProcessors.toArray(new Processor[0]);
231 }
232 }
233
234 /**
235 *
236 * @param theUrlSpec a string specifying an URL, which can optionally begin with "classpath:"
237 * @return the resource specified after "classpath:", if that's how it starts, otherwise
238 * new URL(theUrlSpec)
239 * @throws MalformedURLException
240 */
241 private static URL getURL(String theUrlSpec) throws MalformedURLException {
242 URL url = null;
243 if (theUrlSpec.startsWith("classpath:")) {
244 StringTokenizer tok = new StringTokenizer(theUrlSpec, ":", false);
245 tok.nextToken();
246 String resource = tok.nextToken();
247 url = Thread.currentThread().getContextClassLoader().getResource(resource);
248 } else {
249 url = new URL(theUrlSpec);
250 }
251 return url;
252 }
253
254 public static void main(String[] args) {
255 if (args.length < 1 || args.length > 3) {
256 System.out.println("Usage: HL7Server (shared_port | (locally_driven_port remotely_driven_port)) app_binding_URL");
257 System.exit(1);
258 }
259
260 SafeStorage storage = new NullSafeStorage();
261 ApplicationRouter router = new ApplicationRouterImpl();
262
263 try {
264 HL7Server server = null;
265 String appURL = null;
266 if (args.length == 2) {
267 int port = Integer.parseInt(args[0]);
268 server = new HL7Server(new ServerSocket(port), router, storage);
269 appURL = args[1];
270 } else {
271 int localPort = Integer.parseInt(args[0]);
272 int remotePort = Integer.parseInt(args[1]);
273 server = new HL7Server(new ServerSocket(localPort), new ServerSocket(remotePort), router, storage);
274 appURL = args[2];
275 }
276
277 ApplicationLoader.loadApplications(router, getURL(appURL));
278
279 server.start(null); //any address OK
280
281 } catch (NumberFormatException e) {
282 System.out.println("Port arguments must be integers");
283 System.exit(2);
284 } catch (IOException e) {
285 e.printStackTrace();
286 System.exit(3);
287 } catch (HL7Exception e) {
288 e.printStackTrace();
289 System.exit(4);
290 } catch (ClassNotFoundException e) {
291 e.printStackTrace();
292 System.exit(5);
293 } catch (InstantiationException e) {
294 e.printStackTrace();
295 System.exit(6);
296 } catch (IllegalAccessException e) {
297 e.printStackTrace();
298 System.exit(7);
299 }
300
301 }
302 }