001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.patterns;
018
019 import javax.jbi.management.DeploymentException;
020 import javax.jbi.messaging.ExchangeStatus;
021 import javax.jbi.messaging.Fault;
022 import javax.jbi.messaging.InOut;
023 import javax.jbi.messaging.MessageExchange;
024
025 import org.apache.servicemix.eip.EIPEndpoint;
026 import org.apache.servicemix.eip.support.ExchangeTarget;
027 import org.apache.servicemix.common.util.MessageUtil;
028
029 /**
030 * A RoutingSlip component can be used to route an incoming In-Out exchange
031 * through a series of target services.
032 * This component implements the
033 * <a href="http://www.enterpriseintegrationpatterns.com/RoutingTable.html">Routing Slip</a>
034 * pattern, with the limitation that the routing table is static.
035 * This component only uses In-Out MEPs and errors or faults sent by targets are reported
036 * back to the consumer, thus interrupting the routing process.
037 * In addition, this component is fully asynchronous and uses an exchange store to provide
038 * full HA and recovery for clustered / persistent flows.
039 *
040 * @author gnodet
041 * @version $Revision: 376451 $
042 * @org.apache.xbean.XBean element="static-routing-slip"
043 * description="A static Routing Slip"
044 */
045 public class StaticRoutingSlip extends EIPEndpoint {
046
047 /**
048 * List of target components used in the RoutingSlip
049 */
050 private ExchangeTarget[] targets;
051 /**
052 * The correlation property used by this component
053 */
054 private String correlation;
055 /**
056 * The current index of the target
057 */
058 private String index;
059 /**
060 * The id of the previous target exchange
061 */
062 private String previous;
063
064 /**
065 * @return Returns the targets.
066 */
067 public ExchangeTarget[] getTargets() {
068 return targets;
069 }
070
071 /**
072 * @param targets The targets to set.
073 */
074 public void setTargets(ExchangeTarget[] targets) {
075 this.targets = targets;
076 }
077
078 /* (non-Javadoc)
079 * @see org.apache.servicemix.eip.EIPEndpoint#validate()
080 */
081 public void validate() throws DeploymentException {
082 super.validate();
083 // Check target
084 if (targets == null || targets.length == 0) {
085 throw new IllegalArgumentException("targets should contain at least one ExchangeTarget");
086 }
087 // Create correlation properties
088 correlation = "RoutingSlip.Correlation." + getService() + "." + getEndpoint();
089 index = "RoutingSlip.Index." + getService() + "." + getEndpoint();
090 previous = "RoutingSlip.Previous." + getService() + "." + getEndpoint();
091 }
092
093 /* (non-Javadoc)
094 * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
095 */
096 protected void processSync(MessageExchange exchange) throws Exception {
097 if (!(exchange instanceof InOut)) {
098 throw new IllegalStateException("Use an InOut MEP");
099 }
100 MessageExchange current = exchange;
101 for (int i = 0; i < targets.length; i++) {
102 InOut me = getExchangeFactory().createInOutExchange();
103 targets[i].configureTarget(me, getContext());
104 if (i == 0) {
105 MessageUtil.transferInToIn(current, me);
106 } else {
107 MessageUtil.transferOutToIn(current, me);
108 }
109 sendSync(me);
110 if (i != 0) {
111 done(current);
112 }
113 if (me.getStatus() == ExchangeStatus.DONE) {
114 throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
115 } else if (me.getStatus() == ExchangeStatus.ERROR) {
116 fail(exchange, me.getError());
117 return;
118 } else if (me.getFault() != null) {
119 Fault fault = MessageUtil.copyFault(me);
120 MessageUtil.transferToFault(fault, exchange);
121 done(me);
122 sendSync(exchange);
123 return;
124 } else if (me.getOutMessage() == null) {
125 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
126 + " but has no Out nor Fault message");
127 }
128 current = me;
129 }
130 MessageUtil.transferToOut(MessageUtil.copyOut(current), exchange);
131 done(current);
132 sendSync(exchange);
133 }
134
135 /* (non-Javadoc)
136 * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
137 */
138 protected void processAsync(MessageExchange exchange) throws Exception {
139 // This exchange comes from the consumer
140 if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
141 processProviderAsync(exchange);
142 // The exchange comes from a target
143 } else {
144 processConsumerAsync(exchange);
145 }
146 }
147
148 protected void processProviderAsync(MessageExchange exchange) throws Exception {
149 if (exchange.getStatus() == ExchangeStatus.DONE) {
150 String correlationId = (String) exchange.getProperty(correlation);
151 if (correlationId == null) {
152 throw new IllegalStateException(correlation + " property not found");
153 }
154 // Ack last target hit
155 MessageExchange me = (MessageExchange) store.load(correlationId);
156 done(me);
157 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
158 String correlationId = (String) exchange.getProperty(correlation);
159 if (correlationId == null) {
160 throw new IllegalStateException(correlation + " property not found");
161 }
162 // Ack last target hit
163 MessageExchange me = (MessageExchange) store.load(correlationId);
164 done(me);
165 } else if (!(exchange instanceof InOut)) {
166 throw new IllegalStateException("Use an InOut MEP");
167 } else {
168 MessageExchange me = getExchangeFactory().createInOutExchange();
169 me.setProperty(correlation, exchange.getExchangeId());
170 me.setProperty(index, new Integer(0));
171 targets[0].configureTarget(me, getContext());
172 store.store(exchange.getExchangeId(), exchange);
173 MessageUtil.transferInToIn(exchange, me);
174 send(me);
175 }
176 }
177
178 private void processConsumerAsync(MessageExchange exchange) throws Exception {
179 String correlationId = (String) exchange.getProperty(correlation);
180 String previousId = (String) exchange.getProperty(previous);
181 Integer prevIndex = (Integer) exchange.getProperty(index);
182 if (correlationId == null) {
183 throw new IllegalStateException(correlation + " property not found");
184 }
185 if (prevIndex == null) {
186 throw new IllegalStateException(previous + " property not found");
187 }
188 // This should never happen, as we can only send DONE
189 if (exchange.getStatus() == ExchangeStatus.DONE) {
190 throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
191 // ERROR are sent back to the consumer
192 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
193 MessageExchange me = (MessageExchange) store.load(correlationId);
194 fail(me, exchange.getError());
195 // Ack the previous target
196 if (previousId != null) {
197 me = (MessageExchange) store.load(previousId);
198 done(me);
199 }
200 // Faults are sent back to the consumer
201 } else if (exchange.getFault() != null) {
202 MessageExchange me = (MessageExchange) store.load(correlationId);
203 me.setProperty(correlation, exchange.getExchangeId());
204 store.store(exchange.getExchangeId(), exchange);
205 MessageUtil.transferFaultToFault(exchange, me);
206 send(me);
207 // Ack the previous target
208 if (previousId != null) {
209 me = (MessageExchange) store.load(previousId);
210 done(me);
211 }
212 // Out message, give it to next target or back to consumer
213 } else if (exchange.getMessage("out") != null) {
214 // This is the answer from the last target
215 if (prevIndex.intValue() == targets.length - 1) {
216 MessageExchange me = (MessageExchange) store.load(correlationId);
217 me.setProperty(correlation, exchange.getExchangeId());
218 store.store(exchange.getExchangeId(), exchange);
219 MessageUtil.transferOutToOut(exchange, me);
220 send(me);
221 if (previousId != null) {
222 me = (MessageExchange) store.load(previousId);
223 done(me);
224 }
225 // We still have a target to hit
226 } else {
227 MessageExchange me = getExchangeFactory().createInOutExchange();
228 Integer curIndex = new Integer(prevIndex.intValue() + 1);
229 me.setProperty(correlation, correlationId);
230 me.setProperty(index, curIndex);
231 me.setProperty(previous, exchange.getExchangeId());
232 targets[curIndex.intValue()].configureTarget(me, getContext());
233 store.store(exchange.getExchangeId(), exchange);
234 MessageUtil.transferOutToIn(exchange, me);
235 send(me);
236 if (previousId != null) {
237 me = (MessageExchange) store.load(previousId);
238 done(me);
239 }
240 }
241 // This should not happen
242 } else {
243 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
244 + " but has no Out nor Fault message");
245 }
246 }
247
248 }