001    /**
002    The contents of this file are subject to the Mozilla Public License Version 1.1 
003    (the "License"); you may not use this file except in compliance with the License. 
004    You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005    Software distributed under the License is distributed on an "AS IS" basis, 
006    WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007    specific language governing rights and limitations under the License. 
008    
009    The Original Code is "Initiator.java".  Description: 
010    "Performs the initiation role of a message exchange according to HL7's original 
011     mode rules." 
012    
013    The Initial Developer of the Original Code is University Health Network. Copyright (C) 
014    2002.  All Rights Reserved. 
015    
016    Contributor(s): ______________________________________. 
017    
018    Alternatively, the contents of this file may be used under the terms of the 
019    GNU General Public License (the  "GPL"), in which case the provisions of the GPL are 
020    applicable instead of those above.  If you wish to allow use of your version of this 
021    file only under the terms of the GPL and not to allow others to use your version 
022    of this file under the MPL, indicate your decision by deleting  the provisions above 
023    and replace  them with the notice and other provisions required by the GPL License.  
024    If you do not delete the provisions above, a recipient may use your version of 
025    this file under either the MPL or the GPL. 
026    
027     */
028    
029    package ca.uhn.hl7v2.app;
030    
031    import java.io.IOException;
032    import java.net.Socket;
033    import java.util.concurrent.ExecutionException;
034    import java.util.concurrent.Future;
035    
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    import ca.uhn.hl7v2.HL7Exception;
040    import ca.uhn.hl7v2.llp.LLPException;
041    import ca.uhn.hl7v2.llp.LowerLayerProtocol;
042    import ca.uhn.hl7v2.model.Message;
043    import ca.uhn.hl7v2.parser.Parser;
044    import ca.uhn.hl7v2.parser.PipeParser;
045    import ca.uhn.hl7v2.util.MessageIDGenerator;
046    import ca.uhn.hl7v2.util.Terser;
047    
048    /**
049     * <p>
050     * Performs the initiation role of a message exchange (i.e sender of the first
051     * message; analogous to the client in a client-server interaction), according
052     * to HL7's original mode processing rules.
053     * </p>
054     * <p>
055     * The <code>sendAndReceive(...)</code> method blocks until either a response is
056     * received with the matching message ID, or until a timeout period has passed.
057     * The timeout defaults to 10000 ms (10 sec) but can be configured by setting
058     * the system property "ca.uhn.hl7v2.app.initiator.timeout" to an integer value
059     * representing the number of ms after which to time out.
060     * </p>
061     * <p>
062     * At the time of writing, enhanced mode, two-phase reply, continuation
063     * messages, and batch processing are unsupported.
064     * </p>
065     * 
066     * @author Bryan Tripp
067     */
068    public class Initiator {
069    
070            private static final Logger log = LoggerFactory.getLogger(Initiator.class);
071            private static final Logger rawOutbound = LoggerFactory
072                            .getLogger("ca.uhn.hl7v2.raw.outbound");
073            private static final Logger rawInbound = LoggerFactory
074                            .getLogger("ca.uhn.hl7v2.raw.inbound");
075            private Connection conn;
076            private int timeoutMillis = 10000;
077    
078            /**
079             * Creates a new instance of Initiator.
080             * 
081             * @param conn
082             *            the Connection associated with this Initiator.
083             */
084            Initiator(Connection conn) throws LLPException {
085                    this.conn = conn;
086    
087                    // see if timeout has been set
088                    String timeout = System
089                                    .getProperty("ca.uhn.hl7v2.app.initiator.timeout");
090                    if (timeout != null) {
091                            try {
092                                    timeoutMillis = Integer.parseInt(timeout);
093                                    log.debug("Setting Initiator timeout to {} ms", timeout);
094                            } catch (NumberFormatException e) {
095                                    log.warn(timeout
096                                                    + " is not a valid integer - Initiator is using default timeout");
097                            }
098                    }
099            }
100    
101            /**
102             * Sends a message to a responder system, receives the reply, and returns
103             * the reply as a Message object. This method is thread-safe - multiple
104             * threads can share an Initiator and call this method. Responses are
105             * returned to the calling thread on the basis of message ID.
106             */
107            public Message sendAndReceive(Message out) throws HL7Exception,
108                            LLPException, IOException {
109                    if (out == null) {
110                            throw new HL7Exception("Can't encode null message",
111                                            HL7Exception.REQUIRED_FIELD_MISSING);
112                    }
113    
114                    // register message with response Receiver(s) (by message ID)
115                    Terser t = new Terser(out);
116                    String messID = t.get("/MSH-10");
117    
118                    if (messID == null || messID.length() == 0) {
119                            throw new HL7Exception(
120                                            "MSH segment missing required field Control ID (MSH-10)",
121                                            HL7Exception.REQUIRED_FIELD_MISSING);
122                    }
123    
124                    // log and send message
125                    String outbound = conn.getParser().encode(out);
126                    rawOutbound.debug(outbound);
127                    Future<String> inbound = null;
128                    try {
129                            String message = null;
130                            inbound = conn.waitForResponse(messID, timeoutMillis);
131                            conn.getSendWriter().writeMessage(outbound);
132                            if (inbound != null && (message = inbound.get()) != null) {
133                                    // log that we got the message
134                                    log.debug("Initiator received message: {}", message);
135                                    rawInbound.debug(message);
136                                    Message response = conn.getParser().parse(message);
137                                    log.debug("response parsed");
138                                    return response;
139                            }
140                    } catch (IOException e) {
141                            if (inbound != null)
142                                    inbound.cancel(true);
143                            conn.close();
144                            throw e;
145                    } catch (InterruptedException e) {
146                    } catch (ExecutionException e) {
147                    }
148    
149                    throw new HL7Exception(
150                                    "Timeout waiting for response to message with control ID "
151                                                    + messID);
152            }
153    
154            /**
155             * Sets the time (in milliseconds) that the initiator will wait for a
156             * response for a given message before timing out and throwing an exception
157             * (default is 10 seconds).
158             */
159            public void setTimeoutMillis(int timeout) {
160                    this.timeoutMillis = timeout;
161            }
162    
163            /**
164             * Test harness
165             */
166            public static void main(String args[]) {
167                    if (args.length != 2) {
168                            System.out.println("Usage: ca.uhn.hl7v2.app.Initiator host port");
169                    }
170    
171                    try {
172    
173                            // set up connection to server
174                            String host = args[0];
175                            int port = Integer.parseInt(args[1]);
176    
177                            final Parser parser = new PipeParser();
178                            LowerLayerProtocol llp = LowerLayerProtocol.makeLLP();
179                            Connection connection = new Connection(parser, llp, new Socket(
180                                            host, port));
181                            final Initiator initiator = connection.getInitiator();
182                            connection.activate();
183                            final String outText = "MSH|^~\\&|||||||ACK^^ACK|||R|2.4|\rMSA|AA";
184    
185                            // get a bunch of threads to send messages
186                            for (int i = 0; i < 1000; i++) {
187                                    Thread sender = new Thread(new Runnable() {
188                                            public void run() {
189                                                    try {
190                                                            // get message ID
191                                                            String ID = MessageIDGenerator.getInstance()
192                                                                            .getNewID();
193                                                            Message out = parser.parse(outText);
194                                                            Terser tOut = new Terser(out);
195                                                            tOut.set("/MSH-10", ID);
196    
197                                                            // send, get response
198                                                            Message in = initiator.sendAndReceive(out);
199                                                            // get ACK ID
200                                                            Terser tIn = new Terser(in);
201                                                            String ackID = tIn.get("/MSA-2");
202                                                            if (ID.equals(ackID)) {
203                                                                    System.out.println("OK - ack ID matches");
204                                                            } else {
205                                                                    throw new RuntimeException(
206                                                                                    "Ack ID for message " + ID + " is "
207                                                                                                    + ackID);
208                                                            }
209    
210                                                    } catch (Exception e) {
211                                                            e.printStackTrace();
212                                                    }
213                                            }
214                                    });
215                                    sender.start();
216                            }
217    
218                    } catch (Exception e) {
219                            e.printStackTrace();
220                    }
221            }
222    
223    }