001 /*
002 The contents of this file are subject to the Mozilla Public License Version 1.1
003 (the "License"); you may not use this file except in compliance with the License.
004 You may obtain a copy of the License at http://www.mozilla.org/MPL/
005 Software distributed under the License is distributed on an "AS IS" basis,
006 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
007 specific language governing rights and limitations under the License.
008
009 The Original Code is "ProcessorImpl.java". Description:
010 "A default implementation of Processor."
011
012 The Initial Developer of the Original Code is University Health Network. Copyright (C)
013 2004. All Rights Reserved.
014
015 Contributor(s): ______________________________________.
016
017 Alternatively, the contents of this file may be used under the terms of the
018 GNU General Public License (the "GPL"), in which case the provisions of the GPL are
019 applicable instead of those above. If you wish to allow use of your version of this
020 file only under the terms of the GPL and not to allow others to use your version
021 of this file under the MPL, indicate your decision by deleting the provisions above
022 and replace them with the notice and other provisions required by the GPL License.
023 If you do not delete the provisions above, a recipient may use your version of
024 this file under either the MPL or the GPL.
025 */
026
027 package ca.uhn.hl7v2.protocol.impl;
028
029 import java.util.HashMap;
030 import java.util.Iterator;
031 import java.util.Map;
032 import java.util.concurrent.ExecutorService;
033 import java.util.concurrent.Executors;
034
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 import ca.uhn.hl7v2.HL7Exception;
039 import ca.uhn.hl7v2.preparser.PreParser;
040 import ca.uhn.hl7v2.protocol.Processor;
041 import ca.uhn.hl7v2.protocol.ProcessorContext;
042 import ca.uhn.hl7v2.protocol.TransportException;
043 import ca.uhn.hl7v2.protocol.TransportLayer;
044 import ca.uhn.hl7v2.protocol.Transportable;
045
046 /**
047 * A default implementation of <code>Processor</code>.
048 *
049 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
050 * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
051 */
052 public class ProcessorImpl implements Processor {
053
054 private static final Logger log = LoggerFactory.getLogger(ProcessorImpl.class);
055
056 private ProcessorContext myContext;
057 private final Map<String, ExpiringTransportable> myAcceptAcks;
058 private final Map<String, Long> myReservations;
059 private final Map<String, ExpiringTransportable> myAvailableMessages;
060 private boolean myThreaded; //true if separate threads are calling cycle()
061 private Cycler ackCycler;
062 private Cycler nonAckCycler;
063 private ExecutorService myResponseExecutorService;
064
/**
 * Creates a Processor and, in threaded mode, immediately starts its worker threads.
 *
 * @param theContext source of supporting services
 * @param isThreaded true if this class should create threads in which to call cycle(), and
 *      in which to send responses from Applications. This is the preferred mode. Pass false
 *      when threading is not allowed (e.g. the code runs inside an EJB container); send()
 *      and receive() will then call cycle() themselves as needed. Because cycle() makes
 *      potentially blocking calls, those methods may not return until the next message is
 *      received from the remote server, regardless of timeout. The worst case is calling
 *      receive() to wait for an application ACK that was specified as "RE" (required on
 *      error): if the message is processed without error no response is ever returned, and
 *      in a non-threaded environment receive() blocks forever. Use true if you can,
 *      otherwise study this class carefully.
 *
 * TODO: write a MLLPTransport with non-blocking IO
 * TODO: reconnect transport layers on error and retry
 */
public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) {
    myContext = theContext;
    myThreaded = isThreaded;

    myAcceptAcks = new HashMap<String, ExpiringTransportable>();
    myReservations = new HashMap<String, Long>();
    myAvailableMessages = new HashMap<String, ExpiringTransportable>();

    if (!isThreaded) {
        return;
    }

    // Application responses are sent from a dedicated single worker thread.
    myResponseExecutorService = Executors.newSingleThreadExecutor();

    // One cycler polls for ACKs (expectingAck == true) ...
    ackCycler = new Cycler(this, true);
    new Thread(ackCycler).start();

    // ... and a second one polls for all other inbound messages.
    nonAckCycler = new Cycler(this, false);
    new Thread(nonAckCycler).start();
}
099
100 /**
101 * If self-threaded, stops threads that have been created.
102 */
103 public void stop() {
104 if (myThreaded) {
105 ackCycler.stop();
106 nonAckCycler.stop();
107
108 myResponseExecutorService.shutdownNow();
109 }
110 }
111
112 /**
113 * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long)
114 */
115 public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception {
116 String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"};
117 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
118 String controlId = fields[0];
119 String needAcceptAck = fields[1];
120 String needAppAck = fields[2];
121
122 checkValidAckNeededCode(needAcceptAck);
123
124 trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
125
126 boolean originalMode = (needAcceptAck == null && needAppAck == null);
127 if (originalMode || !NE.equals(needAcceptAck)) {
128
129 Transportable response = null;
130 int retries = 0;
131 do {
132 long until = System.currentTimeMillis() + retryIntervalMillis;
133 while (response == null && System.currentTimeMillis() < until) {
134 synchronized (this) {
135 ExpiringTransportable et = myAcceptAcks.remove(controlId);
136 if (et == null) {
137 cycleIfNeeded(true);
138 } else {
139 response = et.transportable;
140 }
141 }
142 sleepIfNeeded();
143 }
144
145 if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL))
146 || (response != null && isReject(response))) {
147 log.info("Resending message {}", controlId);
148 trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
149 response = null;
150 }
151
152 if (response != null && isError(response)) {
153 String[] errMsgPath = {"MSA-3"};
154 String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath);
155 throw new HL7Exception("Error message received: " + errMsg[0]);
156 }
157
158 } while (response == null && ++retries <= maxRetries);
159 }
160 }
161
162 private void checkValidAckNeededCode(String theCode) throws HL7Exception {
163 //must be one of the below ...
164 if ( !(theCode == null || theCode.equals("")
165 ||theCode.equals(AL) || theCode.equals(ER)
166 || theCode.equals(NE) || theCode.equals(SU)) ) {
167 throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message");
168 }
169 }
170
171 /**
172 * Calls cycle() if we do not expect another thread to be doing so
173 * @param expectingAck as in cycle
174 */
175 private void cycleIfNeeded(boolean expectingAck) throws HL7Exception {
176 if (!myThreaded) {
177 cycle(expectingAck);
178 }
179 }
180
181 /**
182 * Sleeps for 1 ms if externally threaded (this is to let the CPU idle).
183 */
184 private void sleepIfNeeded() {
185 if (myThreaded) {
186 try {
187 Thread.sleep(1);
188 } catch (InterruptedException e) { /* no problem */ }
189 }
190 }
191
192 /** Returns true if a CR or AR ACK */
193 private static boolean isReject(Transportable theMessage) throws HL7Exception {
194 boolean reject = false;
195 String[] fieldPaths = {"MSA-1"};
196 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
197 if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) {
198 reject = true;
199 }
200 return reject;
201 }
202
203 /** Returns true if a CE or AE ACK */
204 private static boolean isError(Transportable theMessage) throws HL7Exception {
205 boolean error = false;
206 String[] fieldPaths = {"MSA-1"};
207 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
208 if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) {
209 error = true;
210 }
211 return error;
212 }
213
214 /**
215 * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long)
216 */
217 public synchronized void reserve(String theAckId, long thePeriodMillis) {
218 Long expiry = new Long(System.currentTimeMillis() + thePeriodMillis);
219 myReservations.put(theAckId, expiry);
220 }
221
222 /**
223 * Tries to send the message, and if there is an error reconnects and tries again.
224 */
225 private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException {
226 try {
227 theTransport.send(theTransportable);
228 } catch (TransportException e) {
229 theTransport.disconnect();
230 theTransport.connect();
231 theTransport.send(theTransportable);
232 }
233 }
234
235
236 /**
237 * Tries to receive a message, and if there is an error reconnects and tries again.
238 */
239 private Transportable tryReceive(TransportLayer theTransport) throws TransportException {
240 Transportable message = null;
241 try {
242 message = theTransport.receive();
243 } catch (TransportException e) {
244 theTransport.disconnect();
245 theTransport.connect();
246 message = theTransport.receive();
247 }
248 return message;
249 }
250
251 /**
252 * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean)
253 */
254 public void cycle(boolean expectingAck) throws HL7Exception {
255 log.debug("In cycle()");
256
257 cleanReservations();
258 cleanAcceptAcks();
259 cleanReservedMessages();
260
261 Transportable in = null;
262 try {
263 if (expectingAck) {
264 in = tryReceive(myContext.getLocallyDrivenTransportLayer());
265 } else {
266 in = tryReceive(myContext.getRemotelyDrivenTransportLayer());
267 }
268 } catch (TransportException e) {
269 try {
270 Thread.sleep(1000);
271 } catch (InterruptedException e1) {}
272 throw e;
273 }
274
275 // log
276 if (in != null) {
277 log.debug("Received message: {}", in.getMessage());
278 } else {
279 log.debug("Received no message");
280 }
281
282 // If we have a message, handle it
283 if (in != null) {
284 String acceptAckNeeded = null;
285 // String appAckNeeded = null;
286 String ackCode = null;
287 String ackId = null;
288
289 try {
290 String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"};
291 String[] fields = PreParser.getFields(in.getMessage(), fieldPaths);
292 acceptAckNeeded = fields[0];
293 // appAckNeeded = fields[1];
294 ackCode = fields[2];
295 ackId = fields[3];
296 } catch (HL7Exception e) {
297 log.warn("Failed to parse accept ack fields in incoming message", e);
298 }
299
300 if (ackId != null && ackCode != null && ackCode.startsWith("C")) {
301 long expiryTime = System.currentTimeMillis() + 1000 * 60;
302 myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime));
303 } else {
304 AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in);
305
306 if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL))
307 || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable())
308 || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) {
309 trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage());
310 }
311
312 if (ack.isAcceptable()) {
313 if (isReserved(ackId)) {
314
315 log.debug("Received expected ACK message with ACK ID: {}", ackId);
316
317 removeReservation(ackId);
318 long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5;
319 myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime));
320
321 } else {
322
323 log.debug("Sending message to router");
324 Transportable out = myContext.getRouter().processMessage(in);
325 sendAppResponse(out);
326
327 }
328 } else {
329 // TODO: should we do something more here? Might be nice to
330 // allow a configurable handler for this situation
331 log.warn("Incoming message was not acceptable");
332 }
333
334 }
335 } else {
336 String transport = expectingAck ? " Locally driven " : "Remotely driven";
337 log.debug("{} TransportLayer.receive() returned null.", transport);
338 }
339
340 sleepIfNeeded();
341
342 log.debug("Exiting cycle()");
343 }
344
345 /** Sends in a new thread if isThreaded, otherwise in current thread */
346 private void sendAppResponse(final Transportable theResponse) {
347 final ProcessorImpl processor = this;
348 Runnable sender = new Runnable() {
349 public void run() {
350 try {
351 log.debug("Sending response: {}", theResponse);
352
353 //TODO: make configurable
354 processor.send(theResponse, 2, 3000);
355
356 } catch (HL7Exception e) {
357 log.error("Error trying to send response from Application", e);
358 }
359 }
360 };
361
362 if (myThreaded) {
363 myResponseExecutorService.execute(sender);
364 } else {
365 sender.run();
366 }
367 }
368
369 /**
370 * Removes expired message reservations from the reservation list.
371 */
372 private synchronized void cleanReservations() {
373 Iterator<String> it = myReservations.keySet().iterator();
374 while (it.hasNext()) {
375 String ackId = it.next();
376 Long expiry = myReservations.get(ackId);
377 if (System.currentTimeMillis() > expiry.longValue()) {
378 it.remove();
379 }
380 }
381 }
382
383 /**
384 * Discards expired accept acknowledgements (these are used in retry protocol; see send()).
385 */
386 private synchronized void cleanAcceptAcks() {
387 Iterator<String> it = myAcceptAcks.keySet().iterator();
388 while (it.hasNext()) {
389 String ackId = it.next();
390 ExpiringTransportable et = myAcceptAcks.get(ackId);
391 if (System.currentTimeMillis() > et.expiryTime) {
392 it.remove();
393 }
394 }
395 }
396
397 private synchronized void cleanReservedMessages() throws HL7Exception {
398 Iterator<String> it = myAvailableMessages.keySet().iterator();
399 while (it.hasNext()) {
400 String ackId = it.next();
401 ExpiringTransportable et = myAvailableMessages.get(ackId);
402 if (System.currentTimeMillis() > et.expiryTime) {
403 it.remove();
404
405 //send to an Application
406 Transportable out = myContext.getRouter().processMessage(et.transportable);
407 sendAppResponse(out);
408 }
409 }
410 }
411
412 private synchronized boolean isReserved(String ackId) {
413 boolean reserved = false;
414 if (myReservations.containsKey(ackId)) {
415 reserved = true;
416 }
417 return reserved;
418 }
419
420 private synchronized void removeReservation(String ackId) {
421 myReservations.remove(ackId);
422 }
423
424
425 /**
426 * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String)
427 */
428 public boolean isAvailable(String theAckId) {
429 boolean available = false;
430 if (myAvailableMessages.containsKey(theAckId)) {
431 available = true;
432 }
433 return available;
434 }
435
436 /**
437 * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long)
438 */
439 public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception {
440 if (!isReserved(theAckId)) {
441 reserve(theAckId, theTimeoutMillis);
442 }
443
444 Transportable in = null;
445 long until = System.currentTimeMillis() + theTimeoutMillis;
446 do {
447 synchronized (this) {
448 ExpiringTransportable et = myAvailableMessages.get(theAckId);
449 if (et == null) {
450 cycleIfNeeded(false);
451 } else {
452 in = et.transportable;
453 }
454 }
455 sleepIfNeeded();
456 } while (in == null && System.currentTimeMillis() < until);
457 return in;
458 }
459
460 /**
461 * @see ca.uhn.hl7v2.protocol.Processor#getContext()
462 */
463 public ProcessorContext getContext() {
464 return myContext;
465 }
466
467 /**
468 * A struct for Transportable collection entries that time out.
469 *
470 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
471 * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
472 */
473 class ExpiringTransportable {
474 public Transportable transportable;
475 public long expiryTime;
476
477 public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) {
478 transportable = theTransportable;
479 expiryTime = theExpiryTime;
480 }
481 }
482
483 /**
484 * A Runnable that repeatedly calls the cycle() method of this class.
485 *
486 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
487 * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
488 */
489 private static class Cycler implements Runnable {
490
491 private Processor myProcessor;
492 private boolean myExpectingAck;
493 private boolean isRunning;
494
495 /**
496 * @param theProcessor the processor on which to call cycle()
497 * @param isExpectingAck passed to cycle()
498 */
499 public Cycler(Processor theProcessor, boolean isExpectingAck) {
500 myProcessor = theProcessor;
501 myExpectingAck = isExpectingAck;
502 isRunning = true;
503 }
504
505 /**
506 * Execution will stop at the end of the next cycle.
507 */
508 public void stop() {
509 isRunning = false;
510 }
511
512 /**
513 * Calls cycle() repeatedly on the Processor given in the
514 * constructor, until stop() is called.
515 *
516 * @see java.lang.Runnable#run()
517 */
518 public void run() {
519 while (isRunning) {
520 try {
521 myProcessor.cycle(myExpectingAck);
522 } catch (HL7Exception e) {
523 log.error("Error processing message", e);
524 }
525 }
526 }
527 }
528
529 }