001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.patterns;
018    
019    import javax.jbi.management.DeploymentException;
020    import javax.jbi.messaging.ExchangeStatus;
021    import javax.jbi.messaging.Fault;
022    import javax.jbi.messaging.InOut;
023    import javax.jbi.messaging.MessageExchange;
024    
025    import org.apache.servicemix.eip.EIPEndpoint;
026    import org.apache.servicemix.eip.support.ExchangeTarget;
027    import org.apache.servicemix.common.util.MessageUtil;
028    
029    /**
030     * A RoutingSlip component can be used to route an incoming In-Out exchange
031     * through a series of target services.
032     * This component implements the 
033     * <a href="http://www.enterpriseintegrationpatterns.com/RoutingTable.html">Routing Slip</a> 
034     * pattern, with the limitation that the routing table is static.
035     * This component only uses In-Out MEPs and errors or faults sent by targets are reported
036     * back to the consumer, thus interrupting the routing process.
037     * In addition, this component is fully asynchronous and uses an exchange store to provide
038     * full HA and recovery for clustered / persistent flows. 
039     *  
040     * @author gnodet
041     * @version $Revision: 376451 $
042     * @org.apache.xbean.XBean element="static-routing-slip"
043     *                  description="A static Routing Slip"
044     */
045    public class StaticRoutingSlip extends EIPEndpoint {
046    
047        /**
048         * List of target components used in the RoutingSlip
049         */
050        private ExchangeTarget[] targets;
051        /**
052         * The correlation property used by this component
053         */
054        private String correlation;
055        /**
056         * The current index of the target 
057         */
058        private String index;
059        /**
060         * The id of the previous target exchange 
061         */
062        private String previous;
063        
064        /**
065         * @return Returns the targets.
066         */
067        public ExchangeTarget[] getTargets() {
068            return targets;
069        }
070    
071        /**
072         * @param targets The targets to set.
073         */
074        public void setTargets(ExchangeTarget[] targets) {
075            this.targets = targets;
076        }
077    
078        /* (non-Javadoc)
079         * @see org.apache.servicemix.eip.EIPEndpoint#validate()
080         */
081        public void validate() throws DeploymentException {
082            super.validate();
083            // Check target
084            if (targets == null || targets.length == 0) {
085                throw new IllegalArgumentException("targets should contain at least one ExchangeTarget");
086            }
087            // Create correlation properties
088            correlation = "RoutingSlip.Correlation." + getService() + "." + getEndpoint();
089            index = "RoutingSlip.Index." + getService() + "." + getEndpoint();
090            previous = "RoutingSlip.Previous." + getService() + "." + getEndpoint();
091        }
092    
093        /* (non-Javadoc)
094         * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
095         */
096        protected void processSync(MessageExchange exchange) throws Exception {
097            if (!(exchange instanceof InOut)) {
098                throw new IllegalStateException("Use an InOut MEP");
099            }
100            MessageExchange current = exchange;
101            for (int i = 0; i < targets.length; i++) {
102                InOut me = getExchangeFactory().createInOutExchange();
103                targets[i].configureTarget(me, getContext());
104                if (i == 0) {
105                    MessageUtil.transferInToIn(current, me);
106                } else {
107                    MessageUtil.transferOutToIn(current, me);
108                }
109                sendSync(me);
110                if (i != 0) {
111                    done(current);
112                }
113                if (me.getStatus() == ExchangeStatus.DONE) {
114                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
115                } else if (me.getStatus() == ExchangeStatus.ERROR) {
116                    fail(exchange, me.getError());
117                    return;
118                } else if (me.getFault() != null) {
119                    Fault fault = MessageUtil.copyFault(me);
120                    MessageUtil.transferToFault(fault, exchange);
121                    done(me);
122                    sendSync(exchange);
123                    return;
124                } else if (me.getOutMessage() == null) {
125                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
126                            + " but has no Out nor Fault message");
127                }
128                current = me;
129            }
130            MessageUtil.transferToOut(MessageUtil.copyOut(current), exchange);
131            done(current);
132            sendSync(exchange);
133        }
134    
135        /* (non-Javadoc)
136         * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
137         */
138        protected void processAsync(MessageExchange exchange) throws Exception {
139            // This exchange comes from the consumer
140            if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
141                processProviderAsync(exchange);
142            // The exchange comes from a target
143            } else {
144                processConsumerAsync(exchange);
145            }
146        }
147    
148        protected void processProviderAsync(MessageExchange exchange) throws Exception {
149            if (exchange.getStatus() == ExchangeStatus.DONE) {
150                String correlationId = (String) exchange.getProperty(correlation);
151                if (correlationId == null) {
152                    throw new IllegalStateException(correlation + " property not found");
153                }
154                // Ack last target hit
155                MessageExchange me = (MessageExchange) store.load(correlationId);
156                done(me);
157            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
158                String correlationId = (String) exchange.getProperty(correlation);
159                if (correlationId == null) {
160                    throw new IllegalStateException(correlation + " property not found");
161                }
162                // Ack last target hit
163                MessageExchange me = (MessageExchange) store.load(correlationId);
164                done(me);
165            } else if (!(exchange instanceof InOut)) {
166                throw new IllegalStateException("Use an InOut MEP");
167            } else {
168                MessageExchange me = getExchangeFactory().createInOutExchange();
169                me.setProperty(correlation, exchange.getExchangeId());
170                me.setProperty(index, new Integer(0));
171                targets[0].configureTarget(me, getContext());
172                store.store(exchange.getExchangeId(), exchange);
173                MessageUtil.transferInToIn(exchange, me);
174                send(me);
175            }
176        }
177    
178        private void processConsumerAsync(MessageExchange exchange) throws Exception {
179            String correlationId = (String) exchange.getProperty(correlation);
180            String previousId = (String) exchange.getProperty(previous);
181            Integer prevIndex = (Integer) exchange.getProperty(index);
182            if (correlationId == null) {
183                throw new IllegalStateException(correlation + " property not found");
184            }
185            if (prevIndex == null) {
186                throw new IllegalStateException(previous + " property not found");
187            }
188            // This should never happen, as we can only send DONE
189            if (exchange.getStatus() == ExchangeStatus.DONE) {
190                throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
191            // ERROR are sent back to the consumer
192            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
193                MessageExchange me = (MessageExchange) store.load(correlationId);
194                fail(me, exchange.getError());
195                // Ack the previous target
196                if (previousId != null) {
197                    me = (MessageExchange) store.load(previousId);
198                    done(me);
199                }
200            // Faults are sent back to the consumer
201            } else if (exchange.getFault() != null) {
202                MessageExchange me = (MessageExchange) store.load(correlationId);
203                me.setProperty(correlation, exchange.getExchangeId());
204                store.store(exchange.getExchangeId(), exchange);
205                MessageUtil.transferFaultToFault(exchange, me);
206                send(me);
207                // Ack the previous target
208                if (previousId != null) {
209                    me = (MessageExchange) store.load(previousId);
210                    done(me);
211                }
212            // Out message, give it to next target or back to consumer
213            } else if (exchange.getMessage("out") != null) {
214                // This is the answer from the last target
215                if (prevIndex.intValue() == targets.length - 1) {
216                    MessageExchange me = (MessageExchange) store.load(correlationId);
217                    me.setProperty(correlation, exchange.getExchangeId());
218                    store.store(exchange.getExchangeId(), exchange);
219                    MessageUtil.transferOutToOut(exchange, me);
220                    send(me);
221                    if (previousId != null) {
222                        me = (MessageExchange) store.load(previousId);
223                        done(me);
224                    }
225                // We still have a target to hit
226                } else {
227                    MessageExchange me = getExchangeFactory().createInOutExchange();
228                    Integer curIndex = new Integer(prevIndex.intValue() + 1);
229                    me.setProperty(correlation, correlationId);
230                    me.setProperty(index, curIndex);
231                    me.setProperty(previous, exchange.getExchangeId());
232                    targets[curIndex.intValue()].configureTarget(me, getContext());
233                    store.store(exchange.getExchangeId(), exchange);
234                    MessageUtil.transferOutToIn(exchange, me);
235                    send(me);
236                    if (previousId != null) {
237                        me = (MessageExchange) store.load(previousId);
238                        done(me);
239                    }
240                }
241            // This should not happen
242            } else {
243                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
244                        + " but has no Out nor Fault message");
245            }
246        }
247    
248    }