001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.support;
018
019 import java.util.Date;
020 import java.util.concurrent.ConcurrentHashMap;
021 import java.util.concurrent.ConcurrentMap;
022 import java.util.concurrent.locks.Lock;
023
024 import javax.jbi.messaging.ExchangeStatus;
025 import javax.jbi.messaging.InOnly;
026 import javax.jbi.messaging.MessageExchange;
027 import javax.jbi.messaging.NormalizedMessage;
028 import javax.jbi.messaging.RobustInOnly;
029
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032 import org.apache.servicemix.common.JbiConstants;
033 import org.apache.servicemix.common.util.MessageUtil;
034 import org.apache.servicemix.eip.EIPEndpoint;
035 import org.apache.servicemix.store.Store;
036 import org.apache.servicemix.store.StoreFactory;
037 import org.apache.servicemix.store.memory.MemoryStore;
038 import org.apache.servicemix.store.memory.MemoryStoreFactory;
039 import org.apache.servicemix.timers.Timer;
040 import org.apache.servicemix.timers.TimerListener;
041
042 /**
043 * Aggregator can be used to wait and combine several messages.
044 * This component implements the
045 * <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
046 * pattern.
047 *
048 * Closed aggregations are being kept in a {@link Store}. By default, we will use a simple
049 * {@link MemoryStore}, but you can set your own {@link StoreFactory} to use other implementations.
050 *
051 * TODO: distributed lock manager
052 * TODO: persistent / transactional timer
053 *
054 * @author gnodet
055 * @version $Revision: 376451 $
056 */
057 public abstract class AbstractAggregator extends EIPEndpoint {
058
059 private static final Log LOG = LogFactory.getLog(AbstractAggregator.class);
060
061 private ExchangeTarget target;
062
063 private boolean rescheduleTimeouts;
064
065 private boolean synchronous;
066
067 private Store closedAggregates;
068 private StoreFactory closedAggregatesStoreFactory;
069
070 private boolean copyProperties = true;
071
072 private boolean copyAttachments = true;
073
074 private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
075
076 /**
077 * @return the synchronous
078 */
079 public boolean isSynchronous() {
080 return synchronous;
081 }
082
/**
 * Selects how aggregates are dispatched to the target.
 *
 * @param synchronous <code>true</code> to send aggregates with sendSync,
 *                    <code>false</code> to send them with send
 */
public void setSynchronous(boolean synchronous) {
    this.synchronous = synchronous;
}
089
090 /**
091 * @return the rescheduleTimeouts
092 */
093 public boolean isRescheduleTimeouts() {
094 return rescheduleTimeouts;
095 }
096
/**
 * Controls whether a message joining an already existing aggregation
 * triggers a new timeout computation and scheduling.
 *
 * @param rescheduleTimeouts <code>true</code> to schedule the timeout again
 *                           on every message added to an existing aggregation
 */
public void setRescheduleTimeouts(boolean rescheduleTimeouts) {
    this.rescheduleTimeouts = rescheduleTimeouts;
}
103
104 /**
105 * @return the target
106 */
107 public ExchangeTarget getTarget() {
108 return target;
109 }
110
/**
 * Sets the target used to configure the exchange carrying an aggregate.
 *
 * @param target the target to set
 */
public void setTarget(ExchangeTarget target) {
    this.target = target;
}
117
118 public boolean isCopyProperties() {
119 return copyProperties;
120 }
121
/**
 * Sets the copyProperties flag.
 * NOTE(review): the flag is not read in this class; presumably it tells
 * subclasses whether to copy message properties into the aggregate - confirm.
 *
 * @param copyProperties the new value of the flag
 */
public void setCopyProperties(boolean copyProperties) {
    this.copyProperties = copyProperties;
}
125
126 public boolean isCopyAttachments() {
127 return copyAttachments;
128 }
129
/**
 * Sets the copyAttachments flag.
 * NOTE(review): the flag is not read in this class; presumably it tells
 * subclasses whether to copy message attachments into the aggregate - confirm.
 *
 * @param copyAttachments the new value of the flag
 */
public void setCopyAttachments(boolean copyAttachments) {
    this.copyAttachments = copyAttachments;
}
133
134 /* (non-Javadoc)
135 * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
136 */
137 protected void processSync(MessageExchange exchange) throws Exception {
138 throw new IllegalStateException();
139 }
140
141 /**
142 * Access the currently configured {@link StoreFactory} for storing closed aggregations
143 */
144 public StoreFactory getClosedAggregatesStoreFactory() {
145 return closedAggregatesStoreFactory;
146 }
147
/**
 * Set a new {@link StoreFactory} for creating the {@link Store} to hold closed aggregations.
 *
 * If it hasn't been set when {@link #start()} runs, a simple {@link MemoryStoreFactory}
 * will be used by default. The store is opened in {@link #start()}, so the factory
 * has to be set before the endpoint is started to be taken into account.
 *
 * @param closedAggregatesStoreFactory the factory used to open the closed aggregations store
 */
public void setClosedAggregatesStoreFactory(StoreFactory closedAggregatesStoreFactory) {
    this.closedAggregatesStoreFactory = closedAggregatesStoreFactory;
}
158
159 /* (non-Javadoc)
160 * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
161 */
162 protected void processAsync(MessageExchange exchange) throws Exception {
163 throw new IllegalStateException();
164 }
165
166 @Override
167 public void start() throws Exception {
168 super.start();
169 if (closedAggregatesStoreFactory == null) {
170 closedAggregatesStoreFactory = new MemoryStoreFactory();
171 }
172 closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
173 }
174
175 /* (non-Javadoc)
176 * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
177 */
178 public void process(MessageExchange exchange) throws Exception {
179 // Skip DONE
180 if (exchange.getStatus() == ExchangeStatus.DONE) {
181 return;
182 // Skip ERROR
183 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
184 return;
185 // Handle an ACTIVE exchange as a PROVIDER
186 } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
187 if (!(exchange instanceof InOnly)
188 && !(exchange instanceof RobustInOnly)) {
189 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
190 } else {
191 processProvider(exchange);
192 }
193 // Handle an ACTIVE exchange as a CONSUMER
194 } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
195 done(exchange);
196 }
197 }
198
199 private void processProvider(MessageExchange exchange) throws Exception {
200 final String processCorrelationId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
201
202 NormalizedMessage in = MessageUtil.copyIn(exchange);
203 final String correlationId = getCorrelationID(exchange, in);
204 if (correlationId == null || correlationId.length() == 0) {
205 throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
206 }
207 // Load existing aggregation
208 Lock lock = getLockManager().getLock(correlationId);
209 lock.lock();
210 try {
211 Object aggregation = store.load(correlationId);
212 Date timeout = null;
213 // Create a new aggregate
214 if (aggregation == null) {
215 if (isAggregationClosed(correlationId)) {
216 // TODO: should we return an error here ?
217 } else {
218 aggregation = createAggregation(correlationId);
219 timeout = getTimeout(aggregation);
220 }
221 } else if (isRescheduleTimeouts()) {
222 timeout = getTimeout(aggregation);
223 }
224 // If the aggregation is not closed
225 if (aggregation != null) {
226 if (addMessage(aggregation, in, exchange)) {
227 sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
228 } else {
229 store.store(correlationId, aggregation);
230 if (timeout != null) {
231 if (LOG.isDebugEnabled()) {
232 LOG.debug("Scheduling timeout at " + timeout + " for aggregate " + correlationId);
233 }
234 Timer t = getTimerManager().schedule(new TimerListener() {
235 public void timerExpired(Timer timer) {
236 AbstractAggregator.this.onTimeout(processCorrelationId, correlationId, timer);
237 }
238 }, timeout);
239 timers.put(correlationId, t);
240 }
241 }
242 }
243 done(exchange);
244 } finally {
245 lock.unlock();
246 }
247 }
248
249 protected void sendAggregate(String processCorrelationId,
250 String correlationId,
251 Object aggregation,
252 boolean timeout,
253 boolean sync) throws Exception {
254 InOnly me = getExchangeFactory().createInOnlyExchange();
255 if (processCorrelationId != null) {
256 me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
257 }
258 target.configureTarget(me, getContext());
259 NormalizedMessage nm = me.createMessage();
260 me.setInMessage(nm);
261 buildAggregate(aggregation, nm, me, timeout);
262 closeAggregation(correlationId);
263 if (sync) {
264 sendSync(me);
265 } else {
266 send(me);
267 }
268 }
269
270 protected void onTimeout(String processCorrelationId, String correlationId, Timer timer) {
271 if (LOG.isDebugEnabled()) {
272 LOG.debug("Timeout expired for aggregate " + correlationId);
273 }
274 Lock lock = getLockManager().getLock(correlationId);
275 lock.lock();
276 try {
277 // the timeout event could have been fired before timer was canceled
278 Timer t = timers.get(correlationId);
279 if (t == null || !t.equals(timer)) {
280 return;
281 }
282 timers.remove(correlationId);
283 Object aggregation = store.load(correlationId);
284 if (aggregation != null) {
285 sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
286 } else if (!isAggregationClosed(correlationId)) {
287 throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
288 } else {
289 if (LOG.isDebugEnabled()) {
290 LOG.debug("Aggregate " + correlationId + " is closed");
291 }
292 }
293 } catch (Exception e) {
294 LOG.info("Caught exception while processing timeout aggregation", e);
295 } finally {
296 lock.unlock();
297 }
298 }
299
300 /**
301 * Check if the aggregation with the given correlation id is closed or not.
302 * Called when the aggregation has not been found in the store.
303 *
304 * @param correlationId
305 * @return
306 * @throws Exception
307 */
308 protected boolean isAggregationClosed(String correlationId) throws Exception {
309 // TODO: implement this using a persistent / cached behavior
310 Object data = closedAggregates.load(correlationId);
311 if (data != null) {
312 closedAggregates.store(correlationId, data);
313 }
314 return data != null;
315 }
316
/**
 * Mark an aggregation as closed by storing <code>Boolean.TRUE</code> under its
 * correlation id in the closed aggregations store.
 *
 * @param correlationId id of the aggregation to close
 * @throws Exception if the marker can not be stored
 */
protected void closeAggregation(String correlationId) throws Exception {
    // TODO: implement this using a persistent / cached behavior
    closedAggregates.store(correlationId, Boolean.TRUE);
}
326
327 private boolean isSynchronous(MessageExchange exchange) {
328 return isSynchronous()
329 || (exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC)));
330 }
331
/**
 * Retrieve the correlation ID of the given exchange. Messages sharing the same
 * correlation ID are added to the same aggregation.
 *
 * @param exchange the incoming exchange
 * @param message the copy of the incoming "in" message
 * @return the correlationID; a <code>null</code> or empty value makes the
 *         processing of the exchange fail with an IllegalArgumentException
 * @throws Exception if the correlation ID can not be computed
 */
protected abstract String getCorrelationID(MessageExchange exchange, NormalizedMessage message) throws Exception;
340
/**
 * Creates a new empty aggregation. Called when a message arrives for a correlation ID
 * that has no aggregation in the store and that is not marked as closed.
 *
 * @param correlationID the correlation ID of the aggregation to create
 * @return a newly created aggregation
 * @throws Exception if the aggregation can not be created
 */
protected abstract Object createAggregation(String correlationID) throws Exception;
347
/**
 * Returns the date when the onTimeout method should be called if the aggregation is not completed yet,
 * or null if the aggregation has no timeout.
 *
 * Called for a newly created aggregation and, when rescheduleTimeouts is set,
 * each time a message is about to be added to an existing aggregation.
 *
 * @param aggregate the aggregation to compute the timeout for
 * @return the timeout date, or <code>null</code> if no timeout has to be scheduled
 */
protected abstract Date getTimeout(Object aggregate);
356
/**
 * Add a newly received message to this aggregation.
 *
 * @param aggregate the aggregation the message belongs to
 * @param message the copy of the incoming "in" message
 * @param exchange the incoming exchange
 * @return <code>true</code> if the aggregate is complete, in which case it is sent
 *         right away; <code>false</code> to have the aggregation stored until
 *         further messages or the timeout
 * @throws Exception if the message can not be added
 */
protected abstract boolean addMessage(Object aggregate,
                                      NormalizedMessage message,
                                      MessageExchange exchange) throws Exception;
368
/**
 * Fill the given JBI message with the aggregation result.
 *
 * @param aggregate the aggregation to send
 * @param message the newly created "in" message of the outgoing exchange, to be filled
 * @param exchange the newly created outgoing InOnly exchange
 * @param timeout <code>false</code> if the aggregation has completed or <code>true</code>
 *                if this aggregation has timed out
 * @throws Exception if the aggregate can not be built
 */
protected abstract void buildAggregate(Object aggregate,
                                       NormalizedMessage message,
                                       MessageExchange exchange,
                                       boolean timeout) throws Exception;
382 }