001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.patterns;
018    
019    import java.io.Serializable;
020    import java.util.Date;
021    
022    import javax.jbi.messaging.MessageExchange;
023    import javax.jbi.messaging.NormalizedMessage;
024    import javax.xml.namespace.QName;
025    import javax.xml.transform.dom.DOMSource;
026    
027    import org.w3c.dom.Document;
028    import org.w3c.dom.Element;
029    import org.w3c.dom.Node;
030    
031    import org.apache.servicemix.eip.support.AbstractAggregator;
032    import org.apache.servicemix.eip.support.AbstractSplitter;
033    import org.apache.servicemix.expression.Expression;
034    import org.apache.servicemix.expression.PropertyExpression;
035    import org.apache.servicemix.jbi.jaxp.SourceTransformer;
036    
037    /**
038     * Aggregator can be used to wait and combine several messages.
039     * This component implements the  
040     * <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a> 
041     * pattern.
042     * 
043     * This aggregator collect  messages with a count, index and correlationId properties.
044     * These properties are automatically set by splitters.
045     * A timeout may be specified so that the aggregator will not keep data forever if a message is missing.
046     * 
047     * @author gnodet
048     * @version $Revision: 376451 $
049     * @org.apache.xbean.XBean element="split-aggregator"
050     */
051    public class SplitAggregator extends AbstractAggregator {
052    
053        protected Expression count = new PropertyExpression(AbstractSplitter.SPLITTER_COUNT);
054        protected Expression index = new PropertyExpression(AbstractSplitter.SPLITTER_INDEX);
055        protected Expression corrId = new PropertyExpression(AbstractSplitter.SPLITTER_CORRID);
056        
057        protected QName aggregateElementName = new QName("aggregate");
058        protected QName messageElementName = new QName("message");
059        protected String countAttribute = "count";
060        protected String indexAttribute = "index";
061        
062        protected long timeout;
063        
064        /**
065         * @return the aggregateElementName
066         */
067        public QName getAggregateElementName() {
068            return aggregateElementName;
069        }
070    
071        /**
072         * @param aggregateElementName the aggregateElementName to set
073         */
074        public void setAggregateElementName(QName aggregateElementName) {
075            this.aggregateElementName = aggregateElementName;
076        }
077    
078        /**
079         * @return the corrId
080         */
081        public Expression getCorrId() {
082            return corrId;
083        }
084    
085        /**
086         * @param corrId the corrId to set
087         */
088        public void setCorrId(Expression corrId) {
089            this.corrId = corrId;
090        }
091    
092        /**
093         * @return the count
094         */
095        public Expression getCount() {
096            return count;
097        }
098    
099        /**
100         * @param count the count to set
101         */
102        public void setCount(Expression count) {
103            this.count = count;
104        }
105    
106        /**
107         * @return the countAttribute
108         */
109        public String getCountAttribute() {
110            return countAttribute;
111        }
112    
113        /**
114         * @param countAttribute the countAttribute to set
115         */
116        public void setCountAttribute(String countAttribute) {
117            this.countAttribute = countAttribute;
118        }
119    
120        /**
121         * @return the index
122         */
123        public Expression getIndex() {
124            return index;
125        }
126    
127        /**
128         * @param index the index to set
129         */
130        public void setIndex(Expression index) {
131            this.index = index;
132        }
133    
134        /**
135         * @return the indexAttribute
136         */
137        public String getIndexAttribute() {
138            return indexAttribute;
139        }
140    
141        /**
142         * @param indexAttribute the indexAttribute to set
143         */
144        public void setIndexAttribute(String indexAttribute) {
145            this.indexAttribute = indexAttribute;
146        }
147    
148        /**
149         * @return the messageElementName
150         */
151        public QName getMessageElementName() {
152            return messageElementName;
153        }
154    
155        /**
156         * @param messageElementName the messageElementName to set
157         */
158        public void setMessageElementName(QName messageElementName) {
159            this.messageElementName = messageElementName;
160        }
161    
162        /**
163         * @return the timeout
164         */
165        public long getTimeout() {
166            return timeout;
167        }
168    
169        /**
170         * @param timeout the timeout to set
171         */
172        public void setTimeout(long timeout) {
173            this.timeout = timeout;
174        }
175    
176        /*(non-Javadoc)
177         * @see org.apache.servicemix.eip.support.AggregationFactory#createAggregation(java.lang.String)
178         */
179        public Object createAggregation(String correlationID) {
180            return new SplitterAggregation(correlationID);
181        }
182    
183        /*(non-Javadoc)
184         * @see org.apache.servicemix.eip.support.AggregationFactory#getCorrelationID(
185         *      javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage)
186         */
187        public String getCorrelationID(MessageExchange exchange, NormalizedMessage message) throws Exception {
188            return (String) corrId.evaluate(exchange, message);
189        }
190        
191        /* (non-Javadoc)
192         * @see org.apache.servicemix.eip.support.Aggregation#addMessage(
193         *      javax.jbi.messaging.NormalizedMessage, javax.jbi.messaging.MessageExchange)
194         */
195        public boolean addMessage(Object aggregation, NormalizedMessage message, MessageExchange exchange) 
196            throws Exception {
197            NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages;
198            // Retrieve count, index
199            Integer cnt = (Integer) SplitAggregator.this.count.evaluate(exchange, message);
200            if (cnt == null) {
201                throw new IllegalArgumentException("Property " + AbstractSplitter.SPLITTER_COUNT
202                        + " not specified on message");
203            }
204            if (messages == null) {
205                messages = new NormalizedMessage[cnt];
206                ((SplitterAggregation) aggregation).messages = messages;
207            } else if (cnt != messages.length) {
208                throw new IllegalArgumentException("Property " + AbstractSplitter.SPLITTER_COUNT
209                        + " is not consistent (received " + cnt + ", was " + messages.length + ")");
210            }
211            Integer idx = (Integer) SplitAggregator.this.index.evaluate(exchange, message);
212            if (idx == null) {
213                throw new IllegalArgumentException("Property " + AbstractSplitter.SPLITTER_INDEX
214                        + " not specified on message");
215            }
216            if (idx < 0 || idx >= messages.length) {
217                throw new IllegalArgumentException("Index is ouf of bound: " + idx + " [0.." + messages.length + "]");
218            }
219            if (messages[idx] != null) {
220                throw new IllegalStateException("Message with index " + idx + " has already been received");
221            }
222            // Store message
223            messages[idx] = message;
224            // Check if all messages have been received
225            for (int i = 0; i < messages.length; i++) {
226                if (messages[i] == null) {
227                    return false;
228                }
229            }
230            return true;
231        }
232    
233        /* (non-Javadoc)
234         * @see org.apache.servicemix.eip.support.Aggregation#buildAggregate(
235         *      javax.jbi.messaging.NormalizedMessage, javax.jbi.messaging.MessageExchange, boolean)
236         */
237        public void buildAggregate(Object aggregation, NormalizedMessage message, 
238                MessageExchange exchange, boolean doTimeout) throws Exception {
239            NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages;
240            String correlationId = ((SplitterAggregation) aggregation).correlationId;
241            SourceTransformer st = new SourceTransformer();
242            Document doc = st.createDocument();
243            Element root = createChildElement(aggregateElementName, doc);
244            root.setAttribute(countAttribute, Integer.toString(messages.length));
245            for (int i = 0; i < messages.length; i++) {
246                if (messages[i] != null) {
247                    Element elem = st.toDOMElement(messages[i]);
248                    if (messageElementName != null) {
249                        Element msg = createChildElement(messageElementName, root);
250                        msg.setAttribute(indexAttribute, Integer.toString(i));
251                        msg.appendChild(doc.importNode(elem, true));
252                    } else {
253                        root.appendChild(doc.importNode(elem, true));
254                    }
255                    if (isCopyProperties()) {
256                        copyProperties(messages[i], message);
257                    }
258                    if (isCopyAttachments()) {
259                        copyAttachments(messages[i], message);
260                    }
261                }
262            }
263            message.setContent(new DOMSource(doc));
264            message.setProperty(AbstractSplitter.SPLITTER_CORRID, correlationId);
265        }
266        
267        protected Element createChildElement(QName name, Node parent) {
268            Document doc = parent instanceof Document ? (Document) parent : parent.getOwnerDocument();
269            Element elem;
270            if ("".equals(name.getNamespaceURI())) {
271                elem = doc.createElement(name.getLocalPart());   
272            } else {
273                elem = doc.createElementNS(name.getNamespaceURI(),
274                                           name.getPrefix() + ":" + name.getLocalPart());
275            }
276            parent.appendChild(elem);
277            return elem;
278        }
279    
280        /* (non-Javadoc)
281         * @see org.apache.servicemix.eip.support.Aggregation#getTimeout()
282         */
283        public Date getTimeout(Object aggregation) {
284            if (timeout > 0) {
285                return new Date(System.currentTimeMillis() + timeout);
286            }
287            return null;
288        }
289        
290        /**
291         * 
292         * @author gnodet
293         */
294        protected static class SplitterAggregation implements Serializable {
295    
296            /**
297             * Serial version UID 
298             */
299            private static final long serialVersionUID = 8555934895155403923L;
300            
301            protected NormalizedMessage[] messages;
302            protected String correlationId;
303          
304            public SplitterAggregation(String correlationId) {
305                this.correlationId = correlationId;
306            }
307            
308            /**
309             * @return the correlationId
310             */
311            public String getCorrelationId() {
312                return correlationId;
313            }
314    
315            /**
316             * @param correlationId the correlationId to set
317             */
318            public void setCorrelationId(String correlationId) {
319                this.correlationId = correlationId;
320            }
321    
322            /**
323             * @return the messages
324             */
325            public NormalizedMessage[] getMessages() {
326                return messages;
327            }
328    
329            /**
330             * @param messages the messages to set
331             */
332            public void setMessages(NormalizedMessage[] messages) {
333                this.messages = messages;
334            }
335    
336        }
337        
338    }