001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.support;
018    
019    import java.net.URI;
020    import java.util.Iterator;
021    import java.util.Set;
022    import java.util.concurrent.locks.Lock;
023    
024    import javax.jbi.management.DeploymentException;
025    import javax.jbi.messaging.ExchangeStatus;
026    import javax.jbi.messaging.InOnly;
027    import javax.jbi.messaging.MessageExchange;
028    import javax.jbi.messaging.NormalizedMessage;
029    import javax.jbi.messaging.RobustInOnly;
030    import javax.xml.transform.Source;
031    
032    import org.apache.servicemix.eip.EIPEndpoint;
033    import org.apache.servicemix.common.util.MessageUtil;
034    
035    /**
036     * The AbstractSplitter is an abstract base class for Splitters.
037     * This component implements the  
038     * <a href="http://www.enterpriseintegrationpatterns.com/Sequencer.html">Splitter</a> 
039     * pattern.
040     *  
041     * @author gnodet
042     * @version $Revision: 376451 $
043     */
044    public abstract class AbstractSplitter extends EIPEndpoint {
045        
046        public static final String SPLITTER_COUNT = "org.apache.servicemix.eip.splitter.count";
047        public static final String SPLITTER_INDEX = "org.apache.servicemix.eip.splitter.index";
048        public static final String SPLITTER_CORRID = "org.apache.servicemix.eip.splitter.corrid";
049    
050        /**
051         * The address of the target endpoint
052         */
053        private ExchangeTarget target;
054        /**
055         * Indicates if faults and errors from splitted parts should be sent
056         * back to the consumer.  In such a case, only the first fault or
057         * error received will be reported.
058         * Note that if the consumer is synchronous, it will be blocked
059         * until all parts have been successfully acked, or
060         * a fault or error is reported, and the exchange will be kept in the
061         * store for recovery. 
062         */
063        private boolean reportErrors;
064        /**
065         * Indicates if incoming attachments should be forwarded with the new exchanges.
066         */
067        private boolean forwardAttachments;
068        /**
069         * Indicates if properties on the incoming message should be forwarded.
070         */
071        private boolean forwardProperties;
072        /**
073         * The correlation property used by this component
074         */
075        //private String correlation;
076        /**
077         * Specifies wether exchanges for all parts are sent synchronously or not.
078         */
079        private boolean synchronous;
080        
081        /**
082         * @return the synchronous
083         */
084        public boolean isSynchronous() {
085            return synchronous;
086        }
087    
088        /**
089         * @param synchronous the synchronous to set
090         */
091        public void setSynchronous(boolean synchronous) {
092            this.synchronous = synchronous;
093        }
094    
095        /**
096         * @return Returns the reportErrors.
097         */
098        public boolean isReportErrors() {
099            return reportErrors;
100        }
101    
102        /**
103         * @param reportErrors The reportErrors to set.
104         */
105        public void setReportErrors(boolean reportErrors) {
106            this.reportErrors = reportErrors;
107        }
108    
109        /**
110         * @return Returns the target.
111         */
112        public ExchangeTarget getTarget() {
113            return target;
114        }
115    
116        /**
117         * @param target The target to set.
118         */
119        public void setTarget(ExchangeTarget target) {
120            this.target = target;
121        }
122    
123        /**
124         * @return Returns the forwardAttachments.
125         */
126        public boolean isForwardAttachments() {
127            return forwardAttachments;
128        }
129    
130        /**
131         * @param forwardAttachments The forwardAttachments to set.
132         */
133        public void setForwardAttachments(boolean forwardAttachments) {
134            this.forwardAttachments = forwardAttachments;
135        }
136    
137        /**
138         * @return Returns the forwardProperties.
139         */
140        public boolean isForwardProperties() {
141            return forwardProperties;
142        }
143    
144        /**
145         * @param forwardProperties The forwardProperties to set.
146         */
147        public void setForwardProperties(boolean forwardProperties) {
148            this.forwardProperties = forwardProperties;
149        }
150    
151        /* (non-Javadoc)
152         * @see org.apache.servicemix.eip.EIPEndpoint#validate()
153         */
154        public void validate() throws DeploymentException {
155            super.validate();
156            // Check target
157            if (target == null) {
158                throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
159            }
160        }
161        
162        /* (non-Javadoc)
163         * @see org.apache.servicemix.eip.EIPEndpoint#start()
164         */
165        public void start() throws Exception {
166            super.start();
167            // Create correlation property
168            //correlation = "Splitter.Correlation." + getContext().getComponentName();
169        }
170    
171        /* (non-Javadoc)
172         * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
173         */
174        protected void processSync(MessageExchange exchange) throws Exception {
175            if (!(exchange instanceof InOnly)
176                && !(exchange instanceof RobustInOnly)) {
177                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
178                return;
179            }
180            MessageExchange[] parts = createParts(exchange);
181            for (int i = 0; i < parts.length; i++) {
182                target.configureTarget(parts[i], getContext());
183                if (reportErrors || isSynchronous()) {
184                    sendSync(parts[i]);
185                    if (parts[i].getStatus() == ExchangeStatus.DONE) {
186                        // nothing to do
187                    } else if (parts[i].getStatus() == ExchangeStatus.ERROR) {
188                        if (reportErrors) {
189                            fail(exchange, parts[i].getError());
190                            return;
191                        }
192                    } else if (parts[i].getFault() != null) {
193                        if (reportErrors) {
194                            MessageUtil.transferToFault(MessageUtil.copyFault(parts[i]), exchange);
195                            done(parts[i]);
196                            sendSync(exchange);
197                            return;
198                        } else {
199                            done(parts[i]);
200                        }
201                    } else {
202                        throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
203                                + " but has no Fault message");
204                    }
205                } else {
206                    sendSync(parts[i]);
207                }
208            }
209            done(exchange);
210        }
211    
212        /* (non-Javadoc)
213         * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
214         */
215        protected void processAsync(MessageExchange exchange) throws Exception {
216            if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
217                String corrId = (String) exchange.getMessage("in").getProperty(SPLITTER_CORRID);
218                int count = (Integer) exchange.getMessage("in").getProperty(SPLITTER_COUNT);
219                Lock lock = lockManager.getLock(corrId);
220                lock.lock();
221                try {
222                    Integer acks = (Integer) store.load(corrId + ".acks");
223                    if (exchange.getStatus() == ExchangeStatus.DONE) {
224                        // If the acks integer is not here anymore, the message response has been sent already
225                        if (acks != null) {
226                            if (acks + 1 >= count) {
227                                MessageExchange me = (MessageExchange) store.load(corrId);
228                                done(me);
229                            } else {
230                                store.store(corrId + ".acks", Integer.valueOf(acks + 1));
231                            }
232                        }
233                    } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
234                        // If the acks integer is not here anymore, the message response has been sent already
235                        if (acks != null) {
236                            if (reportErrors) {
237                                MessageExchange me = (MessageExchange) store.load(corrId);
238                                fail(me, exchange.getError());
239                            } else  if (acks + 1 >= count) {
240                                MessageExchange me = (MessageExchange) store.load(corrId);
241                                done(me);
242                            } else {
243                                store.store(corrId + ".acks", Integer.valueOf(acks + 1));
244                            }
245                        }
246                    } else if (exchange.getFault() != null) {
247                        // If the acks integer is not here anymore, the message response has been sent already
248                        if (acks != null) {
249                            if (reportErrors) {
250                                MessageExchange me = (MessageExchange) store.load(corrId);
251                                MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
252                                send(me);
253                                done(exchange);
254                            } else  if (acks + 1 >= count) {
255                                MessageExchange me = (MessageExchange) store.load(corrId);
256                                done(me);
257                            } else {
258                                store.store(corrId + ".acks", Integer.valueOf(acks + 1));
259                            }
260                        } else {
261                            done(exchange);
262                        }
263                    }
264                } finally {
265                    lock.unlock();
266                }
267            } else {
268                if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
269                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
270                } else {
271                    store.store(exchange.getExchangeId(), exchange);
272                    MessageExchange[] parts = createParts(exchange);
273                    store.store(exchange.getExchangeId() + ".acks", Integer.valueOf(0));
274                    for (int i = 0; i < parts.length; i++) {
275                        target.configureTarget(parts[i], getContext());
276                        send(parts[i]);
277                    }
278                }
279            }
280        }
281        
282        protected MessageExchange[] createParts(MessageExchange exchange) throws Exception {
283            NormalizedMessage in = MessageUtil.copyIn(exchange);
284            Source[] srcParts = split(in.getContent());
285            MessageExchange[] parts = new MessageExchange[srcParts.length];
286            for (int i = 0; i < srcParts.length; i++) {
287                parts[i] = createPart(exchange.getPattern(), in, srcParts[i]);
288                NormalizedMessage msg = parts[i].getMessage("in");
289                msg.setProperty(SPLITTER_COUNT, new Integer(srcParts.length));
290                msg.setProperty(SPLITTER_INDEX, new Integer(i));
291                msg.setProperty(SPLITTER_CORRID, exchange.getExchangeId());
292            }
293            return parts;
294        }
295        
296        protected MessageExchange createPart(URI pattern,
297                                             NormalizedMessage srcMessage, 
298                                             Source content) throws Exception {
299            MessageExchange me = getExchangeFactory().createExchange(pattern);
300            NormalizedMessage in = me.createMessage();
301            in.setContent(content);
302            me.setMessage(in, "in");
303            if (forwardAttachments) {
304                Set names = srcMessage.getAttachmentNames();
305                for (Iterator iter = names.iterator(); iter.hasNext();) {
306                    String name = (String) iter.next();
307                    in.addAttachment(name, srcMessage.getAttachment(name));
308                }
309            }
310            if (forwardProperties) {
311                Set names  = srcMessage.getPropertyNames();
312                for (Iterator iter = names.iterator(); iter.hasNext();) {
313                    String name = (String) iter.next();
314                    in.setProperty(name, srcMessage.getProperty(name));
315                }
316            }
317            return me;
318        }
319    
320        protected abstract Source[] split(Source main) throws Exception;
321    
322    }