001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.patterns;
018
019 import java.util.Iterator;
020
021 import javax.jbi.management.DeploymentException;
022 import javax.jbi.messaging.ExchangeStatus;
023 import javax.jbi.messaging.InOnly;
024 import javax.jbi.messaging.MessageExchange;
025 import javax.jbi.messaging.NormalizedMessage;
026
027 import org.apache.servicemix.common.JbiConstants;
028 import org.apache.servicemix.eip.EIPEndpoint;
029 import org.apache.servicemix.eip.support.ExchangeTarget;
030 import org.apache.servicemix.jbi.helper.MessageUtil;
031 import org.apache.servicemix.store.Store;
032
033 /**
034 *
035 * A WireTap component can be used to forward a copy of the input message to a listener.
036 * This component implements the
037 * <a href="http://www.enterpriseintegrationpatterns.com/WireTap.html">WireTap</a>
038 * pattern.
039 * It can handle all 4 standard MEPs, but will only send an In-Only MEP to the listener.
040 * In addition, this component is fully asynchronous and uses an exchange store to provide
041 * full HA and recovery for clustered / persistent flows.
042 *
043 * @author gnodet
044 * @version $Revision: 376451 $
045 * @org.apache.xbean.XBean element="wire-tap"
046 * description="A WireTap"
047 */
048 public class WireTap extends EIPEndpoint {
049
050 /**
051 * The main target destination which will receive the exchange
052 */
053 private ExchangeTarget target;
054 /**
055 * The listener destination for in messages
056 */
057 private ExchangeTarget inListener;
058 /**
059 * The listener destination for out messages
060 */
061 private ExchangeTarget outListener;
062 /**
063 * The listener destination for fault messages
064 */
065 private ExchangeTarget faultListener;
066 /**
067 * The correlation property used by this component
068 */
069 private String correlation;
070 /**
071 * If copyProperties is <code>true</code>, properties
072 * on the in message will be copied to the out / fault
073 * message before it is sent.
074 */
075 private boolean copyProperties;
076
077 /**
078 * @return Returns the target.
079 */
080 public ExchangeTarget getTarget() {
081 return target;
082 }
083
084 /**
085 * @param target The target to set.
086 */
087 public void setTarget(ExchangeTarget target) {
088 this.target = target;
089 this.wsdlExchangeTarget = target;
090 }
091
092 /**
093 * @return Returns the faultListener.
094 */
095 public ExchangeTarget getFaultListener() {
096 return faultListener;
097 }
098
099 /**
100 * @param faultListener The faultListener to set.
101 */
102 public void setFaultListener(ExchangeTarget faultListener) {
103 this.faultListener = faultListener;
104 }
105
106 /**
107 * @return Returns the inListener.
108 */
109 public ExchangeTarget getInListener() {
110 return inListener;
111 }
112
113 /**
114 * @param inListener The inListener to set.
115 */
116 public void setInListener(ExchangeTarget inListener) {
117 this.inListener = inListener;
118 }
119
120 /**
121 * @return Returns the outListener.
122 */
123 public ExchangeTarget getOutListener() {
124 return outListener;
125 }
126
127 /**
128 * @param outListener The outListener to set.
129 */
130 public void setOutListener(ExchangeTarget outListener) {
131 this.outListener = outListener;
132 }
133
134 /**
135 * @return the copyProperties
136 */
137 public boolean isCopyProperties() {
138 return copyProperties;
139 }
140
141 /**
142 * @param copyProperties the copyProperties to set
143 */
144 public void setCopyProperties(boolean copyProperties) {
145 this.copyProperties = copyProperties;
146 }
147
148 /* (non-Javadoc)
149 * @see org.apache.servicemix.eip.EIPEndpoint#validate()
150 */
151 public void validate() throws DeploymentException {
152 super.validate();
153 // Check target
154 if (target == null) {
155 throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
156 }
157 // Create correlation property
158 correlation = "WireTap.Correlation." + getService() + "." + getEndpoint();
159 }
160
161 /* (non-Javadoc)
162 * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
163 */
164 protected void processSync(MessageExchange exchange) throws Exception {
165 // Create exchange for target
166 MessageExchange tme = getExchangeFactory().createExchange(exchange.getPattern());
167 target.configureTarget(tme, getContext());
168 sendSyncToListenerAndTarget(exchange, tme, inListener, "in", false);
169 if (tme.getStatus() == ExchangeStatus.DONE) {
170 done(exchange);
171 } else if (tme.getStatus() == ExchangeStatus.ERROR) {
172 fail(exchange, tme.getError());
173 } else if (tme.getFault() != null) {
174 sendSyncToListenerAndTarget(tme, exchange, faultListener, "fault", isCopyProperties());
175 done(tme);
176 } else if (tme.getMessage("out") != null) {
177 sendSyncToListenerAndTarget(tme, exchange, outListener, "out", isCopyProperties());
178 done(tme);
179 } else {
180 done(tme);
181 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
182 + " but has no Out nor Fault message");
183 }
184 }
185
186 /* (non-Javadoc)
187 * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
188 */
189 protected void processAsync(MessageExchange exchange) throws Exception {
190 if (exchange.getRole() == MessageExchange.Role.PROVIDER
191 && exchange.getProperty(correlation) == null) {
192 // Create exchange for target
193 MessageExchange tme = getExchangeFactory().createExchange(exchange.getPattern());
194 if (store.hasFeature(Store.CLUSTERED)) {
195 exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
196 tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
197 }
198 target.configureTarget(tme, getContext());
199 // Set correlations
200 exchange.setProperty(correlation, tme.getExchangeId());
201 tme.setProperty(correlation, exchange.getExchangeId());
202 // Put exchange to store
203 store.store(exchange.getExchangeId(), exchange);
204 // Send in to listener and target
205 sendToListenerAndTarget(exchange, tme, inListener, "in", false);
206 // Mimic the exchange on the other side and send to needed listener
207 } else {
208 String id = (String) exchange.getProperty(correlation);
209 if (id == null) {
210 if (exchange.getRole() == MessageExchange.Role.CONSUMER
211 && exchange.getStatus() != ExchangeStatus.ACTIVE) {
212 // This must be a listener status, so ignore
213 return;
214 }
215 throw new IllegalStateException(correlation + " property not found");
216 }
217 MessageExchange org = (MessageExchange) store.load(id);
218 if (org == null) {
219 throw new IllegalStateException("Could not load original exchange with id " + id);
220 }
221 // Reproduce DONE status to the other side
222 if (exchange.getStatus() == ExchangeStatus.DONE) {
223 done(org);
224 // Reproduce ERROR status to the other side
225 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
226 fail(org, exchange.getError());
227 // Reproduce faults to the other side and listeners
228 } else if (exchange.getFault() != null) {
229 store.store(exchange.getExchangeId(), exchange);
230 sendToListenerAndTarget(exchange, org, faultListener, "fault", isCopyProperties());
231 // Reproduce answers to the other side
232 } else if (exchange.getMessage("out") != null) {
233 store.store(exchange.getExchangeId(), exchange);
234 sendToListenerAndTarget(exchange, org, outListener, "out", isCopyProperties());
235 } else {
236 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
237 + " but has no Out nor Fault message");
238 }
239 }
240 }
241
242 private void sendToListenerAndTarget(MessageExchange source,
243 MessageExchange dest,
244 ExchangeTarget listener,
245 String message,
246 boolean copy) throws Exception {
247 if (listener != null) {
248 NormalizedMessage msg = MessageUtil.copy(source.getMessage(message));
249 InOnly lme = getExchangeFactory().createInOnlyExchange();
250 if (store.hasFeature(Store.CLUSTERED)) {
251 lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
252 }
253 listener.configureTarget(lme, getContext());
254 MessageUtil.transferToIn(msg, lme);
255 send(lme);
256 MessageUtil.transferTo(msg, dest, message);
257 if (copy) {
258 copyExchangeProperties(dest, "in", message);
259 }
260 send(dest);
261 } else {
262 MessageUtil.transferTo(source, dest, message);
263 if (copy) {
264 copyExchangeProperties(dest, "in", message);
265 }
266 send(dest);
267 }
268 }
269
270 private void sendSyncToListenerAndTarget(MessageExchange source,
271 MessageExchange dest,
272 ExchangeTarget listener,
273 String message,
274 boolean copy) throws Exception {
275 if (listener != null) {
276 NormalizedMessage msg = MessageUtil.copy(source.getMessage(message));
277 InOnly lme = getExchangeFactory().createInOnlyExchange();
278 if (store.hasFeature(Store.CLUSTERED)) {
279 lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
280 }
281 listener.configureTarget(lme, getContext());
282 MessageUtil.transferToIn(msg, lme);
283 sendSync(lme);
284 MessageUtil.transferTo(msg, dest, message);
285 if (copy) {
286 copyExchangeProperties(dest, "in", message);
287 }
288 sendSync(dest);
289 } else {
290 MessageUtil.transferTo(source, dest, message);
291 if (copy) {
292 copyExchangeProperties(dest, "in", message);
293 }
294 sendSync(dest);
295 }
296 }
297
298 /**
299 * A utility method to copy properties from the input of the original
300 * exchange to the output of the original exchange.
301 *
302 * @param exchange
303 * @param srcMessage
304 * @param @dstMessage
305 * @throws Exception
306 */
307 private void copyExchangeProperties(MessageExchange exchange, String srcMessage, String dstMessage) {
308 NormalizedMessage src = exchange.getMessage(srcMessage);
309 NormalizedMessage dst = exchange.getMessage(dstMessage);
310 for (Iterator iter = src.getPropertyNames().iterator(); iter.hasNext();) {
311 String name = (String) iter.next();
312 if (dst.getProperty(name) == null) {
313 Object prop = src.getProperty(name);
314 dst.setProperty(name, prop);
315 }
316 }
317 }
318
319 }