001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.support.resequence;
018
019 import java.util.Queue;
020 import java.util.Timer;
021
022 import org.apache.commons.logging.Log;
023 import org.apache.commons.logging.LogFactory;
024
025 /**
026 * Resequences elements based on a given {@link SequenceElementComparator}.
027 * This resequencer is designed for resequencing element streams. Resequenced
028 * elements are added to an output {@link Queue}. The resequencer is configured
029 * via the <code>timeout</code> and <code>capacity</code> properties.
030 *
031 * <ul>
032 * <li><code>timeout</code>. Defines the timeout (in milliseconds) for a
033 * given element managed by this resequencer. An out-of-sequence element can
034 * only be marked as <i>ready-for-delivery</i> if it either times out or if it
035 * has an immediate predecessor (in that case it is in-sequence). If an
036 * immediate predecessor of a waiting element arrives the timeout task for the
037 * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
038 * <p>
039 * If the maximum out-of-sequence time between elements within a stream is
040 * known, the <code>timeout</code> value should be set to this value. In this
041 * case it is guaranteed that all elements of a stream will be delivered in
042 * sequence to the output queue. However, large <code>timeout</code> values
043 * might require a very high resequencer <code>capacity</code> which might be
044 * in conflict with available memory resources. The lower the
045 * <code>timeout</code> value is compared to the out-of-sequence time between
046 * elements within a stream the higher the probability is for out-of-sequence
047 * elements delivered by this resequencer.</li>
048 * <li><code>capacity</code>. The capacity of this resequencer.</li>
049 * </ul>
050 *
051 * Whenever a timeout for a certain element occurs or an element has been added
052 * to this resequencer a delivery attempt is started. If a (sub)sequence of
053 * elements is <i>ready-for-delivery</i> then they are added to output queue.
054 * <p>
055 * The resequencer remembers the last-delivered element. If an element arrives
056 * which is the immediate successor of the last-delivered element it will be
057 * delivered immediately and the last-delivered element is adjusted accordingly.
058 * If the last-delivered element is <code>null</code> i.e. the resequencer was
059 * newly created the first arriving element will wait <code>timeout</code>
060 * milliseconds for being delivered to the output queue.
061 *
062 * @author Martin Krasser
063 */
064 public class ResequencerEngine<E> implements TimeoutHandler {
065
066 private static final Log LOG = LogFactory.getLog(ResequencerEngine.class);
067
068 private long timeout;
069
070 private int capacity;
071
072 private Queue<E> outQueue;
073
074 private Element<E> lastDelivered;
075
076 /**
077 * A sequence of elements for sorting purposes.
078 */
079 private Sequence<Element<E>> sequence;
080
081 /**
082 * A timer for scheduling timeout notifications.
083 */
084 private Timer timer;
085
086 /**
087 * Creates a new resequencer instance with a default timeout of 2000
088 * milliseconds. The capacity is set to {@link Integer#MAX_VALUE}.
089 *
090 * @param comparator a sequence element comparator.
091 */
092 public ResequencerEngine(SequenceElementComparator<E> comparator) {
093 this(comparator, Integer.MAX_VALUE);
094 }
095
096 /**
097 * Creates a new resequencer instance with a default timeout of 2000
098 * milliseconds.
099 *
100 * @param comparator a sequence element comparator.
101 * @param capacity the capacity of this resequencer.
102 */
103 public ResequencerEngine(SequenceElementComparator<E> comparator, int capacity) {
104 this.timer = new Timer("Resequencer Timer");
105 this.sequence = createSequence(comparator);
106 this.capacity = capacity;
107 this.timeout = 2000L;
108 this.lastDelivered = null;
109 }
110
111 /**
112 * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
113 */
114 public void stop() {
115 this.timer.cancel();
116 }
117
118 /**
119 * Returns the output queue.
120 *
121 * @return the output queue.
122 */
123 public Queue<E> getOutQueue() {
124 return outQueue;
125 }
126
127 /**
128 * Sets the output queue.
129 *
130 * @param outQueue output queue.
131 */
132 public void setOutQueue(Queue<E> outQueue) {
133 this.outQueue = outQueue;
134 }
135
136 /**
137 * Returns this resequencer's timeout value.
138 *
139 * @return the timeout in milliseconds.
140 */
141 public long getTimeout() {
142 return timeout;
143 }
144
145 /**
146 * Sets this sequencer's timeout value.
147 *
148 * @param timeout the timeout in milliseconds.
149 */
150 public void setTimeout(long timeout) {
151 this.timeout = timeout;
152 }
153
154 /**
155 * Handles a timeout notification by starting a delivery attempt.
156 *
157 * @param timout timeout task that caused the notification.
158 */
159 public synchronized void timeout(Timeout timout) {
160 try {
161 while (deliver()) {
162 // work done in deliver()
163 }
164 } catch (RuntimeException e) {
165 LOG.error("error during delivery", e);
166 }
167 }
168
169 /**
170 * Adds an element to this resequencer throwing an exception if the maximum
171 * capacity is reached.
172 *
173 * @param o element to be resequenced.
174 * @throws IllegalStateException if the element cannot be added at this time
175 * due to capacity restrictions.
176 */
177 public synchronized void add(E o) {
178 if (sequence.size() >= capacity) {
179 throw new IllegalStateException("maximum capacity is reached");
180 }
181 insert(o);
182 }
183
184 /**
185 * Adds an element to this resequencer waiting, if necessary, until capacity
186 * becomes available.
187 *
188 * @param o element to be resequenced.
189 * @throws InterruptedException if interrupted while waiting.
190 */
191 public synchronized void put(E o) throws InterruptedException {
192 if (sequence.size() >= capacity) {
193 wait();
194 }
195 insert(o);
196 }
197
198 /**
199 * Returns the last delivered element.
200 *
201 * @return the last delivered element or <code>null</code> if no delivery
202 * has been made yet.
203 */
204 E getLastDelivered() {
205 if (lastDelivered == null) {
206 return null;
207 }
208 return lastDelivered.getObject();
209 }
210
211 /**
212 * Sets the last delivered element. This is for testing purposes only.
213 *
214 * @param o an element.
215 */
216 void setLastDelivered(E o) {
217 lastDelivered = new Element<E>(o);
218 }
219
220 /**
221 * Inserts the given element into this resequencing queue (sequence). If the
222 * element is not ready for immediate delivery and has no immediate
223 * presecessor then it is scheduled for timing out. After being timed out it
224 * is ready for delivery.
225 *
226 * @param o an element.
227 */
228 private void insert(E o) {
229 // wrap object into internal element
230 Element<E> element = new Element<E>(o);
231 // add element to sequence in proper order
232 sequence.add(element);
233
234 Element<E> successor = sequence.successor(element);
235
236 // check if there is an immediate successor and cancel
237 // timer task (no need to wait any more for timeout)
238 if (successor != null) {
239 successor.cancel();
240 }
241
242 // start delivery if current element is successor of last delivered element
243 if (successorOfLastDelivered(element)) {
244 // nothing to schedule
245 } else if (sequence.predecessor(element) != null) {
246 // nothing to schedule
247 } else {
248 Timeout t = defineTimeout();
249 element.schedule(t);
250 }
251
252 // start delivery
253 while (deliver()) {
254 // work done in deliver()
255 }
256 }
257
258 /**
259 * Attempts to deliver a single element from the head of the resequencer
260 * queue (sequence). Only elements which have not been scheduled for timing
261 * out or which already timed out can be delivered.
262 *
263 * @return <code>true</code> if the element has been delivered
264 * <code>false</code> otherwise.
265 */
266 private boolean deliver() {
267 if (sequence.size() == 0) {
268 return false;
269 }
270 // inspect element with lowest sequence value
271 Element<E> element = sequence.first();
272
273 // if element is scheduled do not deliver and return
274 if (element.scheduled()) {
275 return false;
276 }
277
278 // remove deliverable element from sequence
279 sequence.remove(element);
280
281 // set the delivered element to last delivered element
282 lastDelivered = element;
283
284 // notify a waiting thread that capacity is available
285 notify();
286
287 // add element to output queue
288 outQueue.add(element.getObject());
289
290 // element has been delivered
291 return true;
292 }
293
294 /**
295 * Returns <code>true</code> if the given element is the immediate
296 * successor of the last delivered element.
297 *
298 * @param element an element.
299 * @return <code>true</code> if the given element is the immediate
300 * successor of the last delivered element.
301 */
302 private boolean successorOfLastDelivered(Element<E> element) {
303 if (lastDelivered == null) {
304 return false;
305 }
306 if (sequence.comparator().successor(element, lastDelivered)) {
307 return true;
308 }
309 return false;
310 }
311
312 /**
313 * Creates a timeout task based on the timeout setting of this resequencer.
314 *
315 * @return a new timeout task.
316 */
317 private Timeout defineTimeout() {
318 Timeout result = new Timeout(timer, timeout);
319 result.addTimeoutHandler(this);
320 return result;
321 }
322
323 private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
324 return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
325 }
326
327 }