/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.rabbitmq.reply;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.TimeoutMap;
import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
import org.apache.camel.component.rabbitmq.RabbitMQMessageConverter;
import org.apache.camel.component.rabbitmq.reply.CorrelationTimeoutMap;
import org.apache.camel.component.rabbitmq.reply.QueueReplyHandler;
import org.apache.camel.component.rabbitmq.reply.ReplyHandler;
import org.apache.camel.component.rabbitmq.reply.ReplyHolder;
import org.apache.camel.component.rabbitmq.reply.ReplyManager;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.support.task.ForegroundTask;
import org.apache.camel.support.task.Tasks;
import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.support.task.budget.IterationBudget;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for {@link ReplyManager} implementations.
 *
 * <p>It keeps a timeout-aware map from correlation id to {@link ReplyHandler}, exposes the
 * reply-to destination once it has been set, and completes exchanges when a reply (or a
 * timeout) is delivered through {@link #processReply(ReplyHolder)}. Subclasses supply the
 * listener connection and the routing of incoming reply messages.
 */
public abstract class ReplyManagerSupport
extends ServiceSupport
implements ReplyManager {
    /** Timeout, in milliseconds, passed to {@link Connection#close(int)} when stopping. */
    private static final int CLOSE_TIMEOUT = 30000;
    private static final Logger LOG = LoggerFactory.getLogger(ReplyManagerSupport.class);
    protected final CamelContext camelContext;
    /** Released by {@link #setReplyTo(String)}; {@link #getReplyTo()} waits on it. */
    protected final CountDownLatch replyToLatch = new CountDownLatch(1);
    /** Maximum time, in milliseconds, that {@link #getReplyTo()} waits for the reply-to to be set. */
    protected final long replyToTimeout = 1000L;
    protected ScheduledExecutorService executorService;
    protected RabbitMQEndpoint endpoint;
    protected String replyTo;
    protected Connection listenerContainer;
    /** Pending reply handlers keyed by correlation id; created in {@link #doStart()}. */
    protected TimeoutMap<String, ReplyHandler> correlation;
    private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();

    public ReplyManagerSupport(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    @Override
    public void setScheduledExecutorService(ScheduledExecutorService executorService) {
        this.executorService = executorService;
    }

    @Override
    public void setEndpoint(RabbitMQEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    /**
     * Stores the reply-to destination and releases any thread waiting in {@link #getReplyTo()}.
     *
     * @param replyTo the reply-to destination
     */
    @Override
    public void setReplyTo(String replyTo) {
        LOG.debug("ReplyTo destination: {}", replyTo);
        this.replyTo = replyTo;
        this.replyToLatch.countDown();
    }

    /**
     * Returns the reply-to destination, waiting up to {@link #replyToTimeout} milliseconds for
     * it to be set if it is not available yet.
     *
     * @return the reply-to destination, or {@code null} if it was not set before the wait
     *         timed out or the waiting thread was interrupted
     */
    @Override
    public String getReplyTo() {
        if (this.replyTo != null) {
            return this.replyTo;
        }
        try {
            LOG.trace("Waiting for replyTo to be set");
            boolean done = this.replyToLatch.await(this.replyToTimeout, TimeUnit.MILLISECONDS);
            if (!done) {
                LOG.warn("ReplyTo destination was not set and timeout occurred");
            } else {
                LOG.trace("Waiting for replyTo to be set done");
            }
        }
        catch (InterruptedException e) {
            // Stop waiting, but keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        return this.replyTo;
    }

    /**
     * Registers a {@link QueueReplyHandler} for the given correlation id in the correlation map,
     * using {@code requestTimeout} as the entry timeout.
     *
     * @return the {@code correlationId} that was registered
     * @throws IllegalArgumentException if a handler is already registered for {@code correlationId}
     */
    @Override
    public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) {
        QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback, originalCorrelationId, correlationId, requestTimeout);
        // putIfAbsent so that an existing registration is never overwritten.
        ReplyHandler existing = this.correlation.putIfAbsent(correlationId, handler, requestTimeout);
        if (existing != null) {
            String logMessage = String.format("The correlationId [%s] is not unique.", correlationId);
            throw new IllegalArgumentException(logMessage);
        }
        return correlationId;
    }

    protected abstract ReplyHandler createReplyHandler(ReplyManager var1, Exchange var2, AsyncCallback var3, String var4, String var5, long var6);

    /**
     * Removes the handler registered for the given correlation id, if there is one.
     *
     * @param correlationId the correlation id to cancel
     */
    @Override
    public void cancelCorrelationId(String correlationId) {
        ReplyHandler handler = this.correlation.get(correlationId);
        if (handler != null) {
            LOG.warn("Cancelling correlationID: {}", correlationId);
            this.correlation.remove(correlationId);
        }
    }

    /**
     * Entry point for an incoming reply message. Messages without a correlation id are logged
     * and dropped; all others are passed to
     * {@link #handleReplyMessage(String, AMQP.BasicProperties, byte[])}.
     *
     * @param properties the AMQP properties of the reply
     * @param message    the raw reply body
     */
    public void onMessage(AMQP.BasicProperties properties, byte[] message) {
        String correlationID = properties.getCorrelationId();
        if (correlationID == null) {
            LOG.warn("Ignoring message with no correlationID: {}", message);
            return;
        }
        LOG.debug("Received reply message with correlationID [{}] -> {}", correlationID, message);
        this.handleReplyMessage(correlationID, properties, message);
    }

    /**
     * Completes the exchange held by {@code holder}. On timeout an
     * {@link ExchangeTimedOutException} is set on the exchange; otherwise the reply is copied
     * onto the exchange and the original correlation id header is restored when one is present.
     * The holder's callback is always invoked with {@code done(false)}, even if populating the
     * exchange throws.
     *
     * <p>Does nothing when {@code holder} is {@code null} or this service is not allowed to run.
     *
     * @param holder the reply (or timeout) to process
     */
    @Override
    public void processReply(ReplyHolder holder) {
        if (holder == null || !this.isRunAllowed()) {
            return;
        }
        try {
            Exchange exchange = holder.getExchange();
            if (holder.isTimeout()) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}. Setting ExchangeTimedOutException on {} and continue routing.",
                            holder.getRequestTimeout(), holder.getCorrelationId(), this.replyTo, ExchangeHelper.logIds(exchange));
                }
                String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + this.replyTo;
                exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
            } else {
                this.messageConverter.populateRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true, this.endpoint.isAllowMessageBodySerialization());
                if (holder.getOriginalCorrelationId() != null) {
                    // Put the caller's original correlation id back on whichever message is current.
                    if (exchange.hasOut()) {
                        exchange.getOut().setHeader("CamelRabbitmqCorrelationId", holder.getOriginalCorrelationId());
                    } else {
                        exchange.getIn().setHeader("CamelRabbitmqCorrelationId", holder.getOriginalCorrelationId());
                    }
                }
            }
        }
        finally {
            // The callback must fire regardless of the outcome above.
            AsyncCallback callback = holder.getCallback();
            callback.done(false);
        }
    }

    protected abstract void handleReplyMessage(String var1, AMQP.BasicProperties var2, byte[] var3);

    protected abstract Connection createListenerContainer() throws Exception;

    /**
     * Handles a reply that arrived before its handler could be found: polls the correlation map
     * with a budget of 50 iterations at a 100 ms interval until a handler for
     * {@code correlationID} shows up.
     *
     * @param correlationID the correlation id of the early reply
     * @param message       the raw reply body (used for logging only)
     * @return the handler once it is present, or {@code null} if the polling budget ran out
     */
    protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, byte[] message) {
        if (LOG.isWarnEnabled()) {
            LOG.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
        }
        ForegroundTask task = Tasks.foregroundTask().withBudget((IterationBudget)Budgets.iterationBudget().withMaxIterations(50).withInterval(Duration.ofMillis(100L)).build()).build();
        return task.run(() -> this.correlation.get(correlationID), answer -> answer != null).orElse(null);
    }

    /**
     * Validates that the executor service and endpoint have been set, creates and starts the
     * correlation map, and creates the listener connection.
     */
    @Override
    protected void doStart() throws Exception {
        ObjectHelper.notNull(this.executorService, "executorService", this);
        ObjectHelper.notNull(this.endpoint, "endpoint", this);
        this.messageConverter.setAllowNullHeaders(this.endpoint.isAllowNullHeaders());
        LOG.debug("Using timeout checker interval with {} millis", this.endpoint.getRequestTimeoutCheckerInterval());
        this.correlation = new CorrelationTimeoutMap(this.executorService, this.endpoint.getRequestTimeoutCheckerInterval());
        ServiceHelper.startService(this.correlation);
        this.listenerContainer = this.createListenerContainer();
        LOG.debug("Using executor {}", this.executorService);
    }

    /**
     * Stops the correlation map, closes the listener connection with {@link #CLOSE_TIMEOUT},
     * and shuts down the executor service. The executor is shut down and the connection
     * reference cleared even when closing the connection throws.
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(this.correlation);
        try {
            if (this.listenerContainer != null) {
                LOG.debug("Closing connection: {} with timeout: {} ms.", this.listenerContainer, CLOSE_TIMEOUT);
                try {
                    this.listenerContainer.close(CLOSE_TIMEOUT);
                }
                finally {
                    this.listenerContainer = null;
                }
            }
        }
        finally {
            // Shut the executor down even if the connection failed to close.
            if (this.executorService != null) {
                this.camelContext.getExecutorServiceManager().shutdownGraceful(this.executorService);
                this.executorService = null;
            }
        }
    }
}

