package org.apache.camel.component.seda;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/camel/component/seda/SedaConsumer.class */
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable {
    private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);
    private SedaEndpoint endpoint;
    private Processor processor;
    private ExecutorService executor;
    private ExceptionHandler exceptionHandler;

    public SedaConsumer(SedaEndpoint sedaEndpoint, Processor processor) {
        this.endpoint = sedaEndpoint;
        this.processor = processor;
    }

    public String toString() {
        return "SedaConsumer[" + this.endpoint.getEndpointUri() + "]";
    }

    @Override // org.apache.camel.Consumer
    public Endpoint getEndpoint() {
        return this.endpoint;
    }

    public ExceptionHandler getExceptionHandler() {
        if (this.exceptionHandler == null) {
            this.exceptionHandler = new LoggingExceptionHandler(getClass());
        }
        return this.exceptionHandler;
    }

    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    @Override // java.lang.Runnable
    public void run() {
        BlockingQueue<Exchange> queue = this.endpoint.getQueue();
        while (queue != null && isRunAllowed()) {
            try {
                Exchange poll = queue.poll(1000L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    if (isRunAllowed()) {
                        try {
                            this.processor.process(poll);
                        } catch (Exception e) {
                            getExceptionHandler().handleException(e);
                        }
                    } else {
                        LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + poll);
                        try {
                            queue.put(poll);
                        } catch (InterruptedException e2) {
                            LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
                        }
                    }
                }
            } catch (InterruptedException e3) {
                LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
            }
        }
    }

    @Override // org.apache.camel.impl.ServiceSupport
    protected void doStart() throws Exception {
        int concurrentConsumers = this.endpoint.getConcurrentConsumers();
        this.executor = ExecutorServiceHelper.newFixedThreadPool(concurrentConsumers, this.endpoint.getEndpointUri(), true);
        for (int i = 0; i < concurrentConsumers; i++) {
            this.executor.execute(this);
        }
        this.endpoint.onStarted(this);
    }

    @Override // org.apache.camel.impl.ServiceSupport
    protected void doStop() throws Exception {
        this.endpoint.onStopped(this);
        this.executor.shutdownNow();
        this.executor = null;
    }
}
