001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.patterns;
018
019 import java.util.Date;
020 import java.util.concurrent.TimeoutException;
021
022 import javax.jbi.messaging.ExchangeStatus;
023 import javax.jbi.messaging.InOnly;
024 import javax.jbi.messaging.InOut;
025 import javax.jbi.messaging.MessageExchange;
026 import javax.jbi.messaging.MessagingException;
027 import javax.jbi.messaging.NormalizedMessage;
028 import javax.jbi.messaging.RobustInOnly;
029
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032 import org.apache.servicemix.common.util.MessageUtil;
033 import org.apache.servicemix.eip.EIPEndpoint;
034 import org.apache.servicemix.eip.support.ExchangeTarget;
035 import org.apache.servicemix.expression.Expression;
036 import org.apache.servicemix.expression.PropertyExpression;
037 import org.apache.servicemix.timers.Timer;
038 import org.apache.servicemix.timers.TimerListener;
039
040 /**
041 *
042 * @author gnodet
043 * @org.apache.xbean.XBean element="async-bridge"
044 */
045 public class AsyncBridge extends EIPEndpoint {
046
047 public static final String CORRID = "org.apache.servicemix.eip.asyncbridge.corrid";
048
049 private static final Log LOG = LogFactory.getLog(AsyncBridge.class);
050
051 private Expression requestCorrId = new Expression() {
052 public Object evaluate(MessageExchange exchange, NormalizedMessage message) throws MessagingException {
053 return exchange.getExchangeId();
054 }
055 };
056 private String responseCorrIdProperty = CORRID;
057 private Expression responseCorrId;
058 private long timeout;
059 private ExchangeTarget target;
060 private boolean useRobustInOnly;
061
062 /**
063 * @return the timeout
064 */
065 public long getTimeout() {
066 return timeout;
067 }
068
069 /**
070 * @param timeout the timeout to set
071 */
072 public void setTimeout(long timeout) {
073 this.timeout = timeout;
074 }
075
076 /**
077 * @return the target
078 */
079 public ExchangeTarget getTarget() {
080 return target;
081 }
082
083 /**
084 * @param target the target to set
085 */
086 public void setTarget(ExchangeTarget target) {
087 this.target = target;
088 }
089
090 /**
091 * @return the requestCorrId
092 */
093 public Expression getRequestCorrId() {
094 return requestCorrId;
095 }
096
097 /**
098 * @param requestCorrId the requestCorrId to set
099 */
100 public void setRequestCorrId(Expression requestCorrId) {
101 this.requestCorrId = requestCorrId;
102 }
103
104 /**
105 * @return the responseCorrIdProperty
106 */
107 public String getResponseCorrIdProperty() {
108 return responseCorrIdProperty;
109 }
110
111 /**
112 * @param responseCorrIdProperty the responseCorrIdProperty to set
113 */
114 public void setResponseCorrIdProperty(String responseCorrIdProperty) {
115 this.responseCorrIdProperty = responseCorrIdProperty;
116 }
117
118 /**
119 * @return the responseCorrId
120 */
121 public Expression getResponseCorrId() {
122 return responseCorrId;
123 }
124
125 /**
126 * @param responseCorrId the responseCorrId to set
127 */
128 public void setResponseCorrId(Expression responseCorrId) {
129 this.responseCorrId = responseCorrId;
130 }
131
132 /**
133 * @return the useRobustInOnly
134 */
135 public boolean isUseRobustInOnly() {
136 return useRobustInOnly;
137 }
138
139 /**
140 * @param useRobustInOnly the useRobustInOnly to set
141 */
142 public void setUseRobustInOnly(boolean useRobustInOnly) {
143 this.useRobustInOnly = useRobustInOnly;
144 }
145
146 /*
147 * (non-Javadoc)
148 * @see org.apache.servicemix.eip.EIPEndpoint#start()
149 */
150 public void start() throws Exception {
151 super.start();
152 if (responseCorrId == null) {
153 responseCorrId = new PropertyExpression(responseCorrIdProperty);
154 }
155 }
156
157 /* (non-Javadoc)
158 * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
159 */
160 protected void processSync(MessageExchange exchange) throws Exception {
161 throw new IllegalStateException();
162 }
163
164 /* (non-Javadoc)
165 * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
166 */
167 protected void processAsync(MessageExchange exchange) throws Exception {
168 throw new IllegalStateException();
169 }
170
171 /* (non-Javadoc)
172 * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
173 */
174 public void process(MessageExchange exchange) throws Exception {
175 // Three exchanges are involved: the first InOut will be called t0,
176 // the InOnly send will be called t1 and the InOnly received will be called t2
177
178 if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
179 // Step1: receive t0 as the first message
180 if (exchange instanceof InOut && exchange.getStatus() == ExchangeStatus.ACTIVE) {
181 MessageExchange t0 = exchange;
182 MessageExchange t1;
183 final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
184 if (correlationId == null || correlationId.length() == 0) {
185 throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
186 }
187 store.store(correlationId + ".t0", t0);
188 t1 = useRobustInOnly ? getExchangeFactory().createRobustInOnlyExchange()
189 : getExchangeFactory().createInOnlyExchange();
190 target.configureTarget(t1, getContext());
191 MessageUtil.transferInToIn(t0, t1);
192 t1.setProperty(responseCorrIdProperty, correlationId);
193 t1.getMessage("in").setProperty(responseCorrIdProperty, correlationId);
194 send(t1);
195 // Receive the done / error from t0
196 } else if (exchange instanceof InOut && exchange.getStatus() != ExchangeStatus.ACTIVE) {
197 MessageExchange t0 = exchange;
198 MessageExchange t1;
199 MessageExchange t2;
200 final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
201 t1 = (MessageExchange) store.load(correlationId + ".t1");
202 t2 = (MessageExchange) store.load(correlationId + ".t2");
203 if (t1 != null) {
204 done(t1);
205 }
206 if (t2 != null) {
207 done(t2);
208 }
209 // Receive the response from t2
210 } else if ((exchange instanceof InOnly || exchange instanceof RobustInOnly) && exchange.getStatus() == ExchangeStatus.ACTIVE) {
211 MessageExchange t0;
212 MessageExchange t2 = exchange;
213 final String correlationId = (String) responseCorrId.evaluate(t2, t2.getMessage("in"));
214 if (correlationId == null || correlationId.length() == 0) {
215 throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
216 }
217 t0 = (MessageExchange) store.load(correlationId + ".t0");
218 store.store(correlationId + ".t2", t2);
219 // The request is found and has not timed out
220 if (t0 != null) {
221 MessageUtil.transferInToOut(t2, t0);
222 send(t0);
223 }
224 } else {
225 throw new IllegalStateException();
226 }
227 // Handle an exchange as a CONSUMER
228 } else {
229 // Step 2: receive t1 response
230 // If this is an error or a fault, transfer it from t1 to t0 and send,
231 // else, start a timeout to wait for t2
232 MessageExchange t1 = exchange;
233 // an error
234 final String correlationId = (String) t1.getProperty(responseCorrIdProperty);
235 if (t1.getStatus() == ExchangeStatus.ERROR) {
236 MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
237 // t1 response may come after t0, so in case this happens, we need to discard t1
238 if (t0 != null) {
239 fail(t0, t1.getError());
240 }
241 // a fault ?
242 } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
243 MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
244 // t1 response may come after t0, so in case this happens, we need to discard t1
245 if (t0 != null) {
246 store.store(correlationId + ".t1", t1);
247 MessageUtil.transferFaultToFault(t1, t0);
248 send(t0);
249 }
250 // request sent successfully, start the timeout
251 } else {
252 Date exchangeTimeout = getTimeout(t1);
253 if (exchangeTimeout != null) {
254 getTimerManager().schedule(new TimerListener() {
255 public void timerExpired(Timer timer) {
256 AsyncBridge.this.onTimeout(correlationId);
257 }
258 }, exchangeTimeout);
259 }
260 }
261 }
262 }
263
264 protected void onTimeout(String correlationId) {
265 try {
266 MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
267 if (t0 != null) {
268 fail(t0, new TimeoutException());
269 }
270 } catch (Exception e) {
271 LOG.debug("Exception caught when handling timeout", e);
272 }
273 }
274
275 protected Date getTimeout(MessageExchange exchange) {
276 if (timeout > 0) {
277 return new Date(System.currentTimeMillis() + timeout);
278 }
279 return null;
280 }
281
282 }