001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.support;
018
019 import javax.jbi.management.DeploymentException;
020 import javax.jbi.messaging.ExchangeStatus;
021 import javax.jbi.messaging.Fault;
022 import javax.jbi.messaging.MessageExchange;
023 import javax.jbi.messaging.NormalizedMessage;
024
025 import org.apache.servicemix.common.JbiConstants;
026 import org.apache.servicemix.common.util.MessageUtil;
027 import org.apache.servicemix.eip.EIPEndpoint;
028 import org.apache.servicemix.store.Store;
029
030 /**
031 * AbstractContentBasedRouter can be used as a base class for content-based routing.
032 * This component implements the
033 * <a href="http://www.enterpriseintegrationpatterns.com/ContentBasedRouter.html">Content-Based Router</a>
034 * pattern.
035 *
036 * @author gnodet
037 * @version $Revision: 376451 $
038 */
039 public abstract class AbstractContentBasedRouter extends EIPEndpoint {
040
041 /**
042 * The correlation property used by this component
043 */
044 private String correlation;
045
046 /**
047 * Forward the operation qname when sending the exchange to the target
048 */
049 private boolean forwardOperation;
050
051 public boolean isForwardOperation() {
052 return forwardOperation;
053 }
054
055 public void setForwardOperation(boolean forwardOperation) {
056 this.forwardOperation = forwardOperation;
057 }
058
059 /* (non-Javadoc)
060 * @see org.apache.servicemix.eip.EIPEndpoint#validate()
061 */
062 public void validate() throws DeploymentException {
063 super.validate();
064 // Create correlation property
065 correlation = "AbstractContentBasedRouter.Correlation." + getService() + "." + getEndpoint();
066 }
067
068 /* (non-Javadoc)
069 * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
070 */
071 protected void processSync(MessageExchange exchange) throws Exception {
072 // Create exchange for target
073 MessageExchange tme = getExchangeFactory().createExchange(exchange.getPattern());
074 // Now copy input to new exchange
075 // We need to read the message once for finding routing target
076 // so ensure we have a re-readable source
077 NormalizedMessage in = MessageUtil.copyIn(exchange);
078 MessageUtil.transferToIn(in, tme);
079 // Retrieve target
080 ExchangeTarget target = getDestination(tme);
081 target.configureTarget(tme, getContext());
082 if (isForwardOperation() && tme.getOperation() == null) {
083 tme.setOperation(exchange.getOperation());
084 }
085 // Send in to target
086 sendSync(tme);
087 // Send back the result
088 if (tme.getStatus() == ExchangeStatus.DONE) {
089 done(exchange);
090 } else if (tme.getStatus() == ExchangeStatus.ERROR) {
091 fail(exchange, tme.getError());
092 } else if (tme.getFault() != null) {
093 Fault fault = MessageUtil.copyFault(tme);
094 done(tme);
095 MessageUtil.transferToFault(fault, exchange);
096 sendSync(exchange);
097 } else if (tme.getMessage("out") != null) {
098 NormalizedMessage out = MessageUtil.copyOut(tme);
099 done(tme);
100 MessageUtil.transferToOut(out, exchange);
101 sendSync(exchange);
102 } else {
103 done(tme);
104 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
105 + " but has no Out nor Fault message");
106 }
107 }
108
109 /* (non-Javadoc)
110 * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
111 */
112 protected void processAsync(MessageExchange exchange) throws Exception {
113 if (exchange.getRole() == MessageExchange.Role.PROVIDER
114 && exchange.getProperty(correlation) == null) {
115 // Create exchange for target
116 MessageExchange tme = getExchangeFactory().createExchange(exchange.getPattern());
117 if (store.hasFeature(Store.CLUSTERED)) {
118 exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
119 tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
120 }
121 // Set correlations
122 tme.setProperty(correlation, exchange.getExchangeId());
123 exchange.setProperty(correlation, tme.getExchangeId());
124 // Put exchange to store
125 store.store(exchange.getExchangeId(), exchange);
126 // Now copy input to new exchange
127 // We need to read the message once for finding routing target
128 // so ensure we have a re-readable source
129 try {
130 NormalizedMessage in = MessageUtil.copyIn(exchange);
131 MessageUtil.transferToIn(in, tme);
132 // Retrieve target
133 ExchangeTarget target = getDestination(tme);
134 target.configureTarget(tme, getContext());
135 if (isForwardOperation() && tme.getOperation() == null) {
136 tme.setOperation(exchange.getOperation());
137 }
138 // Send in to target
139 send(tme);
140 } catch (Exception e) {
141 // Clear the store on error
142 store.load(exchange.getExchangeId());
143 throw e;
144 }
145 // Mimic the exchange on the other side and send to needed listener
146 } else {
147 String id = (String) exchange.getProperty(correlation);
148 if (id == null) {
149 throw new IllegalStateException(correlation + " property not found");
150 }
151 MessageExchange org = (MessageExchange) store.load(id);
152 if (org == null) {
153 throw new IllegalStateException("Could not load original exchange with id " + id);
154 }
155 // Reproduce DONE status to the other side
156 if (exchange.getStatus() == ExchangeStatus.DONE) {
157 done(org);
158 // Reproduce ERROR status to the other side
159 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
160 fail(org, exchange.getError());
161 // Reproduce faults to the other side and listeners
162 } else if (exchange.getFault() != null) {
163 store.store(exchange.getExchangeId(), exchange);
164 try {
165 MessageUtil.transferTo(exchange, org, "fault");
166 send(org);
167 } catch (Exception e) {
168 store.load(exchange.getExchangeId());
169 throw e;
170 }
171 // Reproduce answers to the other side
172 } else if (exchange.getMessage("out") != null) {
173 store.store(exchange.getExchangeId(), exchange);
174 try {
175 MessageUtil.transferTo(exchange, org, "out");
176 send(org);
177 } catch (Exception e) {
178 store.load(exchange.getExchangeId());
179 throw e;
180 }
181 } else {
182 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
183 + " but has no Out nor Fault message");
184 }
185 }
186 }
187
188 /**
189 * Find the target destination for the given JBI exchange
190 * @param exchange
191 * @return the target for the given exchange
192 * @throws Exception
193 */
194 protected abstract ExchangeTarget getDestination(MessageExchange exchange) throws Exception;
195
196 }