001    /*
002    The contents of this file are subject to the Mozilla Public License Version 1.1 
003    (the "License"); you may not use this file except in compliance with the License. 
004    You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005    Software distributed under the License is distributed on an "AS IS" basis, 
006    WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007    specific language governing rights and limitations under the License. 
008    
009    The Original Code is "ProcessorImpl.java".  Description: 
010    "A default implementation of Processor." 
011    
012    The Initial Developer of the Original Code is University Health Network. Copyright (C) 
013    2004.  All Rights Reserved. 
014    
015    Contributor(s): ______________________________________. 
016    
017    Alternatively, the contents of this file may be used under the terms of the 
018    GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
019    applicable instead of those above.  If you wish to allow use of your version of this 
020    file only under the terms of the GPL and not to allow others to use your version 
021    of this file under the MPL, indicate your decision by deleting  the provisions above 
022    and replace  them with the notice and other provisions required by the GPL License.  
023    If you do not delete the provisions above, a recipient may use your version of 
024    this file under either the MPL or the GPL. 
025    */
026    
027    package ca.uhn.hl7v2.protocol.impl;
028    
029    import java.util.HashMap;
030    import java.util.Iterator;
031    import java.util.Map;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.Executors;
034    
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    import ca.uhn.hl7v2.HL7Exception;
039    import ca.uhn.hl7v2.preparser.PreParser;
040    import ca.uhn.hl7v2.protocol.Processor;
041    import ca.uhn.hl7v2.protocol.ProcessorContext;
042    import ca.uhn.hl7v2.protocol.TransportException;
043    import ca.uhn.hl7v2.protocol.TransportLayer;
044    import ca.uhn.hl7v2.protocol.Transportable;
045    
046    /**
047     * A default implementation of <code>Processor</code>.  
048     *  
049     * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
050     * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
051     */
052    public class ProcessorImpl implements Processor {
053    
054        private static final Logger log = LoggerFactory.getLogger(ProcessorImpl.class);
055    
056        private ProcessorContext myContext;
057        private final Map<String, ExpiringTransportable> myAcceptAcks;
058        private final Map<String, Long> myReservations;
059        private final Map<String, ExpiringTransportable> myAvailableMessages;
060        private boolean myThreaded; //true if separate threads are calling cycle()  
061        private Cycler ackCycler;
062        private Cycler nonAckCycler;
063        private ExecutorService myResponseExecutorService;
064        
065        /**
066         * @param theContext source of supporting services 
067         * @param isThreaded true if this class should create threads in which to call cycle(), and 
068         *  in which to send responses from Applications.  This is the preferred mode.  Use false 
069         *  if threading is not allowed, eg you are running the code in an EJB container.  In this case, 
070         *  the send() and receive() methods will call cycle() themselves as needed.  However, cycle() 
071         *  makes potentially blocking calls, so these methods may not return until the next message 
072         *  is received from the remote server, regardless of timeout.  Probably the worst example of this
073         *  would be if receive() was called to wait for an application ACK that was specified as "RE" (ie
074         *  required on error).  No response will be returned if the message is processed without error, 
075         *  and in a non-threaded environment, receive() will block forever.  Use true if you can, otherwise
076         *  study this class carefully.
077         *   
078         * TODO: write a MLLPTransport with non-blocking IO  
079         * TODO: reconnect transport layers on error and retry 
080         */
081        public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) {
082            myContext = theContext;
083            myThreaded = isThreaded;
084            myAcceptAcks = new HashMap<String, ExpiringTransportable>();
085            myReservations = new HashMap<String, Long>();
086            myAvailableMessages = new HashMap<String, ExpiringTransportable>();
087            
088            if (isThreaded) {
089                myResponseExecutorService = Executors.newSingleThreadExecutor(); 
090    
091                    ackCycler = new Cycler(this, true);
092                Thread ackThd = new Thread(ackCycler);
093                ackThd.start();
094                nonAckCycler = new Cycler(this, false);
095                Thread nonAckThd = new Thread(nonAckCycler);
096                nonAckThd.start();            
097            }
098        }
099        
100        /**
101         * If self-threaded, stops threads that have been created.  
102         */
103        public void stop() {
104            if (myThreaded) {
105                ackCycler.stop();
106                nonAckCycler.stop();
107    
108                myResponseExecutorService.shutdownNow();
109            }
110        }
111    
112        /**
113         * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long)
114         */
115        public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception {
116            String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"};
117            String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
118            String controlId = fields[0];
119            String needAcceptAck = fields[1];
120            String needAppAck = fields[2];
121            
122            checkValidAckNeededCode(needAcceptAck);
123            
124            trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
125            
126            boolean originalMode = (needAcceptAck == null && needAppAck == null); 
127            if (originalMode || !NE.equals(needAcceptAck)) {
128            
129                Transportable response = null;
130                int retries = 0;
131                do {
132                    long until = System.currentTimeMillis() + retryIntervalMillis;
133                    while (response == null && System.currentTimeMillis() < until) {
134                        synchronized (this) {
135                            ExpiringTransportable et = myAcceptAcks.remove(controlId);
136                            if (et == null) {
137                                cycleIfNeeded(true);
138                            } else {
139                                response = et.transportable;
140                            }
141                        }
142                        sleepIfNeeded();
143                    }
144                    
145                    if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL))
146                            || (response != null && isReject(response))) {
147                        log.info("Resending message {}", controlId);
148                        trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
149                        response = null;                    
150                    }
151                    
152                    if (response != null && isError(response)) {
153                        String[] errMsgPath = {"MSA-3"};
154                        String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath);                    
155                        throw new HL7Exception("Error message received: " + errMsg[0]);
156                    }
157                    
158                } while (response == null && ++retries <= maxRetries);
159            }
160        }
161        
162        private void checkValidAckNeededCode(String theCode) throws HL7Exception {
163            //must be one of the below ... 
164            if ( !(theCode == null || theCode.equals("") 
165                    ||theCode.equals(AL) || theCode.equals(ER) 
166                    || theCode.equals(NE) || theCode.equals(SU)) ) {
167                throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message");
168            }            
169        }
170        
171        /**
172         * Calls cycle() if we do not expect another thread to be doing so
173         * @param expectingAck as in cycle
174         */
175        private void cycleIfNeeded(boolean expectingAck) throws HL7Exception {
176            if (!myThreaded) {
177                cycle(expectingAck);
178            }        
179        }
180        
181        /**
182         * Sleeps for 1 ms if externally threaded (this is to let the CPU idle).   
183         */
184        private void sleepIfNeeded() {
185            if (myThreaded) {
186                try {
187                    Thread.sleep(1);
188                } catch (InterruptedException e) { /* no problem */ }
189            }                
190        }
191        
192        /** Returns true if a CR or AR ACK */ 
193        private static boolean isReject(Transportable theMessage) throws HL7Exception {
194            boolean reject = false;
195            String[] fieldPaths = {"MSA-1"};
196            String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
197            if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) {
198                reject = true;
199            }        
200            return reject;
201        }
202    
203        /** Returns true if a CE or AE ACK */ 
204        private static boolean isError(Transportable theMessage) throws HL7Exception {
205            boolean error = false;
206            String[] fieldPaths = {"MSA-1"};
207            String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
208            if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) {
209                error = true;
210            }
211            return error;
212        }
213    
214        /**
215         * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long)
216         */
217        public synchronized void reserve(String theAckId, long thePeriodMillis) {
218            Long expiry = new Long(System.currentTimeMillis() + thePeriodMillis);
219            myReservations.put(theAckId, expiry);
220        }
221        
222        /**
223         * Tries to send the message, and if there is an error reconnects and tries again. 
224         */
225        private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException {
226            try {
227                theTransport.send(theTransportable);
228            } catch (TransportException e) {
229                theTransport.disconnect();
230                theTransport.connect();
231                theTransport.send(theTransportable);
232            }
233        }
234        
235        
236        /**
237         * Tries to receive a message, and if there is an error reconnects and tries again. 
238         */
239        private Transportable tryReceive(TransportLayer theTransport) throws TransportException {
240            Transportable message = null;
241            try {
242                message = theTransport.receive();            
243            } catch (TransportException e) {
244                theTransport.disconnect();
245                theTransport.connect();
246                message = theTransport.receive();
247            }
248            return message;
249        }
250    
251        /** 
252         * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean)
253         */
254        public void cycle(boolean expectingAck) throws HL7Exception {
255            log.debug("In cycle()");
256            
257            cleanReservations();
258            cleanAcceptAcks();
259            cleanReservedMessages();
260    
261            Transportable in = null;
262            try {
263                if (expectingAck) {
264                    in = tryReceive(myContext.getLocallyDrivenTransportLayer());
265                } else {
266                    in = tryReceive(myContext.getRemotelyDrivenTransportLayer());
267                }
268            } catch (TransportException e) {
269                try {
270                    Thread.sleep(1000);
271                } catch (InterruptedException e1) {}
272                throw e;
273            }
274            
275            // log
276            if (in != null) {
277                   log.debug("Received message: {}", in.getMessage());
278            } else {
279                    log.debug("Received no message");
280            }
281            
282            // If we have a message, handle it
283            if (in != null) { 
284                String acceptAckNeeded = null;
285    //            String appAckNeeded = null;
286                String ackCode = null;
287                String ackId = null;
288                
289                try {
290                        String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"};
291                        String[] fields = PreParser.getFields(in.getMessage(), fieldPaths);         
292                                    acceptAckNeeded = fields[0];
293    //                              appAckNeeded = fields[1];
294                                    ackCode = fields[2];
295                                    ackId = fields[3];
296                } catch (HL7Exception e) {
297                    log.warn("Failed to parse accept ack fields in incoming message", e);
298                }
299                
300                if (ackId != null && ackCode != null && ackCode.startsWith("C")) {
301                    long expiryTime = System.currentTimeMillis() + 1000 * 60;
302                    myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime));
303                } else {
304                    AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in);
305                
306                    if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL)) 
307                        || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable()) 
308                        || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) {
309                        trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage());    
310                    }
311      
312                    if (ack.isAcceptable()) {
313                        if (isReserved(ackId)) {
314                            
315                            log.debug("Received expected ACK message with ACK ID: {}", ackId);
316                            
317                            removeReservation(ackId);
318                            long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5;                
319                            myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime));
320                            
321                        } else {
322    
323                            log.debug("Sending message to router");
324                            Transportable out = myContext.getRouter().processMessage(in);
325                            sendAppResponse(out);
326                            
327                        }
328                    } else {
329                            // TODO: should we do something more here? Might be nice to 
330                            // allow a configurable handler for this situation
331                            log.warn("Incoming message was not acceptable");
332                    }
333                    
334                }
335            } else {
336                String transport = expectingAck ? " Locally driven " : "Remotely driven";
337                log.debug("{} TransportLayer.receive() returned null.", transport);
338            }
339            
340            sleepIfNeeded();
341    
342            log.debug("Exiting cycle()");
343        }
344        
345        /** Sends in a new thread if isThreaded, otherwise in current thread */
346        private void sendAppResponse(final Transportable theResponse) {
347            final ProcessorImpl processor = this;
348            Runnable sender = new Runnable() {
349                public void run() {
350                    try {
351                            log.debug("Sending response: {}", theResponse);
352                            
353                        //TODO: make configurable 
354                            processor.send(theResponse, 2, 3000);
355                            
356                    } catch (HL7Exception e) {
357                        log.error("Error trying to send response from Application", e);
358                    }
359                }
360            };
361            
362            if (myThreaded) {
363                myResponseExecutorService.execute(sender);
364            } else {
365                sender.run();
366            }
367        }
368        
369        /**
370         * Removes expired message reservations from the reservation list.  
371         */
372        private synchronized void cleanReservations() {
373            Iterator<String> it = myReservations.keySet().iterator();
374            while (it.hasNext()) {
375                String ackId = it.next();
376                Long expiry = myReservations.get(ackId);
377                if (System.currentTimeMillis() > expiry.longValue()) {
378                    it.remove();
379                }
380            }
381        }
382        
383        /**
384         * Discards expired accept acknowledgements (these are used in retry protocol; see send()).   
385         */
386        private synchronized void cleanAcceptAcks() {
387            Iterator<String> it = myAcceptAcks.keySet().iterator();
388            while (it.hasNext()) {
389                String ackId = it.next();
390                ExpiringTransportable et = myAcceptAcks.get(ackId);
391                if (System.currentTimeMillis() > et.expiryTime) {
392                    it.remove();
393                }
394            }        
395        }
396        
397        private synchronized void cleanReservedMessages() throws HL7Exception {
398            Iterator<String> it = myAvailableMessages.keySet().iterator();
399            while (it.hasNext()) {
400                String ackId = it.next();            
401                ExpiringTransportable et = myAvailableMessages.get(ackId);
402                if (System.currentTimeMillis() > et.expiryTime) {
403                    it.remove();
404                    
405                    //send to an Application 
406                    Transportable out = myContext.getRouter().processMessage(et.transportable);
407                    sendAppResponse(out);                
408                }
409            }  
410        }
411        
412        private synchronized boolean isReserved(String ackId) {
413            boolean reserved = false;
414            if (myReservations.containsKey(ackId)) {
415                reserved = true;
416            }
417            return reserved;
418        }
419        
420        private synchronized void removeReservation(String ackId) {
421            myReservations.remove(ackId);
422        }
423        
424    
425        /**
426         * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String)
427         */
428        public boolean isAvailable(String theAckId) {
429            boolean available = false;
430            if (myAvailableMessages.containsKey(theAckId)) {
431                available = true;
432            }
433            return available;
434        }
435    
436        /** 
437         * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long)
438         */
439        public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception {
440            if (!isReserved(theAckId)) {
441                reserve(theAckId, theTimeoutMillis);
442            }
443            
444            Transportable in = null;
445            long until = System.currentTimeMillis() + theTimeoutMillis;
446            do {
447                synchronized (this) {
448                    ExpiringTransportable et = myAvailableMessages.get(theAckId);                
449                    if (et == null) {
450                        cycleIfNeeded(false);
451                    } else {
452                        in = et.transportable;
453                    }
454                }
455                sleepIfNeeded();
456            } while (in == null && System.currentTimeMillis() < until);
457            return in;
458        }
459    
460        /** 
461         * @see ca.uhn.hl7v2.protocol.Processor#getContext()
462         */
463        public ProcessorContext getContext() {
464            return myContext;
465        }
466        
467        /**
468         * A struct for Transportable collection entries that time out.  
469         *  
470         * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
471         * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
472         */
473        class ExpiringTransportable {
474            public Transportable transportable;
475            public long expiryTime;
476            
477            public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) {
478                transportable = theTransportable;
479                expiryTime = theExpiryTime;
480            }
481        }
482        
483        /**
484         * A Runnable that repeatedly calls the cycle() method of this class.  
485         * 
486         * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
487         * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
488         */
489        private static class Cycler implements Runnable {
490    
491            private Processor myProcessor;
492            private boolean myExpectingAck;
493            private boolean isRunning;
494            
495            /**
496             * @param theProcessor the processor on which to call cycle()
497             * @param isExpectingAck passed to cycle()
498             */
499            public Cycler(Processor theProcessor, boolean isExpectingAck) {
500                myProcessor = theProcessor;
501                myExpectingAck = isExpectingAck;
502                isRunning = true;
503            }
504            
505            /**
506             * Execution will stop at the end of the next cycle.  
507             */
508            public void stop() {
509                isRunning = false;
510            }
511            
512            /** 
513             * Calls cycle() repeatedly on the Processor given in the 
514             * constructor, until stop() is called.  
515             * 
516             * @see java.lang.Runnable#run()
517             */
518            public void run() {
519                while (isRunning) {
520                    try {
521                        myProcessor.cycle(myExpectingAck);
522                    } catch (HL7Exception e) {
523                        log.error("Error processing message", e);
524                    }
525                }
526            }        
527        }
528    
529    }