001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.patterns;
018    
019    import java.net.URI;
020    
021    import javax.jbi.management.DeploymentException;
022    import javax.jbi.messaging.ExchangeStatus;
023    import javax.jbi.messaging.Fault;
024    import javax.jbi.messaging.InOnly;
025    import javax.jbi.messaging.InOut;
026    import javax.jbi.messaging.MessageExchange;
027    import javax.jbi.messaging.MessagingException;
028    import javax.jbi.messaging.NormalizedMessage;
029    import javax.jbi.messaging.RobustInOnly;
030    import javax.wsdl.Definition;
031    
032    import org.apache.servicemix.common.util.MessageUtil;
033    import org.apache.servicemix.eip.EIPEndpoint;
034    import org.apache.servicemix.eip.support.ExchangeTarget;
035    import org.apache.servicemix.jbi.exception.FaultException;
036    import org.apache.servicemix.jbi.jaxp.SourceTransformer;
037    
038    /**
039     * The Pipeline component is a bridge between an In-Only (or Robust-In-Only) MEP and
040     * an In-Out MEP.
041     * When the Pipeline receives an In-Only MEP, it will send the input in an In-Out MEP 
042     * to the tranformer destination and forward the response in an In-Only MEP to the target 
043     * destination.
044     * In addition, this component is fully asynchronous and uses an exchange store to provide
045     * full HA and recovery for clustered / persistent flows. 
046     *  
047     * @author gnodet
048     * @version $Revision: 376451 $
049     * @org.apache.xbean.XBean element="pipeline"
050     *                  description="A Pipeline"
051     */
052    public class Pipeline extends EIPEndpoint {
053    
054        private static final String TRANSFORMER = "Pipeline.Transformer";
055        
056        private static final String CONSUMER_MEP = "Pipeline.ConsumerMEP";
057    
058        /**
059         * The adress of the in-out endpoint acting as a transformer
060         */
061        private ExchangeTarget transformer;
062    
063        /**
064         * The address of the target endpoint
065         */
066        private ExchangeTarget target;
067        
068        /**
069         * The addres of the endpoint to send faults to
070         */
071        private ExchangeTarget faultsTarget;
072        
073        /**
074         * When the faultsTarget is not specified,
075         * faults may be sent to the target endpoint
076         * if this flag is set to <code>true</code>
077         */
078        private boolean sendFaultsToTarget;
079    
080        /**
081         * The correlation property used by this component
082         */
083        private String correlationConsumer;
084    
085        /**
086         * The correlation property used by this component
087         */
088        private String correlationTransformer;
089    
090        /**
091         * The correlation property used by this component
092         */
093        private String correlationTarget;
094    
095        /**
096         * Should message properties be copied ?
097         */
098        private boolean copyProperties;
099    
100        /**
101         * Should message attachments be copied ?
102         */
103        private boolean copyAttachments;
104    
105        /**
106         * @return Returns the target.
107         */
108        public ExchangeTarget getTarget() {
109            return target;
110        }
111    
112        /**
113         * @param target The target to set.
114         */
115        public void setTarget(ExchangeTarget target) {
116            this.target = target;
117        }
118    
119        /**
120         * @return the faultsTarget
121         */
122        public ExchangeTarget getFaultsTarget() {
123            return faultsTarget;
124        }
125    
126        /**
127         * @param faultsTarget the faultsTarget to set
128         */
129        public void setFaultsTarget(ExchangeTarget faultsTarget) {
130            this.faultsTarget = faultsTarget;
131        }
132    
133        /**
134         * @return the sendFaultsToTarget
135         */
136        public boolean isSendFaultsToTarget() {
137            return sendFaultsToTarget;
138        }
139    
140        /**
141         * @param sendFaultsToTarget the sendFaultsToTarget to set
142         */
143        public void setSendFaultsToTarget(boolean sendFaultsToTarget) {
144            this.sendFaultsToTarget = sendFaultsToTarget;
145        }
146    
147        /**
148         * @return Returns the transformer.
149         */
150        public ExchangeTarget getTransformer() {
151            return transformer;
152        }
153    
154        /**
155         * @param transformer The transformer to set.
156         */
157        public void setTransformer(ExchangeTarget transformer) {
158            this.transformer = transformer;
159        }
160    
161        public boolean isCopyProperties() {
162            return copyProperties;
163        }
164    
165        public void setCopyProperties(boolean copyProperties) {
166            this.copyProperties = copyProperties;
167        }
168    
169        public boolean isCopyAttachments() {
170            return copyAttachments;
171        }
172    
173        public void setCopyAttachments(boolean copyAttachments) {
174            this.copyAttachments = copyAttachments;
175        }
176    
177        /* (non-Javadoc)
178         * @see org.apache.servicemix.eip.EIPEndpoint#validate()
179         */
180        public void validate() throws DeploymentException {
181            super.validate();
182            // Check target
183            if (target == null) {
184                throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
185            }
186            // Check transformer
187            if (transformer == null) {
188                throw new IllegalArgumentException("transformer should be set to a valid ExchangeTarget");
189            }
190            // Create correlation properties
191            correlationConsumer = "Pipeline.Consumer." + getService() + "." + getEndpoint();
192            correlationTransformer = "Pipeline.Transformer." + getService() + "." + getEndpoint();
193            correlationTarget = "Pipeline.Target." + getService() + "." + getEndpoint();
194        }
195    
196        /* (non-Javadoc)
197         * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
198         */
199        protected void processSync(MessageExchange exchange) throws Exception {
200            if (!(exchange instanceof InOnly)
201                && !(exchange instanceof RobustInOnly)) {
202                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
203                return;
204            }
205            // Create exchange for target
206            InOut tme = getExchangeFactory().createInOutExchange();
207            transformer.configureTarget(tme, getContext());
208            // Send in to listener and target
209            MessageUtil.transferInToIn(exchange, tme);
210            sendSync(tme);
211            // Check result
212            if (tme.getStatus() == ExchangeStatus.DONE) {
213                throw new IllegalStateException("Received a DONE status from the transformer");
214            // Errors must be sent back to the consumer
215            } else if (tme.getStatus() == ExchangeStatus.ERROR) {
216                fail(exchange, tme.getError());
217            } else if (tme.getFault() != null) {
218                processFault(exchange, tme);
219            // This should not happen
220            } else if (tme.getOutMessage() == null) {
221                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
222                        + " but has no correlation set");
223            // This is the answer from the transformer
224            } else {
225                MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
226                target.configureTarget(me, getContext());
227                MessageUtil.transferOutToIn(tme, me);
228                copyPropertiesAndAttachments(exchange.getMessage("in"), me.getMessage("in"));
229                sendSync(me);
230                done(tme);
231                if (me.getStatus() == ExchangeStatus.DONE) {
232                    done(exchange);
233                } else if (me.getStatus() == ExchangeStatus.ERROR) {
234                    fail(exchange, me.getError());
235                } else if (me.getFault() != null) {
236                    if (exchange instanceof InOnly) {
237                        // Do not use the fault has it may contain streams
238                        // So just transform it to a string and send an error
239                        String fault = new SourceTransformer().contentToString(me.getFault());
240                        done(me);
241                        fail(exchange, new FaultException(fault, null, null));
242                    } else {
243                        Fault fault = MessageUtil.copyFault(me);
244                        MessageUtil.transferToFault(fault, exchange);
245                        done(me);
246                        sendSync(exchange);
247                    }
248                } else {
249                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
250                            + " but has no correlation set");
251                }
252            }
253        }
254    
255        private void processFault(MessageExchange exchange, InOut tme) throws Exception {
256            // Faults must be sent to the target / faultsTarget
257            if (faultsTarget != null || sendFaultsToTarget) {
258                MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
259                (faultsTarget != null ? faultsTarget : target).configureTarget(me, getContext());
260                MessageUtil.transferToIn(tme.getFault(), me);
261                sendSync(me);
262                done(tme);
263                if (me.getStatus() == ExchangeStatus.DONE) {
264                    done(exchange);
265                } else if (me.getStatus() == ExchangeStatus.ERROR) {
266                    fail(exchange, me.getError());
267                } else if (me.getFault() != null) {
268                    if (exchange instanceof InOnly) {
269                        // Do not use the fault has it may contain streams
270                        // So just transform it to a string and send an error
271                        String fault = new SourceTransformer().contentToString(me.getFault());
272                        done(me);
273                        fail(exchange, new FaultException(fault, null, null));
274                    } else {
275                        Fault fault = MessageUtil.copyFault(me);
276                        MessageUtil.transferToFault(fault, exchange);
277                        done(me);
278                        sendSync(exchange);
279                    }
280                } else {
281                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
282                            + " but has no correlation set");
283                }
284            // Faults must be sent back to the consumer
285            } else {
286                if (exchange instanceof InOnly) {
287                    // Do not use the fault has it may contain streams
288                    // So just transform it to a string and send an error
289                    String fault = new SourceTransformer().contentToString(tme.getFault());
290                    done(tme);
291                    fail(exchange, new FaultException(fault, null, null));
292                } else {
293                    Fault fault = MessageUtil.copyFault(tme);
294                    MessageUtil.transferToFault(fault, exchange);
295                    done(tme);
296                    sendSync(exchange);
297                }
298            }
299        }
300    
301        /* (non-Javadoc)
302         * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
303         */
304        protected void processAsync(MessageExchange exchange) throws Exception {
305            // The exchange comes from the consumer
306            if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
307                processAsyncProvider(exchange);
308            // If the exchange comes from the transformer
309            } else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) {
310                processAsyncTransformerResponse(exchange);
311            // The exchange comes from the target
312            } else {
313                processAsyncTargetResponse(exchange);
314            }
315        }
316        
317        private void processAsyncProvider(MessageExchange exchange) throws Exception {
318            // A DONE status from the consumer can only be received
319            // when a fault has been sent
320            if (exchange.getStatus() == ExchangeStatus.DONE) {
321                String transformerId = (String) exchange.getProperty(correlationTransformer);
322                String targetId = (String) exchange.getProperty(correlationTarget);
323                if (transformerId == null && targetId == null) {
324                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE
325                            + " but has no correlation set");
326                }
327                // Load the exchange
328                MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
329                done(me);
330            // Errors must be sent back to the target or transformer
331            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
332                String transformerId = (String) exchange.getProperty(correlationTransformer);
333                String targetId = (String) exchange.getProperty(correlationTarget);
334                if (transformerId == null && targetId == null) {
335                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE
336                            + " but has no correlation set");
337                }
338                // Load the exchange
339                MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
340                fail(me, exchange.getError());
341            // This is a new exchange
342            } else if (exchange.getProperty(correlationTransformer) == null) {
343                if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
344                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
345                    return;
346                }
347                // Create exchange for target
348                MessageExchange tme = getExchangeFactory().createInOutExchange();
349                transformer.configureTarget(tme, getContext());
350                // Set correlations
351                exchange.setProperty(correlationTransformer, tme.getExchangeId());
352                tme.setProperty(correlationConsumer, exchange.getExchangeId());
353                tme.setProperty(TRANSFORMER, Boolean.TRUE);
354                tme.setProperty(CONSUMER_MEP, exchange.getPattern());
355                // Put exchange to store
356                store.store(exchange.getExchangeId(), exchange);
357                // Send in to listener and target
358                MessageUtil.transferInToIn(exchange, tme);
359                send(tme);
360            } else {
361                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
362                        + " but has no correlation set");
363            }
364        }
365    
366        private void processAsyncTransformerResponse(MessageExchange exchange) throws Exception {
367            // Retrieve the correlation id
368            String consumerId = (String) exchange.getProperty(correlationConsumer);
369            if (consumerId == null) {
370                throw new IllegalStateException(correlationConsumer + " property not found");
371            }
372            // This should not happen beacause the MEP is an In-Out
373            // and the DONE status is always sent by the consumer (us)
374            if (exchange.getStatus() == ExchangeStatus.DONE) {
375                throw new IllegalStateException("Received a DONE status from the transformer");
376            // Errors must be sent back to the consumer
377            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
378                MessageExchange me = (MessageExchange) store.load(consumerId);
379                fail(me, exchange.getError());
380            } else if (exchange.getFault() != null) {
381                // Faults must be sent to faultsTarget / target
382                if (faultsTarget != null || sendFaultsToTarget) {
383                    // Retrieve the consumer MEP
384                    URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
385                    if (mep == null) {
386                        throw new IllegalStateException("Exchange does not carry the consumer MEP");
387                    }
388                    MessageExchange me = getExchangeFactory().createExchange(mep);
389                    (faultsTarget != null ? faultsTarget : target).configureTarget(me, getContext());
390                    me.setProperty(correlationConsumer, consumerId);
391                    me.setProperty(correlationTransformer, exchange.getExchangeId());
392                    store.store(exchange.getExchangeId(), exchange);
393                    MessageUtil.transferToIn(exchange.getFault(), me);
394                    send(me);
395                // Faults must be sent back to the consumer
396                } else {
397                    MessageExchange me = (MessageExchange) store.load(consumerId);
398                    if (me instanceof InOnly) {
399                        // Do not use the fault has it may contain streams
400                        // So just transform it to a string and send an error
401                        String fault = new SourceTransformer().contentToString(exchange.getFault());
402                        fail(me, new FaultException(fault, null, null));
403                        done(exchange);
404                    } else {
405                        store.store(exchange.getExchangeId(), exchange);
406                        MessageUtil.transferFaultToFault(exchange, me);
407                        send(me);
408                    }
409                }
410            // This is the answer from the transformer
411            } else if (exchange.getMessage("out") != null) {
412                // Retrieve the consumer MEP
413                URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
414                if (mep == null) {
415                    throw new IllegalStateException("Exchange does not carry the consumer MEP");
416                }
417                MessageExchange me = getExchangeFactory().createExchange(mep);
418                target.configureTarget(me, getContext());
419                me.setProperty(correlationConsumer, consumerId);
420                me.setProperty(correlationTransformer, exchange.getExchangeId());
421                store.store(exchange.getExchangeId(), exchange);
422                MessageUtil.transferOutToIn(exchange, me);
423                if (copyProperties || copyAttachments) {
424                    MessageExchange cme = (MessageExchange) store.load(consumerId);
425                    if (cme != null) {
426                        NormalizedMessage cmeInMsg = cme.getMessage("in");
427                        NormalizedMessage meInMsg = me.getMessage("in");
428                        copyPropertiesAndAttachments(cmeInMsg, meInMsg);
429                        store.store(consumerId, cme);
430                    }
431                }
432                send(me);
433            // This should not happen
434            } else {
435                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
436                        + " but has no Out nor Fault message");
437            }
438        }
439    
440        private void processAsyncTargetResponse(MessageExchange exchange) throws Exception {
441            // Retrieve the correlation id for the consumer
442            String consumerId = (String) exchange.getProperty(correlationConsumer);
443            if (consumerId == null) {
444                throw new IllegalStateException(correlationConsumer + " property not found");
445            }
446            // Retrieve the correlation id for the transformer
447            String transformerId = (String) exchange.getProperty(correlationTransformer);
448            if (transformerId == null) {
449                throw new IllegalStateException(correlationTransformer + " property not found");
450            }
451            // This should be the last message received
452            if (exchange.getStatus() == ExchangeStatus.DONE) {
453                // Need to ack the transformer
454                MessageExchange tme = (MessageExchange) store.load(transformerId);
455                done(tme);
456                // Need to ack the consumer
457                MessageExchange cme = (MessageExchange) store.load(consumerId);
458                done(cme);
459            // Errors should be sent back to the consumer
460            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
461                // Need to ack the transformer
462                MessageExchange tme = (MessageExchange) store.load(transformerId);
463                done(tme);
464                // Send error to consumer
465                MessageExchange cme = (MessageExchange) store.load(consumerId);
466                fail(cme, exchange.getError());
467            // If we have a robust-in-only MEP, we can receive a fault
468            } else if (exchange.getFault() != null) {
469                // Need to ack the transformer
470                MessageExchange tme = (MessageExchange) store.load(transformerId);
471                done(tme);
472                // Send fault back to consumer
473                store.store(exchange.getExchangeId(), exchange);
474                MessageExchange cme = (MessageExchange) store.load(consumerId);
475                cme.setProperty(correlationTarget, exchange.getExchangeId());
476                MessageUtil.transferFaultToFault(exchange, cme);
477                send(cme);
478            // This should not happen
479            } else {
480                throw new IllegalStateException("Exchange from target has a " + ExchangeStatus.ACTIVE
481                        + " status but has no Fault message");
482            }
483        }
484    
485        protected Definition getDefinitionFromWsdlExchangeTarget() {
486            Definition rc = super.getDefinitionFromWsdlExchangeTarget();
487            if (rc != null) {
488                // TODO: This components wsdl is == transformer wsdl without the out message.
489                // need to massage the result wsdl so that it described an in only exchange
490            }
491            return rc;
492        }
493    
494        /**
495         * Copies properties and attachments from one message to another
496         * depending on the endpoint configuration
497         *
498         * @param from the message containing the properties and attachments
499         * @param to the destination message where the properties and attachments are set
500         */
501        private void copyPropertiesAndAttachments(NormalizedMessage from, NormalizedMessage to) throws MessagingException {
502            if (copyProperties) {
503                copyProperties(from, to);
504            }
505            if (copyAttachments) {
506                copyAttachments(from, to);
507            }
508        }
509    
510    }