001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.support;
018    
019    import java.util.Date;
020    import java.util.concurrent.ConcurrentHashMap;
021    import java.util.concurrent.ConcurrentMap;
022    import java.util.concurrent.locks.Lock;
023    
024    import javax.jbi.messaging.ExchangeStatus;
025    import javax.jbi.messaging.InOnly;
026    import javax.jbi.messaging.MessageExchange;
027    import javax.jbi.messaging.NormalizedMessage;
028    import javax.jbi.messaging.RobustInOnly;
029    
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import org.apache.servicemix.common.JbiConstants;
033    import org.apache.servicemix.common.util.MessageUtil;
034    import org.apache.servicemix.eip.EIPEndpoint;
035    import org.apache.servicemix.store.Store;
036    import org.apache.servicemix.store.StoreFactory;
037    import org.apache.servicemix.store.memory.MemoryStore;
038    import org.apache.servicemix.store.memory.MemoryStoreFactory;
039    import org.apache.servicemix.timers.Timer;
040    import org.apache.servicemix.timers.TimerListener;
041    
042    /**
043     * Aggregator can be used to wait and combine several messages.
044     * This component implements the
045     * <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
046     * pattern.
047     *
048     * Closed aggregations are being kept in a {@link Store}.  By default, we will use a simple 
049     * {@link MemoryStore}, but you can set your own {@link StoreFactory} to use other implementations.
050     * 
051     * TODO: distributed lock manager
052     * TODO: persistent / transactional timer
053     *
054     * @author gnodet
055     * @version $Revision: 376451 $
056     */
057    public abstract class AbstractAggregator extends EIPEndpoint {
058    
059        private static final Log LOG = LogFactory.getLog(AbstractAggregator.class);
060    
061        private ExchangeTarget target;
062        
063        private boolean rescheduleTimeouts;
064        
065        private boolean synchronous;
066    
067        private Store closedAggregates;
068        private StoreFactory closedAggregatesStoreFactory;
069    
070        private boolean copyProperties = true;
071    
072        private boolean copyAttachments = true;
073        
074        private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
075        
076        /**
077         * @return the synchronous
078         */
079        public boolean isSynchronous() {
080            return synchronous;
081        }
082    
083        /**
084         * @param synchronous the synchronous to set
085         */
086        public void setSynchronous(boolean synchronous) {
087            this.synchronous = synchronous;
088        }
089    
090        /**
091         * @return the rescheduleTimeouts
092         */
093        public boolean isRescheduleTimeouts() {
094            return rescheduleTimeouts;
095        }
096    
097        /**
098         * @param rescheduleTimeouts the rescheduleTimeouts to set
099         */
100        public void setRescheduleTimeouts(boolean rescheduleTimeouts) {
101            this.rescheduleTimeouts = rescheduleTimeouts;
102        }
103    
104        /**
105         * @return the target
106         */
107        public ExchangeTarget getTarget() {
108            return target;
109        }
110    
111        /**
112         * @param target the target to set
113         */
114        public void setTarget(ExchangeTarget target) {
115            this.target = target;
116        }
117    
118        public boolean isCopyProperties() {
119            return copyProperties;
120        }
121    
122        public void setCopyProperties(boolean copyProperties) {
123            this.copyProperties = copyProperties;
124        }
125    
126        public boolean isCopyAttachments() {
127            return copyAttachments;
128        }
129    
130        public void setCopyAttachments(boolean copyAttachments) {
131            this.copyAttachments = copyAttachments;
132        }
133        
134        /* (non-Javadoc)
135         * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
136         */
137        protected void processSync(MessageExchange exchange) throws Exception {
138            throw new IllegalStateException();
139        }
140        
141        /**
142         * Access the currently configured {@link StoreFactory} for storing closed aggregations
143         */
144        public StoreFactory getClosedAggregatesStoreFactory() {
145            return closedAggregatesStoreFactory;
146        }
147    
148        /**
149         * Set a new {@link StoreFactory} for creating the {@link Store} to hold closed aggregations
150         * 
151         * If it hasn't been set, a simple {@link MemoryStoreFactory} will be used by default.
152         * 
153         * @param closedAggregatesStoreFactory
154         */
155        public void setClosedAggregatesStoreFactory(StoreFactory closedAggregatesStoreFactory) {
156            this.closedAggregatesStoreFactory = closedAggregatesStoreFactory;
157        }
158    
159        /* (non-Javadoc)
160         * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
161         */
162        protected void processAsync(MessageExchange exchange) throws Exception {
163            throw new IllegalStateException();
164        }
165        
166        @Override
167        public void start() throws Exception {
168            super.start();
169            if (closedAggregatesStoreFactory == null) {
170                closedAggregatesStoreFactory = new MemoryStoreFactory();
171            }
172            closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
173        }
174    
175        /* (non-Javadoc)
176         * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
177         */
178        public void process(MessageExchange exchange) throws Exception {
179            // Skip DONE
180            if (exchange.getStatus() == ExchangeStatus.DONE) {
181                return;
182            // Skip ERROR
183            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
184                return;
185            // Handle an ACTIVE exchange as a PROVIDER
186            } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
187                if (!(exchange instanceof InOnly)
188                    && !(exchange instanceof RobustInOnly)) {
189                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
190                } else {
191                    processProvider(exchange);
192                }
193            // Handle an ACTIVE exchange as a CONSUMER
194            } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
195                done(exchange);
196            }
197        }
198    
199        private void processProvider(MessageExchange exchange) throws Exception {
200            final String processCorrelationId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
201    
202            NormalizedMessage in = MessageUtil.copyIn(exchange);
203            final String correlationId = getCorrelationID(exchange, in);
204            if (correlationId == null || correlationId.length() == 0) {
205                throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
206            }
207            // Load existing aggregation
208            Lock lock = getLockManager().getLock(correlationId);
209            lock.lock();
210            try {
211                Object aggregation = store.load(correlationId);
212                Date timeout = null;
213                // Create a new aggregate
214                if (aggregation == null) {
215                    if (isAggregationClosed(correlationId)) {
216                        // TODO: should we return an error here ?
217                    } else {
218                        aggregation = createAggregation(correlationId);
219                        timeout = getTimeout(aggregation);
220                    }
221                } else if (isRescheduleTimeouts()) {
222                    timeout = getTimeout(aggregation);
223                }
224                // If the aggregation is not closed
225                if (aggregation != null) {
226                    if (addMessage(aggregation, in, exchange)) {
227                        sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
228                    } else {
229                        store.store(correlationId, aggregation);
230                        if (timeout != null) {
231                            if (LOG.isDebugEnabled()) {
232                                LOG.debug("Scheduling timeout at " + timeout + " for aggregate " + correlationId);
233                            }
234                            Timer t = getTimerManager().schedule(new TimerListener() {
235                                public void timerExpired(Timer timer) {
236                                    AbstractAggregator.this.onTimeout(processCorrelationId, correlationId, timer);
237                                }
238                            }, timeout);
239                            timers.put(correlationId, t);
240                        }
241                    }
242                }
243                done(exchange);
244            } finally {
245                lock.unlock();
246            }
247        }
248    
249        protected void sendAggregate(String processCorrelationId,
250                                     String correlationId,
251                                     Object aggregation,
252                                     boolean timeout,
253                                     boolean sync) throws Exception {
254            InOnly me = getExchangeFactory().createInOnlyExchange();
255            if (processCorrelationId != null) {
256                me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
257            }
258            target.configureTarget(me, getContext());
259            NormalizedMessage nm = me.createMessage();
260            me.setInMessage(nm);
261            buildAggregate(aggregation, nm, me, timeout);
262            closeAggregation(correlationId);
263            if (sync) {
264                sendSync(me);
265            } else {
266                send(me);
267            }
268        }
269    
270        protected void onTimeout(String processCorrelationId, String correlationId, Timer timer) {
271            if (LOG.isDebugEnabled()) {
272                LOG.debug("Timeout expired for aggregate " + correlationId);
273            }
274            Lock lock = getLockManager().getLock(correlationId);
275            lock.lock();
276            try {
277                // the timeout event could have been fired before timer was canceled
278                Timer t = timers.get(correlationId);
279                if (t == null || !t.equals(timer)) {
280                    return;
281                }
282                timers.remove(correlationId);
283                Object aggregation = store.load(correlationId);
284                if (aggregation != null) {
285                    sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
286                } else if (!isAggregationClosed(correlationId)) {
287                    throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
288                } else {
289                    if (LOG.isDebugEnabled()) {
290                        LOG.debug("Aggregate " + correlationId + " is closed");
291                    }
292                }
293            } catch (Exception e) {
294                LOG.info("Caught exception while processing timeout aggregation", e);
295            } finally {
296                lock.unlock();
297            }
298        }
299    
300        /**
301         * Check if the aggregation with the given correlation id is closed or not.
302         * Called when the aggregation has not been found in the store.
303         *
304         * @param correlationId
305         * @return
306         * @throws Exception 
307         */
308        protected boolean isAggregationClosed(String correlationId) throws Exception {
309            // TODO: implement this using a persistent / cached behavior
310            Object data = closedAggregates.load(correlationId);
311            if (data != null) {
312                closedAggregates.store(correlationId, data);
313            }
314            return data != null;
315        }
316    
317        /**
318         * Mark an aggregation as closed
319         * @param correlationId
320         * @throws Exception 
321         */
322        protected void closeAggregation(String correlationId) throws Exception {
323            // TODO: implement this using a persistent / cached behavior
324            closedAggregates.store(correlationId, Boolean.TRUE);
325        }
326    
327        private boolean isSynchronous(MessageExchange exchange) {
328            return isSynchronous()
329                    || (exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC)));
330        }
331    
332        /**
333         * Retrieve the correlation ID of the given exchange
334         * @param exchange
335         * @param message
336         * @return the correlationID
337         * @throws Exception
338         */
339        protected abstract String getCorrelationID(MessageExchange exchange, NormalizedMessage message) throws Exception;
340    
341        /**
342         * Creates a new empty aggregation.
343         * @param correlationID
344         * @return a newly created aggregation
345         */
346        protected abstract Object createAggregation(String correlationID) throws Exception;
347    
348        /**
349         * Returns the date when the onTimeout method should be called if the aggregation is not completed yet,
350         * or null if the aggregation has no timeout.
351         *
352         * @param aggregate
353         * @return
354         */
355        protected abstract Date getTimeout(Object aggregate);
356    
357        /**
358         * Add a newly received message to this aggregation
359         *
360         * @param aggregate
361         * @param message
362         * @param exchange
363         * @return <code>true</code> if the aggregate id complete
364         */
365        protected abstract boolean addMessage(Object aggregate,
366                                              NormalizedMessage message,
367                                              MessageExchange exchange) throws Exception;
368    
369        /**
370         * Fill the given JBI message with the aggregation result.
371         *
372         * @param aggregate
373         * @param message
374         * @param exchange
375         * @param timeout <code>false</code> if the aggregation has completed or <code>true</code>
376         *                  if this aggregation has timed out
377         */
378        protected abstract void buildAggregate(Object aggregate,
379                                               NormalizedMessage message,
380                                               MessageExchange exchange,
381                                               boolean timeout) throws Exception;
382    }