package org.apache.camel.component.disruptor;

import com.lmax.disruptor.InsufficientCapacityException;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.StreamCache;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.SynchronizationAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/disruptor/DisruptorProducer.class */
/**
 * Producer that publishes exchanges to the ring buffer of a {@link DisruptorEndpoint}.
 *
 * <p>Depending on the configured {@link WaitForTaskToComplete} mode (which can be overridden per
 * exchange through the {@code CamelAsyncWait} exchange property), the producer either publishes a
 * copy of the exchange and returns immediately, or publishes a copy and blocks the calling thread
 * until that copy has completed or the configured timeout has elapsed.
 */
public class DisruptorProducer extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorProducer.class);

    /** Default wait mode; may be overridden per exchange via the {@code CamelAsyncWait} property. */
    private final WaitForTaskToComplete waitForTaskToComplete;

    /** Maximum time in milliseconds to wait for completion; values {@code <= 0} wait indefinitely. */
    private final long timeout;

    private final DisruptorEndpoint endpoint;

    /** When {@code true} uses {@code publish}, otherwise {@code tryPublish}, on the endpoint. */
    private final boolean blockWhenFull;

    /**
     * Creates a producer bound to the given endpoint.
     *
     * @param disruptorEndpoint     the endpoint whose ring buffer receives the exchanges
     * @param waitForTaskToComplete the default wait mode
     * @param timeout               wait timeout in milliseconds; {@code <= 0} means wait without a timeout
     * @param blockWhenFull         whether to call {@code publish} ({@code true}) or {@code tryPublish}
     *                              ({@code false}) on the endpoint
     */
    public DisruptorProducer(DisruptorEndpoint disruptorEndpoint, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) {
        super(disruptorEndpoint);
        this.waitForTaskToComplete = waitForTaskToComplete;
        this.timeout = timeout;
        this.endpoint = disruptorEndpoint;
        this.blockWhenFull = blockWhenFull;
    }

    /**
     * Returns the endpoint this producer publishes to.
     *
     * <p>NOTE(review): the method name is a decompiler artifact (renamed from {@code getEndpoint}
     * because it was merged with its bridge method); it is kept as-is so existing callers still resolve.
     */
    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public DisruptorEndpoint m8getEndpoint() {
        return this.endpoint;
    }

    /** Notifies the endpoint that this producer has started. */
    protected void doStart() throws Exception {
        m8getEndpoint().onStarted(this);
    }

    /** Notifies the endpoint that this producer has stopped. */
    protected void doStop() throws Exception {
        m8getEndpoint().onStopped(this);
    }

    /**
     * Publishes a copy of the exchange to the disruptor and, when required, waits for it to complete.
     *
     * <p>Any exception raised while copying, publishing or waiting is stored on the exchange rather
     * than propagated. The callback is always signalled synchronously.
     *
     * @param exchange      the exchange to send
     * @param asyncCallback the callback, always invoked with {@code done(true)}
     * @return always {@code true}, since processing completes synchronously on the calling thread
     */
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        // A per-exchange property takes precedence over the producer-level setting.
        WaitForTaskToComplete wait = this.waitForTaskToComplete;
        if (exchange.getProperty("CamelAsyncWait") != null) {
            wait = exchange.getProperty("CamelAsyncWait", WaitForTaskToComplete.class);
        }

        try {
            boolean waitForReply = wait == WaitForTaskToComplete.Always
                    || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange));
            if (waitForReply) {
                publishAndAwait(exchange);
            } else {
                // No waiting: publish a copy created with the handover flag set and return at once.
                doPublish(prepareCopy(exchange, true));
            }
        } catch (Exception e) {
            // Only genuine failures are recorded on the exchange; the success path leaves it untouched.
            exchange.setException(e);
        }

        asyncCallback.done(true);
        return true;
    }

    /**
     * Publishes a copy of the exchange and blocks until the copy completes or the timeout elapses.
     *
     * @param exchange the original exchange, which receives the results of the copy
     * @throws IOException if preparing the copy fails
     */
    private void publishAndAwait(Exchange exchange) throws IOException {
        // The copy is created without handover because this thread waits for it and the
        // completion hook copies the results back onto the original exchange.
        Exchange copy = prepareCopy(exchange, false);
        CountDownLatch latch = new CountDownLatch(1);
        copy.getExchangeExtension().addOnCompletion(newOnCompletion(exchange, latch));
        doPublish(copy);

        if (this.timeout > 0) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Waiting for task to complete using timeout (ms): {} at [{}]", this.timeout, this.endpoint.getEndpointUri());
            }
            boolean completed = false;
            try {
                completed = latch.await(this.timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOG.info("Interrupted while waiting for the task to complete");
                Thread.currentThread().interrupt();
            }
            if (!completed) {
                // Flag the exchange as to-be-ignored (presumably honoured by the consumer side;
                // confirm against DisruptorEndpoint/consumer) and report the timeout to the caller.
                exchange.setProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, true);
                exchange.setException(new ExchangeTimedOutException(exchange, this.timeout));
                // Release the latch so a late completion sees count == 0 and skips copying results.
                latch.countDown();
            }
        } else {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Waiting for task to complete (blocking) at [{}]", this.endpoint.getEndpointUri());
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                LOG.info("Interrupted while waiting for the task to complete");
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates the completion hook installed on the published copy.
     *
     * @param exchange       the original exchange that should receive the results
     * @param countDownLatch the latch the waiting producer thread is blocked on
     * @return a synchronization that copies results back and releases the latch
     */
    private SynchronizationAdapter newOnCompletion(final Exchange exchange, final CountDownLatch countDownLatch) {
        return new SynchronizationAdapter() { // from class: org.apache.camel.component.disruptor.DisruptorProducer.1
            public void onDone(Exchange exchange2) {
                if (countDownLatch.getCount() == 0) {
                    // The latch was already released (the producer counts it down on timeout),
                    // so the response is dropped.
                    if (DisruptorProducer.LOG.isTraceEnabled()) {
                        DisruptorProducer.LOG.trace("{}. Timeout occurred so response will be ignored: {}", this, exchange2.getMessage());
                    }
                } else {
                    if (DisruptorProducer.LOG.isTraceEnabled()) {
                        DisruptorProducer.LOG.trace("{} with response: {}", this, exchange2.getMessage());
                    }
                    try {
                        ExchangeHelper.copyResults(exchange, exchange2);
                    } finally {
                        // Always release the waiting producer, even if copying the results fails.
                        countDownLatch.countDown();
                    }
                }
            }

            public boolean allowHandover() {
                return false;
            }

            public String toString() {
                return "onDone at endpoint: " + String.valueOf(DisruptorProducer.this.endpoint);
            }
        };
    }

    /**
     * Publishes the exchange to the endpoint, using {@code publish} when {@code blockWhenFull} is
     * set and {@code tryPublish} otherwise.
     *
     * @param exchange the exchange to publish
     * @throws IllegalStateException if the disruptor was not started or its ring buffer was full
     */
    private void doPublish(Exchange exchange) {
        LOG.trace("Publishing Exchange to disruptor ringbuffer: {}", exchange);
        try {
            if (this.blockWhenFull) {
                this.endpoint.publish(exchange);
            } else {
                this.endpoint.tryPublish(exchange);
            }
        } catch (DisruptorNotStartedException e) {
            throw new IllegalStateException("Disruptor was not started", e);
        } catch (InsufficientCapacityException e2) {
            throw new IllegalStateException("Disruptors ringbuffer was full", e2);
        }
    }

    /**
     * Creates the correlated copy of the exchange that is published to the disruptor.
     *
     * @param exchange the original exchange
     * @param handover passed through to {@code ExchangeHelper.createCorrelatedCopy}; when
     *                 {@code true}, a {@link StreamCache} body is additionally replaced by its own copy
     * @return the copy, with this producer's endpoint set as its from-endpoint
     * @throws IOException if copying a {@link StreamCache} body fails
     */
    private Exchange prepareCopy(Exchange exchange, boolean handover) throws IOException {
        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover);
        copy.getExchangeExtension().setFromEndpoint(this.endpoint);
        if (handover) {
            Object body = copy.getMessage().getBody();
            if (body instanceof StreamCache) {
                StreamCache cacheCopy = ((StreamCache) body).copy(copy);
                if (cacheCopy != null) {
                    copy.getMessage().setBody(cacheCopy);
                }
            }
        }
        return copy;
    }
}
