001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.patterns;
018
019 import java.net.URI;
020
021 import javax.jbi.management.DeploymentException;
022 import javax.jbi.messaging.ExchangeStatus;
023 import javax.jbi.messaging.Fault;
024 import javax.jbi.messaging.InOnly;
025 import javax.jbi.messaging.InOut;
026 import javax.jbi.messaging.MessageExchange;
027 import javax.jbi.messaging.MessagingException;
028 import javax.jbi.messaging.NormalizedMessage;
029 import javax.jbi.messaging.RobustInOnly;
030 import javax.wsdl.Definition;
031
032 import org.apache.servicemix.common.util.MessageUtil;
033 import org.apache.servicemix.eip.EIPEndpoint;
034 import org.apache.servicemix.eip.support.ExchangeTarget;
035 import org.apache.servicemix.jbi.exception.FaultException;
036 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
037
038 /**
039 * The Pipeline component is a bridge between an In-Only (or Robust-In-Only) MEP and
040 * an In-Out MEP.
041 * When the Pipeline receives an In-Only MEP, it will send the input in an In-Out MEP
042 * to the tranformer destination and forward the response in an In-Only MEP to the target
043 * destination.
044 * In addition, this component is fully asynchronous and uses an exchange store to provide
045 * full HA and recovery for clustered / persistent flows.
046 *
047 * @author gnodet
048 * @version $Revision: 376451 $
049 * @org.apache.xbean.XBean element="pipeline"
050 * description="A Pipeline"
051 */
052 public class Pipeline extends EIPEndpoint {
053
054 private static final String TRANSFORMER = "Pipeline.Transformer";
055
056 private static final String CONSUMER_MEP = "Pipeline.ConsumerMEP";
057
058 /**
059 * The adress of the in-out endpoint acting as a transformer
060 */
061 private ExchangeTarget transformer;
062
063 /**
064 * The address of the target endpoint
065 */
066 private ExchangeTarget target;
067
068 /**
069 * The addres of the endpoint to send faults to
070 */
071 private ExchangeTarget faultsTarget;
072
073 /**
074 * When the faultsTarget is not specified,
075 * faults may be sent to the target endpoint
076 * if this flag is set to <code>true</code>
077 */
078 private boolean sendFaultsToTarget;
079
080 /**
081 * The correlation property used by this component
082 */
083 private String correlationConsumer;
084
085 /**
086 * The correlation property used by this component
087 */
088 private String correlationTransformer;
089
090 /**
091 * The correlation property used by this component
092 */
093 private String correlationTarget;
094
095 /**
096 * Should message properties be copied ?
097 */
098 private boolean copyProperties;
099
100 /**
101 * Should message attachments be copied ?
102 */
103 private boolean copyAttachments;
104
105 /**
106 * @return Returns the target.
107 */
108 public ExchangeTarget getTarget() {
109 return target;
110 }
111
112 /**
113 * @param target The target to set.
114 */
115 public void setTarget(ExchangeTarget target) {
116 this.target = target;
117 }
118
119 /**
120 * @return the faultsTarget
121 */
122 public ExchangeTarget getFaultsTarget() {
123 return faultsTarget;
124 }
125
126 /**
127 * @param faultsTarget the faultsTarget to set
128 */
129 public void setFaultsTarget(ExchangeTarget faultsTarget) {
130 this.faultsTarget = faultsTarget;
131 }
132
133 /**
134 * @return the sendFaultsToTarget
135 */
136 public boolean isSendFaultsToTarget() {
137 return sendFaultsToTarget;
138 }
139
140 /**
141 * @param sendFaultsToTarget the sendFaultsToTarget to set
142 */
143 public void setSendFaultsToTarget(boolean sendFaultsToTarget) {
144 this.sendFaultsToTarget = sendFaultsToTarget;
145 }
146
147 /**
148 * @return Returns the transformer.
149 */
150 public ExchangeTarget getTransformer() {
151 return transformer;
152 }
153
154 /**
155 * @param transformer The transformer to set.
156 */
157 public void setTransformer(ExchangeTarget transformer) {
158 this.transformer = transformer;
159 }
160
161 public boolean isCopyProperties() {
162 return copyProperties;
163 }
164
165 public void setCopyProperties(boolean copyProperties) {
166 this.copyProperties = copyProperties;
167 }
168
169 public boolean isCopyAttachments() {
170 return copyAttachments;
171 }
172
173 public void setCopyAttachments(boolean copyAttachments) {
174 this.copyAttachments = copyAttachments;
175 }
176
177 /* (non-Javadoc)
178 * @see org.apache.servicemix.eip.EIPEndpoint#validate()
179 */
180 public void validate() throws DeploymentException {
181 super.validate();
182 // Check target
183 if (target == null) {
184 throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
185 }
186 // Check transformer
187 if (transformer == null) {
188 throw new IllegalArgumentException("transformer should be set to a valid ExchangeTarget");
189 }
190 // Create correlation properties
191 correlationConsumer = "Pipeline.Consumer." + getService() + "." + getEndpoint();
192 correlationTransformer = "Pipeline.Transformer." + getService() + "." + getEndpoint();
193 correlationTarget = "Pipeline.Target." + getService() + "." + getEndpoint();
194 }
195
196 /* (non-Javadoc)
197 * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
198 */
199 protected void processSync(MessageExchange exchange) throws Exception {
200 if (!(exchange instanceof InOnly)
201 && !(exchange instanceof RobustInOnly)) {
202 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
203 return;
204 }
205 // Create exchange for target
206 InOut tme = getExchangeFactory().createInOutExchange();
207 transformer.configureTarget(tme, getContext());
208 // Send in to listener and target
209 MessageUtil.transferInToIn(exchange, tme);
210 sendSync(tme);
211 // Check result
212 if (tme.getStatus() == ExchangeStatus.DONE) {
213 throw new IllegalStateException("Received a DONE status from the transformer");
214 // Errors must be sent back to the consumer
215 } else if (tme.getStatus() == ExchangeStatus.ERROR) {
216 fail(exchange, tme.getError());
217 } else if (tme.getFault() != null) {
218 processFault(exchange, tme);
219 // This should not happen
220 } else if (tme.getOutMessage() == null) {
221 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
222 + " but has no correlation set");
223 // This is the answer from the transformer
224 } else {
225 MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
226 target.configureTarget(me, getContext());
227 MessageUtil.transferOutToIn(tme, me);
228 copyPropertiesAndAttachments(exchange.getMessage("in"), me.getMessage("in"));
229 sendSync(me);
230 done(tme);
231 if (me.getStatus() == ExchangeStatus.DONE) {
232 done(exchange);
233 } else if (me.getStatus() == ExchangeStatus.ERROR) {
234 fail(exchange, me.getError());
235 } else if (me.getFault() != null) {
236 if (exchange instanceof InOnly) {
237 // Do not use the fault has it may contain streams
238 // So just transform it to a string and send an error
239 String fault = new SourceTransformer().contentToString(me.getFault());
240 done(me);
241 fail(exchange, new FaultException(fault, null, null));
242 } else {
243 Fault fault = MessageUtil.copyFault(me);
244 MessageUtil.transferToFault(fault, exchange);
245 done(me);
246 sendSync(exchange);
247 }
248 } else {
249 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
250 + " but has no correlation set");
251 }
252 }
253 }
254
255 private void processFault(MessageExchange exchange, InOut tme) throws Exception {
256 // Faults must be sent to the target / faultsTarget
257 if (faultsTarget != null || sendFaultsToTarget) {
258 MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
259 (faultsTarget != null ? faultsTarget : target).configureTarget(me, getContext());
260 MessageUtil.transferToIn(tme.getFault(), me);
261 sendSync(me);
262 done(tme);
263 if (me.getStatus() == ExchangeStatus.DONE) {
264 done(exchange);
265 } else if (me.getStatus() == ExchangeStatus.ERROR) {
266 fail(exchange, me.getError());
267 } else if (me.getFault() != null) {
268 if (exchange instanceof InOnly) {
269 // Do not use the fault has it may contain streams
270 // So just transform it to a string and send an error
271 String fault = new SourceTransformer().contentToString(me.getFault());
272 done(me);
273 fail(exchange, new FaultException(fault, null, null));
274 } else {
275 Fault fault = MessageUtil.copyFault(me);
276 MessageUtil.transferToFault(fault, exchange);
277 done(me);
278 sendSync(exchange);
279 }
280 } else {
281 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
282 + " but has no correlation set");
283 }
284 // Faults must be sent back to the consumer
285 } else {
286 if (exchange instanceof InOnly) {
287 // Do not use the fault has it may contain streams
288 // So just transform it to a string and send an error
289 String fault = new SourceTransformer().contentToString(tme.getFault());
290 done(tme);
291 fail(exchange, new FaultException(fault, null, null));
292 } else {
293 Fault fault = MessageUtil.copyFault(tme);
294 MessageUtil.transferToFault(fault, exchange);
295 done(tme);
296 sendSync(exchange);
297 }
298 }
299 }
300
301 /* (non-Javadoc)
302 * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
303 */
304 protected void processAsync(MessageExchange exchange) throws Exception {
305 // The exchange comes from the consumer
306 if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
307 processAsyncProvider(exchange);
308 // If the exchange comes from the transformer
309 } else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) {
310 processAsyncTransformerResponse(exchange);
311 // The exchange comes from the target
312 } else {
313 processAsyncTargetResponse(exchange);
314 }
315 }
316
317 private void processAsyncProvider(MessageExchange exchange) throws Exception {
318 // A DONE status from the consumer can only be received
319 // when a fault has been sent
320 if (exchange.getStatus() == ExchangeStatus.DONE) {
321 String transformerId = (String) exchange.getProperty(correlationTransformer);
322 String targetId = (String) exchange.getProperty(correlationTarget);
323 if (transformerId == null && targetId == null) {
324 throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE
325 + " but has no correlation set");
326 }
327 // Load the exchange
328 MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
329 done(me);
330 // Errors must be sent back to the target or transformer
331 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
332 String transformerId = (String) exchange.getProperty(correlationTransformer);
333 String targetId = (String) exchange.getProperty(correlationTarget);
334 if (transformerId == null && targetId == null) {
335 throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE
336 + " but has no correlation set");
337 }
338 // Load the exchange
339 MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
340 fail(me, exchange.getError());
341 // This is a new exchange
342 } else if (exchange.getProperty(correlationTransformer) == null) {
343 if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
344 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
345 return;
346 }
347 // Create exchange for target
348 MessageExchange tme = getExchangeFactory().createInOutExchange();
349 transformer.configureTarget(tme, getContext());
350 // Set correlations
351 exchange.setProperty(correlationTransformer, tme.getExchangeId());
352 tme.setProperty(correlationConsumer, exchange.getExchangeId());
353 tme.setProperty(TRANSFORMER, Boolean.TRUE);
354 tme.setProperty(CONSUMER_MEP, exchange.getPattern());
355 // Put exchange to store
356 store.store(exchange.getExchangeId(), exchange);
357 // Send in to listener and target
358 MessageUtil.transferInToIn(exchange, tme);
359 send(tme);
360 } else {
361 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
362 + " but has no correlation set");
363 }
364 }
365
366 private void processAsyncTransformerResponse(MessageExchange exchange) throws Exception {
367 // Retrieve the correlation id
368 String consumerId = (String) exchange.getProperty(correlationConsumer);
369 if (consumerId == null) {
370 throw new IllegalStateException(correlationConsumer + " property not found");
371 }
372 // This should not happen beacause the MEP is an In-Out
373 // and the DONE status is always sent by the consumer (us)
374 if (exchange.getStatus() == ExchangeStatus.DONE) {
375 throw new IllegalStateException("Received a DONE status from the transformer");
376 // Errors must be sent back to the consumer
377 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
378 MessageExchange me = (MessageExchange) store.load(consumerId);
379 fail(me, exchange.getError());
380 } else if (exchange.getFault() != null) {
381 // Faults must be sent to faultsTarget / target
382 if (faultsTarget != null || sendFaultsToTarget) {
383 // Retrieve the consumer MEP
384 URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
385 if (mep == null) {
386 throw new IllegalStateException("Exchange does not carry the consumer MEP");
387 }
388 MessageExchange me = getExchangeFactory().createExchange(mep);
389 (faultsTarget != null ? faultsTarget : target).configureTarget(me, getContext());
390 me.setProperty(correlationConsumer, consumerId);
391 me.setProperty(correlationTransformer, exchange.getExchangeId());
392 store.store(exchange.getExchangeId(), exchange);
393 MessageUtil.transferToIn(exchange.getFault(), me);
394 send(me);
395 // Faults must be sent back to the consumer
396 } else {
397 MessageExchange me = (MessageExchange) store.load(consumerId);
398 if (me instanceof InOnly) {
399 // Do not use the fault has it may contain streams
400 // So just transform it to a string and send an error
401 String fault = new SourceTransformer().contentToString(exchange.getFault());
402 fail(me, new FaultException(fault, null, null));
403 done(exchange);
404 } else {
405 store.store(exchange.getExchangeId(), exchange);
406 MessageUtil.transferFaultToFault(exchange, me);
407 send(me);
408 }
409 }
410 // This is the answer from the transformer
411 } else if (exchange.getMessage("out") != null) {
412 // Retrieve the consumer MEP
413 URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
414 if (mep == null) {
415 throw new IllegalStateException("Exchange does not carry the consumer MEP");
416 }
417 MessageExchange me = getExchangeFactory().createExchange(mep);
418 target.configureTarget(me, getContext());
419 me.setProperty(correlationConsumer, consumerId);
420 me.setProperty(correlationTransformer, exchange.getExchangeId());
421 store.store(exchange.getExchangeId(), exchange);
422 MessageUtil.transferOutToIn(exchange, me);
423 if (copyProperties || copyAttachments) {
424 MessageExchange cme = (MessageExchange) store.load(consumerId);
425 if (cme != null) {
426 NormalizedMessage cmeInMsg = cme.getMessage("in");
427 NormalizedMessage meInMsg = me.getMessage("in");
428 copyPropertiesAndAttachments(cmeInMsg, meInMsg);
429 store.store(consumerId, cme);
430 }
431 }
432 send(me);
433 // This should not happen
434 } else {
435 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
436 + " but has no Out nor Fault message");
437 }
438 }
439
440 private void processAsyncTargetResponse(MessageExchange exchange) throws Exception {
441 // Retrieve the correlation id for the consumer
442 String consumerId = (String) exchange.getProperty(correlationConsumer);
443 if (consumerId == null) {
444 throw new IllegalStateException(correlationConsumer + " property not found");
445 }
446 // Retrieve the correlation id for the transformer
447 String transformerId = (String) exchange.getProperty(correlationTransformer);
448 if (transformerId == null) {
449 throw new IllegalStateException(correlationTransformer + " property not found");
450 }
451 // This should be the last message received
452 if (exchange.getStatus() == ExchangeStatus.DONE) {
453 // Need to ack the transformer
454 MessageExchange tme = (MessageExchange) store.load(transformerId);
455 done(tme);
456 // Need to ack the consumer
457 MessageExchange cme = (MessageExchange) store.load(consumerId);
458 done(cme);
459 // Errors should be sent back to the consumer
460 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
461 // Need to ack the transformer
462 MessageExchange tme = (MessageExchange) store.load(transformerId);
463 done(tme);
464 // Send error to consumer
465 MessageExchange cme = (MessageExchange) store.load(consumerId);
466 fail(cme, exchange.getError());
467 // If we have a robust-in-only MEP, we can receive a fault
468 } else if (exchange.getFault() != null) {
469 // Need to ack the transformer
470 MessageExchange tme = (MessageExchange) store.load(transformerId);
471 done(tme);
472 // Send fault back to consumer
473 store.store(exchange.getExchangeId(), exchange);
474 MessageExchange cme = (MessageExchange) store.load(consumerId);
475 cme.setProperty(correlationTarget, exchange.getExchangeId());
476 MessageUtil.transferFaultToFault(exchange, cme);
477 send(cme);
478 // This should not happen
479 } else {
480 throw new IllegalStateException("Exchange from target has a " + ExchangeStatus.ACTIVE
481 + " status but has no Fault message");
482 }
483 }
484
485 protected Definition getDefinitionFromWsdlExchangeTarget() {
486 Definition rc = super.getDefinitionFromWsdlExchangeTarget();
487 if (rc != null) {
488 // TODO: This components wsdl is == transformer wsdl without the out message.
489 // need to massage the result wsdl so that it described an in only exchange
490 }
491 return rc;
492 }
493
494 /**
495 * Copies properties and attachments from one message to another
496 * depending on the endpoint configuration
497 *
498 * @param from the message containing the properties and attachments
499 * @param to the destination message where the properties and attachments are set
500 */
501 private void copyPropertiesAndAttachments(NormalizedMessage from, NormalizedMessage to) throws MessagingException {
502 if (copyProperties) {
503 copyProperties(from, to);
504 }
505 if (copyAttachments) {
506 copyAttachments(from, to);
507 }
508 }
509
510 }