001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.patterns;
018    
019    import java.util.List;
020    import java.util.concurrent.BlockingQueue;
021    import java.util.concurrent.LinkedBlockingQueue;
022    
023    import javax.jbi.messaging.ExchangeStatus;
024    import javax.jbi.messaging.MessageExchange;
025    import javax.jbi.messaging.MessagingException;
026    import javax.jbi.messaging.NormalizedMessage;
027    
028    import org.apache.servicemix.eip.support.resequence.DefaultComparator;
029    import org.apache.servicemix.eip.support.resequence.ResequencerBase;
030    import org.apache.servicemix.eip.support.resequence.ResequencerEngine;
031    import org.apache.servicemix.eip.support.resequence.SequenceElementComparator;
032    import org.apache.servicemix.eip.support.resequence.SequenceReader;
033    import org.apache.servicemix.eip.support.resequence.SequenceSender;
034    import org.apache.servicemix.executors.Executor;
035    
036    /**
037     * @author Martin Krasser
038     * 
039     * @org.apache.xbean.XBean element="resequencer"
040     */
041    public class Resequencer extends ResequencerBase implements SequenceSender {
042    
043        private ResequencerEngine<MessageExchange> reseq;
044        
045        private SequenceReader reader;
046    
047        private Executor executor;
048    
049        private int capacity;
050        
051        private long timeout;
052        
053        private SequenceElementComparator<MessageExchange> comparator;
054        
055        public Resequencer() {
056            this.reader = new SequenceReader(this);
057            this.comparator = new DefaultComparator();
058        }
059        
060        public void setCapacity(int capacity) {
061            this.capacity = capacity;
062        }
063        
064        public void setTimeout(long timeout) {
065            this.timeout = timeout;
066        }
067    
068        public void setComparator(SequenceElementComparator<MessageExchange> comparator) {
069            this.comparator = comparator;
070        }
071        
072        @Override
073        public void start() throws Exception {
074            super.start();
075            if (executor == null) {
076                executor = getServiceUnit().getComponent().getExecutor();
077            }
078            BlockingQueue<MessageExchange> queue = new LinkedBlockingQueue<MessageExchange>();
079            reseq = new ResequencerEngine<MessageExchange>(comparator, capacity);
080            reseq.setTimeout(timeout);
081            reseq.setOutQueue(queue);
082            reader.setQueue(queue);
083            reader.start(executor);
084        }
085    
086        @Override
087        public void stop() throws Exception {
088            reseq.stop();
089            reader.stop();
090            super.stop();
091        }
092        
093        public void sendSync(MessageExchange exchange) throws MessagingException {
094            super.sendSync(exchange);
095        }
096        
097        public void sendSync(List<MessageExchange> exchanges) throws MessagingException {
098            for (MessageExchange exchange : exchanges) {
099                sendSync(exchange);
100            }
101        }
102    
103        @Override
104        protected void processSync(MessageExchange exchange) throws Exception {
105            fail(exchange, new UnsupportedOperationException("synchronous resequencing not supported"));
106        }
107    
108        @Override
109        protected void processAsync(MessageExchange exchange) throws Exception {
110            validateMessageExchange(exchange);
111            if (exchange.getStatus() == ExchangeStatus.DONE) {
112                return;
113            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
114                return;
115            } else if (exchange.getFault() != null) {
116                done(exchange);
117                return;
118            }
119            processMessage(exchange);
120            done(exchange);
121        }
122    
123        private void processMessage(MessageExchange sourceExchange) throws MessagingException, InterruptedException {
124            NormalizedMessage source = sourceExchange.getMessage("in");
125            NormalizedMessage copy = getMessageCopier().transform(sourceExchange, source);
126            MessageExchange targetExchange = createTargetExchange(copy, sourceExchange.getPattern());
127            // add target exchange to resequencer (blocking if capacity is reached)
128            reseq.put(targetExchange);
129        }
130        
131    }