001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.support.resequence;
018    
019    import java.lang.reflect.InvocationHandler;
020    import java.lang.reflect.Method;
021    import java.lang.reflect.Proxy;
022    import java.util.concurrent.BlockingQueue;
023    
024    import javax.jbi.messaging.MessageExchange;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.servicemix.executors.Executor;
029    
030    /**
031     * @author Martin Krasser
032     */
033    public class SequenceReader implements Runnable {
034    
035        private static final Log LOG = LogFactory.getLog(SequenceReader.class);
036        
037        private static final MessageExchange STOP = createStopSignal();
038        
039        private BlockingQueue<MessageExchange> queue;
040        
041        private SequenceSender sender;
042        
043        public SequenceReader(SequenceSender sender) {
044            this.sender = sender;
045        }
046        
047        public void setQueue(BlockingQueue<MessageExchange> queue) {
048            this.queue = queue;
049        }
050    
051        public void run() {
052            while (true) {
053                try {
054                    // block until message exchange is available
055                    MessageExchange me = queue.take();
056                    if (me == STOP) {
057                        LOG.info("exit processing loop after cancellation");
058                        return;
059                    }
060                    // send sync to preserve message order
061                    sender.sendSync(me);
062                } catch (InterruptedException e) {
063                    LOG.info("exit processing loop after interrupt");
064                    return;
065                } catch (Exception e) {
066                    // TODO: handle sendSync errors and faults
067                    LOG.error("caught and ignored exception", e);
068                }
069            }
070        }
071        
072        public void start(Executor executor) {
073            executor.execute(this);
074        }
075        
076        public void stop() throws InterruptedException {
077            queue.put(STOP);
078        }
079        
080        private static MessageExchange createStopSignal() {
081            return (MessageExchange)Proxy.newProxyInstance(SequenceReader.class.getClassLoader(), 
082                    new Class[] {MessageExchange.class}, createStopHandler());
083        }
084        
085        private static InvocationHandler createStopHandler() {
086            return new InvocationHandler() {
087                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
088                    throw new IllegalStateException("illegal method invocation on stop signal");
089                }
090            };
091        }
092        
093    }