001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.support;
018    
019    import javax.jbi.management.DeploymentException;
020    import javax.jbi.messaging.ExchangeStatus;
021    import javax.jbi.messaging.Fault;
022    import javax.jbi.messaging.MessageExchange;
023    import javax.jbi.messaging.NormalizedMessage;
024    
025    import org.apache.servicemix.common.JbiConstants;
026    import org.apache.servicemix.common.util.MessageUtil;
027    import org.apache.servicemix.eip.EIPEndpoint;
028    import org.apache.servicemix.store.Store;
029    
030    /**
031     * AbstractContentBasedRouter can be used as a base class for content-based routing.
032     * This component implements the  
033     * <a href="http://www.enterpriseintegrationpatterns.com/ContentBasedRouter.html">Content-Based Router</a> 
034     * pattern.
035     * 
036     * @author gnodet
037     * @version $Revision: 376451 $
038     */
039    public abstract class AbstractContentBasedRouter extends EIPEndpoint {
040    
041        /**
042         * The correlation property used by this component
043         */
044        private String correlation;
045    
046        /**
047         * Forward the operation qname when sending the exchange to the target
048         */
049        private boolean forwardOperation;
050    
051        public boolean isForwardOperation() {
052            return forwardOperation;
053        }
054    
055        public void setForwardOperation(boolean forwardOperation) {
056            this.forwardOperation = forwardOperation;
057        }
058    
059        /* (non-Javadoc)
060         * @see org.apache.servicemix.eip.EIPEndpoint#validate()
061         */
062        public void validate() throws DeploymentException {
063            super.validate();
064            // Create correlation property
065            correlation = "AbstractContentBasedRouter.Correlation." + getService() + "." + getEndpoint();
066        }
067    
068        /* (non-Javadoc)
069         * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
070         */
071        protected void processSync(MessageExchange exchange) throws Exception {
072            // Create exchange for target
073            MessageExchange tme = getExchangeFactory().createExchange(exchange.getPattern());
074            // Now copy input to new exchange
075            // We need to read the message once for finding routing target
076            // so ensure we have a re-readable source
077            NormalizedMessage in = MessageUtil.copyIn(exchange);
078            MessageUtil.transferToIn(in, tme); 
079            // Retrieve target
080            ExchangeTarget target = getDestination(tme);
081            target.configureTarget(tme, getContext());
082            if (isForwardOperation() && tme.getOperation() == null) {
083                tme.setOperation(exchange.getOperation());
084            }
085            // Send in to target
086            sendSync(tme);
087            // Send back the result
088            if (tme.getStatus() == ExchangeStatus.DONE) {
089                done(exchange);
090            } else if (tme.getStatus() == ExchangeStatus.ERROR) {
091                fail(exchange, tme.getError());
092            } else if (tme.getFault() != null) {
093                Fault fault = MessageUtil.copyFault(tme);
094                done(tme);
095                MessageUtil.transferToFault(fault, exchange);
096                sendSync(exchange);
097            } else if (tme.getMessage("out") != null) {
098                NormalizedMessage out = MessageUtil.copyOut(tme);
099                done(tme);
100                MessageUtil.transferToOut(out, exchange);
101                sendSync(exchange);
102            } else {
103                done(tme);
104                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
105                        + " but has no Out nor Fault message");
106            }
107        }
108    
109        /* (non-Javadoc)
110         * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
111         */
112        protected void processAsync(MessageExchange exchange) throws Exception {
113            if (exchange.getRole() == MessageExchange.Role.PROVIDER
114                && exchange.getProperty(correlation) == null) {
115                // Create exchange for target
116                MessageExchange tme = getExchangeFactory().createExchange(exchange.getPattern());
117                if (store.hasFeature(Store.CLUSTERED)) {
118                    exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
119                    tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
120                }
121                // Set correlations
122                tme.setProperty(correlation, exchange.getExchangeId());
123                exchange.setProperty(correlation, tme.getExchangeId());
124                // Put exchange to store
125                store.store(exchange.getExchangeId(), exchange);
126                // Now copy input to new exchange
127                // We need to read the message once for finding routing target
128                // so ensure we have a re-readable source
129                try {
130                    NormalizedMessage in = MessageUtil.copyIn(exchange);
131                    MessageUtil.transferToIn(in, tme); 
132                    // Retrieve target
133                    ExchangeTarget target = getDestination(tme);
134                    target.configureTarget(tme, getContext());
135                    if (isForwardOperation() && tme.getOperation() == null) {
136                        tme.setOperation(exchange.getOperation());
137                    }
138                    // Send in to target
139                    send(tme);
140                } catch (Exception e) {
141                    // Clear the store on error
142                    store.load(exchange.getExchangeId());
143                    throw e;
144                }
145            // Mimic the exchange on the other side and send to needed listener
146            } else {
147                String id = (String) exchange.getProperty(correlation);
148                if (id == null) {
149                    throw new IllegalStateException(correlation + " property not found");
150                }
151                MessageExchange org = (MessageExchange) store.load(id);
152                if (org == null) {
153                    throw new IllegalStateException("Could not load original exchange with id " + id);
154                }
155                // Reproduce DONE status to the other side
156                if (exchange.getStatus() == ExchangeStatus.DONE) {
157                    done(org);
158                // Reproduce ERROR status to the other side
159                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
160                    fail(org, exchange.getError());
161                // Reproduce faults to the other side and listeners
162                } else if (exchange.getFault() != null) {
163                    store.store(exchange.getExchangeId(), exchange);
164                    try {
165                        MessageUtil.transferTo(exchange, org, "fault"); 
166                        send(org);
167                    } catch (Exception e) {
168                        store.load(exchange.getExchangeId());
169                        throw e;
170                    }
171                // Reproduce answers to the other side
172                } else if (exchange.getMessage("out") != null) {
173                    store.store(exchange.getExchangeId(), exchange);
174                    try {
175                        MessageUtil.transferTo(exchange, org, "out"); 
176                        send(org);
177                    } catch (Exception e) {
178                        store.load(exchange.getExchangeId());
179                        throw e;
180                    }
181                } else {
182                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
183                            + " but has no Out nor Fault message");
184                }
185            }
186        }
187        
188        /**
189         * Find the target destination for the given JBI exchange
190         * @param exchange
191         * @return the target for the given exchange
192         * @throws Exception
193         */
194        protected abstract ExchangeTarget getDestination(MessageExchange exchange) throws Exception;
195    
196    }