001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.patterns;
018    
019    import java.util.Date;
020    import java.util.concurrent.TimeoutException;
021    
022    import javax.jbi.messaging.ExchangeStatus;
023    import javax.jbi.messaging.InOnly;
024    import javax.jbi.messaging.InOut;
025    import javax.jbi.messaging.MessageExchange;
026    import javax.jbi.messaging.MessagingException;
027    import javax.jbi.messaging.NormalizedMessage;
028    import javax.jbi.messaging.RobustInOnly;
029    
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import org.apache.servicemix.common.util.MessageUtil;
033    import org.apache.servicemix.eip.EIPEndpoint;
034    import org.apache.servicemix.eip.support.ExchangeTarget;
035    import org.apache.servicemix.expression.Expression;
036    import org.apache.servicemix.expression.PropertyExpression;
037    import org.apache.servicemix.timers.Timer;
038    import org.apache.servicemix.timers.TimerListener;
039    
040    /**
041     * 
042     * @author gnodet
043     * @org.apache.xbean.XBean element="async-bridge"
044     */
045    public class AsyncBridge extends EIPEndpoint {
046    
047        public static final String CORRID = "org.apache.servicemix.eip.asyncbridge.corrid";
048    
049        private static final Log LOG = LogFactory.getLog(AsyncBridge.class);
050    
051        private Expression requestCorrId = new Expression() {
052            public Object evaluate(MessageExchange exchange, NormalizedMessage message) throws MessagingException {
053                return exchange.getExchangeId();
054            }
055        };
056        private String responseCorrIdProperty = CORRID;
057        private Expression responseCorrId;
058        private long timeout;
059        private ExchangeTarget target;
060        private boolean useRobustInOnly;
061    
062        /**
063         * @return the timeout
064         */
065        public long getTimeout() {
066            return timeout;
067        }
068    
069        /**
070         * @param timeout the timeout to set
071         */
072        public void setTimeout(long timeout) {
073            this.timeout = timeout;
074        }
075    
076        /**
077         * @return the target
078         */
079        public ExchangeTarget getTarget() {
080            return target;
081        }
082    
083        /**
084         * @param target the target to set
085         */
086        public void setTarget(ExchangeTarget target) {
087            this.target = target;
088        }
089        
090        /**
091         * @return the requestCorrId
092         */
093        public Expression getRequestCorrId() {
094            return requestCorrId;
095        }
096    
097        /**
098         * @param requestCorrId the requestCorrId to set
099         */
100        public void setRequestCorrId(Expression requestCorrId) {
101            this.requestCorrId = requestCorrId;
102        }
103    
104        /**
105         * @return the responseCorrIdProperty
106         */
107        public String getResponseCorrIdProperty() {
108            return responseCorrIdProperty;
109        }
110    
111        /**
112         * @param responseCorrIdProperty the responseCorrIdProperty to set
113         */
114        public void setResponseCorrIdProperty(String responseCorrIdProperty) {
115            this.responseCorrIdProperty = responseCorrIdProperty;
116        }
117    
118        /**
119         * @return the responseCorrId
120         */
121        public Expression getResponseCorrId() {
122            return responseCorrId;
123        }
124    
125        /**
126         * @param responseCorrId the responseCorrId to set
127         */
128        public void setResponseCorrId(Expression responseCorrId) {
129            this.responseCorrId = responseCorrId;
130        }
131    
132        /**
133         * @return the useRobustInOnly
134         */
135        public boolean isUseRobustInOnly() {
136            return useRobustInOnly;
137        }
138    
139        /**
140         * @param useRobustInOnly the useRobustInOnly to set
141         */
142        public void setUseRobustInOnly(boolean useRobustInOnly) {
143            this.useRobustInOnly = useRobustInOnly;
144        }
145    
146        /*
147         * (non-Javadoc)
148         * @see org.apache.servicemix.eip.EIPEndpoint#start()
149         */
150        public void start() throws Exception {
151            super.start();
152            if (responseCorrId == null) {
153                responseCorrId = new PropertyExpression(responseCorrIdProperty);
154            }
155        }
156    
157        /* (non-Javadoc)
158         * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
159         */
160        protected void processSync(MessageExchange exchange) throws Exception {
161            throw new IllegalStateException();
162        }
163    
164        /* (non-Javadoc)
165         * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
166         */
167        protected void processAsync(MessageExchange exchange) throws Exception {
168            throw new IllegalStateException();
169        }
170        
171        /* (non-Javadoc)
172         * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
173         */
174        public void process(MessageExchange exchange) throws Exception {
175            // Three exchanges are involved: the first InOut will be called t0,
176            // the InOnly send will be called t1 and the InOnly received will be called t2
177    
178            if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
179                // Step1: receive t0 as the first message
180                if (exchange instanceof InOut && exchange.getStatus() == ExchangeStatus.ACTIVE) {
181                    MessageExchange t0 = exchange;
182                    MessageExchange t1;
183                    final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
184                    if (correlationId == null || correlationId.length() == 0) {
185                        throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
186                    }
187                    store.store(correlationId + ".t0", t0);
188                    t1 = useRobustInOnly ? getExchangeFactory().createRobustInOnlyExchange()
189                                         : getExchangeFactory().createInOnlyExchange();
190                    target.configureTarget(t1, getContext());
191                    MessageUtil.transferInToIn(t0, t1);
192                    t1.setProperty(responseCorrIdProperty, correlationId);
193                    t1.getMessage("in").setProperty(responseCorrIdProperty, correlationId);
194                    send(t1);
195                // Receive the done / error from t0
196                } else if (exchange instanceof InOut && exchange.getStatus() != ExchangeStatus.ACTIVE) {
197                    MessageExchange t0 = exchange;
198                    MessageExchange t1;
199                    MessageExchange t2;
200                    final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
201                    t1 = (MessageExchange) store.load(correlationId + ".t1");
202                    t2 = (MessageExchange) store.load(correlationId + ".t2");
203                    if (t1 != null) {
204                        done(t1);
205                    }
206                    if (t2 != null) {
207                        done(t2);
208                    }
209                // Receive the response from t2
210                } else if ((exchange instanceof InOnly || exchange instanceof RobustInOnly) && exchange.getStatus() == ExchangeStatus.ACTIVE) {
211                    MessageExchange t0;
212                    MessageExchange t2 = exchange;
213                    final String correlationId = (String) responseCorrId.evaluate(t2, t2.getMessage("in"));
214                    if (correlationId == null || correlationId.length() == 0) {
215                        throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
216                    }
217                    t0 = (MessageExchange) store.load(correlationId + ".t0");
218                    store.store(correlationId + ".t2", t2);
219                    // The request is found and has not timed out
220                    if (t0 != null) {
221                        MessageUtil.transferInToOut(t2, t0);
222                        send(t0);
223                    }
224                } else {
225                    throw new IllegalStateException();
226                }
227            // Handle an exchange as a CONSUMER
228            } else {
229                // Step 2: receive t1 response
230                // If this is an error or a fault, transfer it from t1 to t0 and send,
231                // else, start a timeout to wait for t2
232                MessageExchange t1 = exchange;
233                // an error
234                final String correlationId = (String) t1.getProperty(responseCorrIdProperty);
235                if (t1.getStatus() == ExchangeStatus.ERROR) {
236                    MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
237                    // t1 response may come after t0, so in case this happens, we need to discard t1
238                    if (t0 != null) {
239                        fail(t0, t1.getError());
240                    }
241                // a fault ?
242                } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
243                    MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
244                    // t1 response may come after t0, so in case this happens, we need to discard t1
245                    if (t0 != null) {
246                        store.store(correlationId + ".t1", t1);
247                        MessageUtil.transferFaultToFault(t1, t0);
248                        send(t0);
249                    }
250                // request sent successfully, start the timeout
251                } else {
252                    Date exchangeTimeout = getTimeout(t1);
253                    if (exchangeTimeout != null) {
254                        getTimerManager().schedule(new TimerListener() {
255                            public void timerExpired(Timer timer) {
256                                AsyncBridge.this.onTimeout(correlationId);
257                            }
258                        }, exchangeTimeout);
259                    }
260                }
261            }
262        }
263        
264        protected void onTimeout(String correlationId) {
265            try {
266                MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
267                if (t0 != null) {
268                    fail(t0, new TimeoutException());
269                }
270            } catch (Exception e) {
271                LOG.debug("Exception caught when handling timeout", e);
272            }
273        }
274        
275        protected Date getTimeout(MessageExchange exchange) {
276            if (timeout > 0) {
277                return new Date(System.currentTimeMillis() + timeout);
278            }
279            return null;
280        }
281    
282    }