001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.support.resequence;
018    
019    import java.util.Queue;
020    import java.util.Timer;
021    
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    
025    /**
026     * Resequences elements based on a given {@link SequenceElementComparator}.
027     * This resequencer is designed for resequencing element streams. Resequenced
028     * elements are added to an output {@link Queue}. The resequencer is configured
029     * via the <code>timeout</code> and <code>capacity</code> properties.
030     * 
031     * <ul>
032     * <li><code>timeout</code>. Defines the timeout (in milliseconds) for a
033     * given element managed by this resequencer. An out-of-sequence element can
034     * only be marked as <i>ready-for-delivery</i> if it either times out or if it
035     * has an immediate predecessor (in that case it is in-sequence). If an
036     * immediate predecessor of a waiting element arrives the timeout task for the
037     * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
038     * <p>
039     * If the maximum out-of-sequence time between elements within a stream is
040     * known, the <code>timeout</code> value should be set to this value. In this
041     * case it is guaranteed that all elements of a stream will be delivered in
042     * sequence to the output queue. However, large <code>timeout</code> values
043     * might require a very high resequencer <code>capacity</code> which might be
044     * in conflict with available memory resources. The lower the
045     * <code>timeout</code> value is compared to the out-of-sequence time between
046     * elements within a stream the higher the probability is for out-of-sequence
047     * elements delivered by this resequencer.</li>
048     * <li><code>capacity</code>. The capacity of this resequencer.</li>
049     * </ul>
050     * 
051     * Whenever a timeout for a certain element occurs or an element has been added
052     * to this resequencer a delivery attempt is started. If a (sub)sequence of
053     * elements is <i>ready-for-delivery</i> then they are added to output queue.
054     * <p>
055     * The resequencer remembers the last-delivered element. If an element arrives
056     * which is the immediate successor of the last-delivered element it will be
057     * delivered immediately and the last-delivered element is adjusted accordingly.
058     * If the last-delivered element is <code>null</code> i.e. the resequencer was
059     * newly created the first arriving element will wait <code>timeout</code>
060     * milliseconds for being delivered to the output queue.
061     * 
062     * @author Martin Krasser
063     */
064    public class ResequencerEngine<E> implements TimeoutHandler {
065    
066        private static final Log LOG = LogFactory.getLog(ResequencerEngine.class);
067        
068        private long timeout;
069        
070        private int capacity;
071        
072        private Queue<E> outQueue;
073        
074        private Element<E> lastDelivered;
075    
076        /**
077         * A sequence of elements for sorting purposes.
078         */
079        private Sequence<Element<E>> sequence;
080        
081        /**
082         * A timer for scheduling timeout notifications.
083         */
084        private Timer timer;
085        
086        /**
087         * Creates a new resequencer instance with a default timeout of 2000
088         * milliseconds. The capacity is set to {@link Integer#MAX_VALUE}.
089         * 
090         * @param comparator a sequence element comparator.
091         */
092        public ResequencerEngine(SequenceElementComparator<E> comparator) {
093            this(comparator, Integer.MAX_VALUE);
094        }
095    
096        /**
097         * Creates a new resequencer instance with a default timeout of 2000
098         * milliseconds.
099         * 
100         * @param comparator a sequence element comparator.
101         * @param capacity the capacity of this resequencer.
102         */
103        public ResequencerEngine(SequenceElementComparator<E> comparator, int capacity) {
104            this.timer = new Timer("Resequencer Timer");
105            this.sequence = createSequence(comparator);
106            this.capacity = capacity;
107            this.timeout = 2000L;
108            this.lastDelivered = null;
109        }
110        
111        /**
112         * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
113         */
114        public void stop() {
115            this.timer.cancel();
116        }
117        
118        /**
119         * Returns the output queue.
120         * 
121         * @return the output queue.
122         */
123        public Queue<E> getOutQueue() {
124            return outQueue;
125        }
126    
127        /**
128         * Sets the output queue.
129         * 
130         * @param outQueue output queue.
131         */
132        public void setOutQueue(Queue<E> outQueue) {
133            this.outQueue = outQueue;
134        }
135    
136        /**
137         * Returns this resequencer's timeout value.
138         * 
139         * @return the timeout in milliseconds.
140         */
141        public long getTimeout() {
142            return timeout;
143        }
144    
145        /**
146         * Sets this sequencer's timeout value.
147         * 
148         * @param timeout the timeout in milliseconds.
149         */
150        public void setTimeout(long timeout) {
151            this.timeout = timeout;
152        }
153    
154        /** 
155         * Handles a timeout notification by starting a delivery attempt.
156         * 
157         * @param timout timeout task that caused the notification.
158         */
159        public synchronized void timeout(Timeout timout) {
160            try {
161                while (deliver()) {
162                    // work done in deliver()
163                }
164            } catch (RuntimeException e) {
165                LOG.error("error during delivery", e);
166            }
167        }
168    
169        /**
170         * Adds an element to this resequencer throwing an exception if the maximum
171         * capacity is reached.
172         * 
173         * @param o element to be resequenced.
174         * @throws IllegalStateException if the element cannot be added at this time
175         *         due to capacity restrictions.
176         */
177        public synchronized void add(E o) {
178            if (sequence.size() >= capacity) {
179                throw new IllegalStateException("maximum capacity is reached");
180            }
181            insert(o);
182        }
183        
184        /**
185         * Adds an element to this resequencer waiting, if necessary, until capacity
186         * becomes available.
187         * 
188         * @param o element to be resequenced.
189         * @throws InterruptedException if interrupted while waiting.
190         */
191        public synchronized void put(E o) throws InterruptedException {
192            if (sequence.size() >= capacity) {
193                wait();
194            }
195            insert(o);
196        }
197        
198        /**
199         * Returns the last delivered element.
200         * 
201         * @return the last delivered element or <code>null</code> if no delivery
202         *         has been made yet.
203         */
204        E getLastDelivered() {
205            if (lastDelivered == null) {
206                return null;
207            }
208            return lastDelivered.getObject();
209        }
210        
211        /**
212         * Sets the last delivered element. This is for testing purposes only.
213         * 
214         * @param o an element.
215         */
216        void setLastDelivered(E o) {
217            lastDelivered = new Element<E>(o);
218        }
219        
220        /**
221         * Inserts the given element into this resequencing queue (sequence). If the
222         * element is not ready for immediate delivery and has no immediate
223         * presecessor then it is scheduled for timing out. After being timed out it
224         * is ready for delivery.
225         * 
226         * @param o an element.
227         */
228        private void insert(E o) {
229            // wrap object into internal element
230            Element<E> element = new Element<E>(o);
231            // add element to sequence in proper order
232            sequence.add(element);
233    
234            Element<E> successor = sequence.successor(element);
235            
236            // check if there is an immediate successor and cancel
237            // timer task (no need to wait any more for timeout)
238            if (successor != null) {
239                successor.cancel();
240            }
241            
242            // start delivery if current element is successor of last delivered element
243            if (successorOfLastDelivered(element)) {
244                // nothing to schedule
245            } else if (sequence.predecessor(element) != null) {
246                // nothing to schedule
247            } else {
248                Timeout t = defineTimeout();
249                element.schedule(t);
250            }
251            
252            // start delivery
253            while (deliver()) {
254                // work done in deliver()
255            }
256        }
257        
258        /**
259         * Attempts to deliver a single element from the head of the resequencer
260         * queue (sequence). Only elements which have not been scheduled for timing
261         * out or which already timed out can be delivered.
262         * 
263         * @return <code>true</code> if the element has been delivered
264         *         <code>false</code> otherwise.
265         */
266        private boolean deliver() {
267            if (sequence.size() == 0) {
268                return false;
269            }
270            // inspect element with lowest sequence value
271            Element<E> element = sequence.first();
272            
273            // if element is scheduled do not deliver and return
274            if (element.scheduled()) {
275                return false;
276            }
277            
278            // remove deliverable element from sequence
279            sequence.remove(element);
280    
281            // set the delivered element to last delivered element
282            lastDelivered = element;
283            
284            // notify a waiting thread that capacity is available
285            notify();
286            
287            // add element to output queue
288            outQueue.add(element.getObject());
289    
290            // element has been delivered
291            return true;
292        }
293        
294        /**
295         * Returns <code>true</code> if the given element is the immediate
296         * successor of the last delivered element.
297         * 
298         * @param element an element.
299         * @return <code>true</code> if the given element is the immediate
300         *         successor of the last delivered element.
301         */
302        private boolean successorOfLastDelivered(Element<E> element) {
303            if (lastDelivered == null) {
304                return false;
305            }
306            if (sequence.comparator().successor(element, lastDelivered)) {
307                return true;
308            }
309            return false;
310        }
311        
312        /**
313         * Creates a timeout task based on the timeout setting of this resequencer.
314         * 
315         * @return a new timeout task.
316         */
317        private Timeout defineTimeout() {
318            Timeout result = new Timeout(timer, timeout);
319            result.addTimeoutHandler(this);
320            return result;
321        }
322        
323        private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
324            return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
325        }
326        
327    }