001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.support;
018
019 import java.net.URI;
020 import java.util.Iterator;
021 import java.util.Set;
022 import java.util.concurrent.locks.Lock;
023
024 import javax.jbi.management.DeploymentException;
025 import javax.jbi.messaging.ExchangeStatus;
026 import javax.jbi.messaging.InOnly;
027 import javax.jbi.messaging.MessageExchange;
028 import javax.jbi.messaging.NormalizedMessage;
029 import javax.jbi.messaging.RobustInOnly;
030 import javax.xml.transform.Source;
031
032 import org.apache.servicemix.eip.EIPEndpoint;
033 import org.apache.servicemix.common.util.MessageUtil;
034
035 /**
036 * The AbstractSplitter is an abstract base class for Splitters.
037 * This component implements the
038 * <a href="http://www.enterpriseintegrationpatterns.com/Sequencer.html">Splitter</a>
039 * pattern.
040 *
041 * @author gnodet
042 * @version $Revision: 376451 $
043 */
044 public abstract class AbstractSplitter extends EIPEndpoint {
045
046 public static final String SPLITTER_COUNT = "org.apache.servicemix.eip.splitter.count";
047 public static final String SPLITTER_INDEX = "org.apache.servicemix.eip.splitter.index";
048 public static final String SPLITTER_CORRID = "org.apache.servicemix.eip.splitter.corrid";
049
050 /**
051 * The address of the target endpoint
052 */
053 private ExchangeTarget target;
054 /**
055 * Indicates if faults and errors from splitted parts should be sent
056 * back to the consumer. In such a case, only the first fault or
057 * error received will be reported.
058 * Note that if the consumer is synchronous, it will be blocked
059 * until all parts have been successfully acked, or
060 * a fault or error is reported, and the exchange will be kept in the
061 * store for recovery.
062 */
063 private boolean reportErrors;
064 /**
065 * Indicates if incoming attachments should be forwarded with the new exchanges.
066 */
067 private boolean forwardAttachments;
068 /**
069 * Indicates if properties on the incoming message should be forwarded.
070 */
071 private boolean forwardProperties;
072 /**
073 * The correlation property used by this component
074 */
075 //private String correlation;
076 /**
077 * Specifies wether exchanges for all parts are sent synchronously or not.
078 */
079 private boolean synchronous;
080
081 /**
082 * @return the synchronous
083 */
084 public boolean isSynchronous() {
085 return synchronous;
086 }
087
088 /**
089 * @param synchronous the synchronous to set
090 */
091 public void setSynchronous(boolean synchronous) {
092 this.synchronous = synchronous;
093 }
094
095 /**
096 * @return Returns the reportErrors.
097 */
098 public boolean isReportErrors() {
099 return reportErrors;
100 }
101
102 /**
103 * @param reportErrors The reportErrors to set.
104 */
105 public void setReportErrors(boolean reportErrors) {
106 this.reportErrors = reportErrors;
107 }
108
109 /**
110 * @return Returns the target.
111 */
112 public ExchangeTarget getTarget() {
113 return target;
114 }
115
116 /**
117 * @param target The target to set.
118 */
119 public void setTarget(ExchangeTarget target) {
120 this.target = target;
121 }
122
123 /**
124 * @return Returns the forwardAttachments.
125 */
126 public boolean isForwardAttachments() {
127 return forwardAttachments;
128 }
129
130 /**
131 * @param forwardAttachments The forwardAttachments to set.
132 */
133 public void setForwardAttachments(boolean forwardAttachments) {
134 this.forwardAttachments = forwardAttachments;
135 }
136
137 /**
138 * @return Returns the forwardProperties.
139 */
140 public boolean isForwardProperties() {
141 return forwardProperties;
142 }
143
144 /**
145 * @param forwardProperties The forwardProperties to set.
146 */
147 public void setForwardProperties(boolean forwardProperties) {
148 this.forwardProperties = forwardProperties;
149 }
150
151 /* (non-Javadoc)
152 * @see org.apache.servicemix.eip.EIPEndpoint#validate()
153 */
154 public void validate() throws DeploymentException {
155 super.validate();
156 // Check target
157 if (target == null) {
158 throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
159 }
160 }
161
162 /* (non-Javadoc)
163 * @see org.apache.servicemix.eip.EIPEndpoint#start()
164 */
165 public void start() throws Exception {
166 super.start();
167 // Create correlation property
168 //correlation = "Splitter.Correlation." + getContext().getComponentName();
169 }
170
171 /* (non-Javadoc)
172 * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
173 */
174 protected void processSync(MessageExchange exchange) throws Exception {
175 if (!(exchange instanceof InOnly)
176 && !(exchange instanceof RobustInOnly)) {
177 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
178 return;
179 }
180 MessageExchange[] parts = createParts(exchange);
181 for (int i = 0; i < parts.length; i++) {
182 target.configureTarget(parts[i], getContext());
183 if (reportErrors || isSynchronous()) {
184 sendSync(parts[i]);
185 if (parts[i].getStatus() == ExchangeStatus.DONE) {
186 // nothing to do
187 } else if (parts[i].getStatus() == ExchangeStatus.ERROR) {
188 if (reportErrors) {
189 fail(exchange, parts[i].getError());
190 return;
191 }
192 } else if (parts[i].getFault() != null) {
193 if (reportErrors) {
194 MessageUtil.transferToFault(MessageUtil.copyFault(parts[i]), exchange);
195 done(parts[i]);
196 sendSync(exchange);
197 return;
198 } else {
199 done(parts[i]);
200 }
201 } else {
202 throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
203 + " but has no Fault message");
204 }
205 } else {
206 sendSync(parts[i]);
207 }
208 }
209 done(exchange);
210 }
211
212 /* (non-Javadoc)
213 * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
214 */
215 protected void processAsync(MessageExchange exchange) throws Exception {
216 if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
217 String corrId = (String) exchange.getMessage("in").getProperty(SPLITTER_CORRID);
218 int count = (Integer) exchange.getMessage("in").getProperty(SPLITTER_COUNT);
219 Lock lock = lockManager.getLock(corrId);
220 lock.lock();
221 try {
222 Integer acks = (Integer) store.load(corrId + ".acks");
223 if (exchange.getStatus() == ExchangeStatus.DONE) {
224 // If the acks integer is not here anymore, the message response has been sent already
225 if (acks != null) {
226 if (acks + 1 >= count) {
227 MessageExchange me = (MessageExchange) store.load(corrId);
228 done(me);
229 } else {
230 store.store(corrId + ".acks", Integer.valueOf(acks + 1));
231 }
232 }
233 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
234 // If the acks integer is not here anymore, the message response has been sent already
235 if (acks != null) {
236 if (reportErrors) {
237 MessageExchange me = (MessageExchange) store.load(corrId);
238 fail(me, exchange.getError());
239 } else if (acks + 1 >= count) {
240 MessageExchange me = (MessageExchange) store.load(corrId);
241 done(me);
242 } else {
243 store.store(corrId + ".acks", Integer.valueOf(acks + 1));
244 }
245 }
246 } else if (exchange.getFault() != null) {
247 // If the acks integer is not here anymore, the message response has been sent already
248 if (acks != null) {
249 if (reportErrors) {
250 MessageExchange me = (MessageExchange) store.load(corrId);
251 MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
252 send(me);
253 done(exchange);
254 } else if (acks + 1 >= count) {
255 MessageExchange me = (MessageExchange) store.load(corrId);
256 done(me);
257 } else {
258 store.store(corrId + ".acks", Integer.valueOf(acks + 1));
259 }
260 } else {
261 done(exchange);
262 }
263 }
264 } finally {
265 lock.unlock();
266 }
267 } else {
268 if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
269 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
270 } else {
271 store.store(exchange.getExchangeId(), exchange);
272 MessageExchange[] parts = createParts(exchange);
273 store.store(exchange.getExchangeId() + ".acks", Integer.valueOf(0));
274 for (int i = 0; i < parts.length; i++) {
275 target.configureTarget(parts[i], getContext());
276 send(parts[i]);
277 }
278 }
279 }
280 }
281
282 protected MessageExchange[] createParts(MessageExchange exchange) throws Exception {
283 NormalizedMessage in = MessageUtil.copyIn(exchange);
284 Source[] srcParts = split(in.getContent());
285 MessageExchange[] parts = new MessageExchange[srcParts.length];
286 for (int i = 0; i < srcParts.length; i++) {
287 parts[i] = createPart(exchange.getPattern(), in, srcParts[i]);
288 NormalizedMessage msg = parts[i].getMessage("in");
289 msg.setProperty(SPLITTER_COUNT, new Integer(srcParts.length));
290 msg.setProperty(SPLITTER_INDEX, new Integer(i));
291 msg.setProperty(SPLITTER_CORRID, exchange.getExchangeId());
292 }
293 return parts;
294 }
295
296 protected MessageExchange createPart(URI pattern,
297 NormalizedMessage srcMessage,
298 Source content) throws Exception {
299 MessageExchange me = getExchangeFactory().createExchange(pattern);
300 NormalizedMessage in = me.createMessage();
301 in.setContent(content);
302 me.setMessage(in, "in");
303 if (forwardAttachments) {
304 Set names = srcMessage.getAttachmentNames();
305 for (Iterator iter = names.iterator(); iter.hasNext();) {
306 String name = (String) iter.next();
307 in.addAttachment(name, srcMessage.getAttachment(name));
308 }
309 }
310 if (forwardProperties) {
311 Set names = srcMessage.getPropertyNames();
312 for (Iterator iter = names.iterator(); iter.hasNext();) {
313 String name = (String) iter.next();
314 in.setProperty(name, srcMessage.getProperty(name));
315 }
316 }
317 return me;
318 }
319
320 protected abstract Source[] split(Source main) throws Exception;
321
322 }