001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.patterns;
018
019 import java.util.List;
020 import java.util.concurrent.BlockingQueue;
021 import java.util.concurrent.LinkedBlockingQueue;
022
023 import javax.jbi.messaging.ExchangeStatus;
024 import javax.jbi.messaging.MessageExchange;
025 import javax.jbi.messaging.MessagingException;
026 import javax.jbi.messaging.NormalizedMessage;
027
028 import org.apache.servicemix.eip.support.resequence.DefaultComparator;
029 import org.apache.servicemix.eip.support.resequence.ResequencerBase;
030 import org.apache.servicemix.eip.support.resequence.ResequencerEngine;
031 import org.apache.servicemix.eip.support.resequence.SequenceElementComparator;
032 import org.apache.servicemix.eip.support.resequence.SequenceReader;
033 import org.apache.servicemix.eip.support.resequence.SequenceSender;
034 import org.apache.servicemix.executors.Executor;
035
036 /**
037 * @author Martin Krasser
038 *
039 * @org.apache.xbean.XBean element="resequencer"
040 */
041 public class Resequencer extends ResequencerBase implements SequenceSender {
042
043 private ResequencerEngine<MessageExchange> reseq;
044
045 private SequenceReader reader;
046
047 private Executor executor;
048
049 private int capacity;
050
051 private long timeout;
052
053 private SequenceElementComparator<MessageExchange> comparator;
054
055 public Resequencer() {
056 this.reader = new SequenceReader(this);
057 this.comparator = new DefaultComparator();
058 }
059
060 public void setCapacity(int capacity) {
061 this.capacity = capacity;
062 }
063
064 public void setTimeout(long timeout) {
065 this.timeout = timeout;
066 }
067
068 public void setComparator(SequenceElementComparator<MessageExchange> comparator) {
069 this.comparator = comparator;
070 }
071
072 @Override
073 public void start() throws Exception {
074 super.start();
075 if (executor == null) {
076 executor = getServiceUnit().getComponent().getExecutor();
077 }
078 BlockingQueue<MessageExchange> queue = new LinkedBlockingQueue<MessageExchange>();
079 reseq = new ResequencerEngine<MessageExchange>(comparator, capacity);
080 reseq.setTimeout(timeout);
081 reseq.setOutQueue(queue);
082 reader.setQueue(queue);
083 reader.start(executor);
084 }
085
086 @Override
087 public void stop() throws Exception {
088 reseq.stop();
089 reader.stop();
090 super.stop();
091 }
092
093 public void sendSync(MessageExchange exchange) throws MessagingException {
094 super.sendSync(exchange);
095 }
096
097 public void sendSync(List<MessageExchange> exchanges) throws MessagingException {
098 for (MessageExchange exchange : exchanges) {
099 sendSync(exchange);
100 }
101 }
102
103 @Override
104 protected void processSync(MessageExchange exchange) throws Exception {
105 fail(exchange, new UnsupportedOperationException("synchronous resequencing not supported"));
106 }
107
108 @Override
109 protected void processAsync(MessageExchange exchange) throws Exception {
110 validateMessageExchange(exchange);
111 if (exchange.getStatus() == ExchangeStatus.DONE) {
112 return;
113 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
114 return;
115 } else if (exchange.getFault() != null) {
116 done(exchange);
117 return;
118 }
119 processMessage(exchange);
120 done(exchange);
121 }
122
123 private void processMessage(MessageExchange sourceExchange) throws MessagingException, InterruptedException {
124 NormalizedMessage source = sourceExchange.getMessage("in");
125 NormalizedMessage copy = getMessageCopier().transform(sourceExchange, source);
126 MessageExchange targetExchange = createTargetExchange(copy, sourceExchange.getPattern());
127 // add target exchange to resequencer (blocking if capacity is reached)
128 reseq.put(targetExchange);
129 }
130
131 }