001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.patterns;
018
019 import java.io.Serializable;
020 import java.util.Date;
021
022 import javax.jbi.messaging.MessageExchange;
023 import javax.jbi.messaging.NormalizedMessage;
024 import javax.xml.namespace.QName;
025 import javax.xml.transform.dom.DOMSource;
026
027 import org.w3c.dom.Document;
028 import org.w3c.dom.Element;
029 import org.w3c.dom.Node;
030
031 import org.apache.servicemix.eip.support.AbstractAggregator;
032 import org.apache.servicemix.eip.support.AbstractSplitter;
033 import org.apache.servicemix.expression.Expression;
034 import org.apache.servicemix.expression.PropertyExpression;
035 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
036
037 /**
038 * Aggregator can be used to wait and combine several messages.
039 * This component implements the
040 * <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
041 * pattern.
042 *
043 * This aggregator collect messages with a count, index and correlationId properties.
044 * These properties are automatically set by splitters.
045 * A timeout may be specified so that the aggregator will not keep data forever if a message is missing.
046 *
047 * @author gnodet
048 * @version $Revision: 376451 $
049 * @org.apache.xbean.XBean element="split-aggregator"
050 */
051 public class SplitAggregator extends AbstractAggregator {
052
053 protected Expression count = new PropertyExpression(AbstractSplitter.SPLITTER_COUNT);
054 protected Expression index = new PropertyExpression(AbstractSplitter.SPLITTER_INDEX);
055 protected Expression corrId = new PropertyExpression(AbstractSplitter.SPLITTER_CORRID);
056
057 protected QName aggregateElementName = new QName("aggregate");
058 protected QName messageElementName = new QName("message");
059 protected String countAttribute = "count";
060 protected String indexAttribute = "index";
061
062 protected long timeout;
063
064 /**
065 * @return the aggregateElementName
066 */
067 public QName getAggregateElementName() {
068 return aggregateElementName;
069 }
070
071 /**
072 * @param aggregateElementName the aggregateElementName to set
073 */
074 public void setAggregateElementName(QName aggregateElementName) {
075 this.aggregateElementName = aggregateElementName;
076 }
077
078 /**
079 * @return the corrId
080 */
081 public Expression getCorrId() {
082 return corrId;
083 }
084
085 /**
086 * @param corrId the corrId to set
087 */
088 public void setCorrId(Expression corrId) {
089 this.corrId = corrId;
090 }
091
092 /**
093 * @return the count
094 */
095 public Expression getCount() {
096 return count;
097 }
098
099 /**
100 * @param count the count to set
101 */
102 public void setCount(Expression count) {
103 this.count = count;
104 }
105
106 /**
107 * @return the countAttribute
108 */
109 public String getCountAttribute() {
110 return countAttribute;
111 }
112
113 /**
114 * @param countAttribute the countAttribute to set
115 */
116 public void setCountAttribute(String countAttribute) {
117 this.countAttribute = countAttribute;
118 }
119
120 /**
121 * @return the index
122 */
123 public Expression getIndex() {
124 return index;
125 }
126
127 /**
128 * @param index the index to set
129 */
130 public void setIndex(Expression index) {
131 this.index = index;
132 }
133
134 /**
135 * @return the indexAttribute
136 */
137 public String getIndexAttribute() {
138 return indexAttribute;
139 }
140
141 /**
142 * @param indexAttribute the indexAttribute to set
143 */
144 public void setIndexAttribute(String indexAttribute) {
145 this.indexAttribute = indexAttribute;
146 }
147
148 /**
149 * @return the messageElementName
150 */
151 public QName getMessageElementName() {
152 return messageElementName;
153 }
154
155 /**
156 * @param messageElementName the messageElementName to set
157 */
158 public void setMessageElementName(QName messageElementName) {
159 this.messageElementName = messageElementName;
160 }
161
162 /**
163 * @return the timeout
164 */
165 public long getTimeout() {
166 return timeout;
167 }
168
169 /**
170 * @param timeout the timeout to set
171 */
172 public void setTimeout(long timeout) {
173 this.timeout = timeout;
174 }
175
176 /*(non-Javadoc)
177 * @see org.apache.servicemix.eip.support.AggregationFactory#createAggregation(java.lang.String)
178 */
179 public Object createAggregation(String correlationID) {
180 return new SplitterAggregation(correlationID);
181 }
182
183 /*(non-Javadoc)
184 * @see org.apache.servicemix.eip.support.AggregationFactory#getCorrelationID(
185 * javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage)
186 */
187 public String getCorrelationID(MessageExchange exchange, NormalizedMessage message) throws Exception {
188 return (String) corrId.evaluate(exchange, message);
189 }
190
191 /* (non-Javadoc)
192 * @see org.apache.servicemix.eip.support.Aggregation#addMessage(
193 * javax.jbi.messaging.NormalizedMessage, javax.jbi.messaging.MessageExchange)
194 */
195 public boolean addMessage(Object aggregation, NormalizedMessage message, MessageExchange exchange)
196 throws Exception {
197 NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages;
198 // Retrieve count, index
199 Integer cnt = (Integer) SplitAggregator.this.count.evaluate(exchange, message);
200 if (cnt == null) {
201 throw new IllegalArgumentException("Property " + AbstractSplitter.SPLITTER_COUNT
202 + " not specified on message");
203 }
204 if (messages == null) {
205 messages = new NormalizedMessage[cnt];
206 ((SplitterAggregation) aggregation).messages = messages;
207 } else if (cnt != messages.length) {
208 throw new IllegalArgumentException("Property " + AbstractSplitter.SPLITTER_COUNT
209 + " is not consistent (received " + cnt + ", was " + messages.length + ")");
210 }
211 Integer idx = (Integer) SplitAggregator.this.index.evaluate(exchange, message);
212 if (idx == null) {
213 throw new IllegalArgumentException("Property " + AbstractSplitter.SPLITTER_INDEX
214 + " not specified on message");
215 }
216 if (idx < 0 || idx >= messages.length) {
217 throw new IllegalArgumentException("Index is ouf of bound: " + idx + " [0.." + messages.length + "]");
218 }
219 if (messages[idx] != null) {
220 throw new IllegalStateException("Message with index " + idx + " has already been received");
221 }
222 // Store message
223 messages[idx] = message;
224 // Check if all messages have been received
225 for (int i = 0; i < messages.length; i++) {
226 if (messages[i] == null) {
227 return false;
228 }
229 }
230 return true;
231 }
232
233 /* (non-Javadoc)
234 * @see org.apache.servicemix.eip.support.Aggregation#buildAggregate(
235 * javax.jbi.messaging.NormalizedMessage, javax.jbi.messaging.MessageExchange, boolean)
236 */
237 public void buildAggregate(Object aggregation, NormalizedMessage message,
238 MessageExchange exchange, boolean doTimeout) throws Exception {
239 NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages;
240 String correlationId = ((SplitterAggregation) aggregation).correlationId;
241 SourceTransformer st = new SourceTransformer();
242 Document doc = st.createDocument();
243 Element root = createChildElement(aggregateElementName, doc);
244 root.setAttribute(countAttribute, Integer.toString(messages.length));
245 for (int i = 0; i < messages.length; i++) {
246 if (messages[i] != null) {
247 Element elem = st.toDOMElement(messages[i]);
248 if (messageElementName != null) {
249 Element msg = createChildElement(messageElementName, root);
250 msg.setAttribute(indexAttribute, Integer.toString(i));
251 msg.appendChild(doc.importNode(elem, true));
252 } else {
253 root.appendChild(doc.importNode(elem, true));
254 }
255 if (isCopyProperties()) {
256 copyProperties(messages[i], message);
257 }
258 if (isCopyAttachments()) {
259 copyAttachments(messages[i], message);
260 }
261 }
262 }
263 message.setContent(new DOMSource(doc));
264 message.setProperty(AbstractSplitter.SPLITTER_CORRID, correlationId);
265 }
266
267 protected Element createChildElement(QName name, Node parent) {
268 Document doc = parent instanceof Document ? (Document) parent : parent.getOwnerDocument();
269 Element elem;
270 if ("".equals(name.getNamespaceURI())) {
271 elem = doc.createElement(name.getLocalPart());
272 } else {
273 elem = doc.createElementNS(name.getNamespaceURI(),
274 name.getPrefix() + ":" + name.getLocalPart());
275 }
276 parent.appendChild(elem);
277 return elem;
278 }
279
280 /* (non-Javadoc)
281 * @see org.apache.servicemix.eip.support.Aggregation#getTimeout()
282 */
283 public Date getTimeout(Object aggregation) {
284 if (timeout > 0) {
285 return new Date(System.currentTimeMillis() + timeout);
286 }
287 return null;
288 }
289
290 /**
291 *
292 * @author gnodet
293 */
294 protected static class SplitterAggregation implements Serializable {
295
296 /**
297 * Serial version UID
298 */
299 private static final long serialVersionUID = 8555934895155403923L;
300
301 protected NormalizedMessage[] messages;
302 protected String correlationId;
303
304 public SplitterAggregation(String correlationId) {
305 this.correlationId = correlationId;
306 }
307
308 /**
309 * @return the correlationId
310 */
311 public String getCorrelationId() {
312 return correlationId;
313 }
314
315 /**
316 * @param correlationId the correlationId to set
317 */
318 public void setCorrelationId(String correlationId) {
319 this.correlationId = correlationId;
320 }
321
322 /**
323 * @return the messages
324 */
325 public NormalizedMessage[] getMessages() {
326 return messages;
327 }
328
329 /**
330 * @param messages the messages to set
331 */
332 public void setMessages(NormalizedMessage[] messages) {
333 this.messages = messages;
334 }
335
336 }
337
338 }