001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.patterns;
018    
019    import java.util.Iterator;
020    
021    import javax.jbi.management.DeploymentException;
022    import javax.jbi.messaging.ExchangeStatus;
023    import javax.jbi.messaging.InOnly;
024    import javax.jbi.messaging.MessageExchange;
025    import javax.jbi.messaging.NormalizedMessage;
026    
027    import org.apache.servicemix.common.JbiConstants;
028    import org.apache.servicemix.eip.EIPEndpoint;
029    import org.apache.servicemix.eip.support.ExchangeTarget;
030    import org.apache.servicemix.jbi.helper.MessageUtil;
031    import org.apache.servicemix.store.Store;
032    
033    /**
034     *
035     * A WireTap component can be used to forward a copy of the input message to a listener.
036     * This component implements the 
037     * <a href="http://www.enterpriseintegrationpatterns.com/WireTap.html">WireTap</a> 
038     * pattern.
039     * It can handle all 4 standard MEPs, but will only send an In-Only MEP to the listener.
040     * In addition, this component is fully asynchronous and uses an exchange store to provide
041     * full HA and recovery for clustered / persistent flows. 
042     * 
043     * @author gnodet
044     * @version $Revision: 376451 $
045     * @org.apache.xbean.XBean element="wire-tap"
046     *                  description="A WireTap"
047     */
048    public class WireTap extends EIPEndpoint {
049    
050        /**
051         * The main target destination which will receive the exchange
052         */
053        private ExchangeTarget target;
054        /**
055         * The listener destination for in messages
056         */
057        private ExchangeTarget inListener;
058        /**
059         * The listener destination for out messages
060         */
061        private ExchangeTarget outListener;
062        /**
063         * The listener destination for fault messages
064         */
065        private ExchangeTarget faultListener;
066        /**
067         * The correlation property used by this component
068         */
069        private String correlation;
070        /**
071         * If copyProperties is <code>true</code>, properties
072         * on the in message will be copied to the out / fault
073         * message before it is sent.
074         */
075        private boolean copyProperties;
076        
077        /**
078         * @return Returns the target.
079         */
080        public ExchangeTarget getTarget() {
081            return target;
082        }
083    
084        /**
085         * @param target The target to set.
086         */
087        public void setTarget(ExchangeTarget target) {
088            this.target = target;
089            this.wsdlExchangeTarget = target;
090        }
091    
092        /**
093         * @return Returns the faultListener.
094         */
095        public ExchangeTarget getFaultListener() {
096            return faultListener;
097        }
098    
099        /**
100         * @param faultListener The faultListener to set.
101         */
102        public void setFaultListener(ExchangeTarget faultListener) {
103            this.faultListener = faultListener;
104        }
105    
106        /**
107         * @return Returns the inListener.
108         */
109        public ExchangeTarget getInListener() {
110            return inListener;
111        }
112    
113        /**
114         * @param inListener The inListener to set.
115         */
116        public void setInListener(ExchangeTarget inListener) {
117            this.inListener = inListener;
118        }
119    
120        /**
121         * @return Returns the outListener.
122         */
123        public ExchangeTarget getOutListener() {
124            return outListener;
125        }
126    
127        /**
128         * @param outListener The outListener to set.
129         */
130        public void setOutListener(ExchangeTarget outListener) {
131            this.outListener = outListener;
132        }
133    
134        /**
135         * @return the copyProperties
136         */
137        public boolean isCopyProperties() {
138            return copyProperties;
139        }
140    
141        /**
142         * @param copyProperties the copyProperties to set
143         */
144        public void setCopyProperties(boolean copyProperties) {
145            this.copyProperties = copyProperties;
146        }
147    
148        /* (non-Javadoc)
149         * @see org.apache.servicemix.eip.EIPEndpoint#validate()
150         */
151        public void validate() throws DeploymentException {
152            super.validate();
153            // Check target
154            if (target == null) {
155                throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
156            }
157            // Create correlation property
158            correlation = "WireTap.Correlation." + getService() + "." + getEndpoint();
159        }
160    
161        /* (non-Javadoc)
162         * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
163         */
164        protected void processSync(MessageExchange exchange) throws Exception {
165            // Create exchange for target
166            MessageExchange tme = getExchangeFactory().createExchange(exchange.getPattern());
167            target.configureTarget(tme, getContext());
168            sendSyncToListenerAndTarget(exchange, tme, inListener, "in", false);
169            if (tme.getStatus() == ExchangeStatus.DONE) {
170                done(exchange);
171            } else if (tme.getStatus() == ExchangeStatus.ERROR) {
172                fail(exchange, tme.getError());
173            } else if (tme.getFault() != null) {
174                sendSyncToListenerAndTarget(tme, exchange, faultListener, "fault", isCopyProperties());
175                done(tme);
176            } else if (tme.getMessage("out") != null) {
177                sendSyncToListenerAndTarget(tme, exchange, outListener, "out", isCopyProperties());
178                done(tme);
179            } else {
180                done(tme);
181                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
182                        + " but has no Out nor Fault message");
183            }
184        }
185        
186        /* (non-Javadoc)
187         * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
188         */
189        protected void processAsync(MessageExchange exchange) throws Exception {
190            if (exchange.getRole() == MessageExchange.Role.PROVIDER
191                && exchange.getProperty(correlation) == null) {
192                // Create exchange for target
193                MessageExchange tme = getExchangeFactory().createExchange(exchange.getPattern());
194                if (store.hasFeature(Store.CLUSTERED)) {
195                    exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
196                    tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
197                }
198                target.configureTarget(tme, getContext());
199                // Set correlations
200                exchange.setProperty(correlation, tme.getExchangeId());
201                tme.setProperty(correlation, exchange.getExchangeId());
202                // Put exchange to store
203                store.store(exchange.getExchangeId(), exchange);
204                // Send in to listener and target
205                sendToListenerAndTarget(exchange, tme, inListener, "in", false);
206            // Mimic the exchange on the other side and send to needed listener
207            } else {
208                String id = (String) exchange.getProperty(correlation);
209                if (id == null) {
210                    if (exchange.getRole() == MessageExchange.Role.CONSUMER
211                        && exchange.getStatus() != ExchangeStatus.ACTIVE) {
212                        // This must be a listener status, so ignore
213                        return;
214                    }
215                    throw new IllegalStateException(correlation + " property not found");
216                }
217                MessageExchange org = (MessageExchange) store.load(id);
218                if (org == null) {
219                    throw new IllegalStateException("Could not load original exchange with id " + id);
220                }
221                // Reproduce DONE status to the other side
222                if (exchange.getStatus() == ExchangeStatus.DONE) {
223                    done(org);
224                // Reproduce ERROR status to the other side
225                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
226                    fail(org, exchange.getError());
227                // Reproduce faults to the other side and listeners
228                } else if (exchange.getFault() != null) {
229                    store.store(exchange.getExchangeId(), exchange);
230                    sendToListenerAndTarget(exchange, org, faultListener, "fault", isCopyProperties());
231                // Reproduce answers to the other side
232                } else if (exchange.getMessage("out") != null) {
233                    store.store(exchange.getExchangeId(), exchange);
234                    sendToListenerAndTarget(exchange, org, outListener, "out", isCopyProperties());
235                } else {
236                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
237                            + " but has no Out nor Fault message");
238                }
239            }
240        }
241        
242        private void sendToListenerAndTarget(MessageExchange source, 
243                                             MessageExchange dest, 
244                                             ExchangeTarget listener,
245                                             String message,
246                                             boolean copy) throws Exception {
247            if (listener != null) {
248                NormalizedMessage msg = MessageUtil.copy(source.getMessage(message));
249                InOnly lme = getExchangeFactory().createInOnlyExchange();
250                if (store.hasFeature(Store.CLUSTERED)) {
251                    lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
252                }
253                listener.configureTarget(lme, getContext());
254                MessageUtil.transferToIn(msg, lme);
255                send(lme);
256                MessageUtil.transferTo(msg, dest, message);
257                if (copy) {
258                    copyExchangeProperties(dest, "in", message);
259                }
260                send(dest);
261            } else {
262                MessageUtil.transferTo(source, dest, message);
263                if (copy) {
264                    copyExchangeProperties(dest, "in", message);
265                }
266                send(dest);
267            }
268        }
269    
270        private void sendSyncToListenerAndTarget(MessageExchange source, 
271                                                 MessageExchange dest, 
272                                                 ExchangeTarget listener,
273                                                 String message,
274                                                 boolean copy) throws Exception {
275            if (listener != null) {
276                NormalizedMessage msg = MessageUtil.copy(source.getMessage(message));
277                InOnly lme = getExchangeFactory().createInOnlyExchange();
278                if (store.hasFeature(Store.CLUSTERED)) {
279                    lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
280                }
281                listener.configureTarget(lme, getContext());
282                MessageUtil.transferToIn(msg, lme);
283                sendSync(lme);
284                MessageUtil.transferTo(msg, dest, message);
285                if (copy) {
286                    copyExchangeProperties(dest, "in", message);
287                }
288                sendSync(dest);
289            } else {
290                MessageUtil.transferTo(source, dest, message);
291                if (copy) {
292                    copyExchangeProperties(dest, "in", message);
293                }
294                sendSync(dest);
295            }
296        }
297        
298        /**
299         * A utility method to copy properties from the input of the original 
300         * exchange to the output of the original exchange. 
301         * 
302         * @param exchange
303         * @param srcMessage
304         * @param @dstMessage
305         * @throws Exception
306         */
307        private void copyExchangeProperties(MessageExchange exchange, String srcMessage, String dstMessage) {
308            NormalizedMessage src = exchange.getMessage(srcMessage);
309            NormalizedMessage dst = exchange.getMessage(dstMessage);
310            for (Iterator iter = src.getPropertyNames().iterator(); iter.hasNext();) {
311                String name = (String) iter.next();
312                if (dst.getProperty(name) == null) {
313                    Object prop = src.getProperty(name);
314                    dst.setProperty(name, prop);
315                }
316            }
317        }
318        
319    }