/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.rabbitmq.RabbitMQConsumer;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.support.task.BlockingTask;
import org.apache.camel.support.task.Tasks;
import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.support.task.budget.TimeBudget;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RabbitConsumer
extends ServiceSupport
implements Consumer {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitConsumer.class);
    private final RabbitMQConsumer consumer;
    private Channel channel;
    private String tag;
    private volatile String consumerTag;
    private final Semaphore lock = new Semaphore(1);

    RabbitConsumer(RabbitMQConsumer consumer) {
        this.consumer = consumer;
        try {
            Connection conn = consumer.getConnection();
            this.channel = this.openChannel(conn);
        }
        catch (IOException | TimeoutException e) {
            LOG.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            if (!this.consumer.getEndpoint().isAutoAck()) {
                this.lock.acquire();
            }
            if (!this.channel.isOpen()) {
                if (!this.consumer.getEndpoint().isAutoAck()) {
                    this.lock.release();
                }
                return;
            }
            Exchange exchange = this.consumer.createExchange(envelope, properties, body);
            try {
                this.consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
                this.doHandleDelivery(exchange, envelope, properties);
            }
            finally {
                if (!this.consumer.getEndpoint().isAutoAck()) {
                    this.lock.release();
                }
                this.consumer.releaseExchange(exchange, false);
            }
        }
        catch (InterruptedException e) {
            LOG.warn("Thread Interrupted!");
        }
    }

    public void doHandleDelivery(Exchange exchange, Envelope envelope, AMQP.BasicProperties properties) throws IOException {
        boolean sendReply;
        boolean bl = sendReply = properties.getReplyTo() != null;
        if (sendReply && !exchange.getPattern().isOutCapable()) {
            LOG.debug("In an inOut capable route");
            exchange.setPattern(ExchangePattern.InOut);
        }
        LOG.trace("Created exchange [exchange={}]", (Object)exchange);
        long deliveryTag = envelope.getDeliveryTag();
        try {
            this.consumer.getProcessor().process(exchange);
        }
        catch (Exception e) {
            exchange.setException((Throwable)e);
        }
        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
        if (exchange.getException() != null) {
            this.consumer.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)exchange.getException());
        }
        if (!exchange.isFailed()) {
            if (sendReply && exchange.getPattern().isOutCapable()) {
                try {
                    this.consumer.getEndpoint().publishExchangeToChannel(exchange, this.channel, properties.getReplyTo());
                }
                catch (AlreadyClosedException alreadyClosedException) {
                    LOG.warn("Connection or channel closed during reply to exchange {} for correlationId {}. Will reconnect and try again.", (Object)exchange.getExchangeId(), (Object)properties.getCorrelationId());
                    try {
                        this.reconnect();
                        LOG.debug("Sending again the reply to exchange {} for correlationId {}", (Object)exchange.getExchangeId(), (Object)properties.getCorrelationId());
                        this.consumer.getEndpoint().publishExchangeToChannel(exchange, this.channel, properties.getReplyTo());
                    }
                    catch (Exception e) {
                        LOG.error("Couldn't sending again the reply to exchange {} for correlationId {}", (Object)exchange.getExchangeId(), (Object)properties.getCorrelationId());
                        exchange.setException((Throwable)e);
                        this.consumer.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)e);
                    }
                }
                catch (RuntimeCamelException e) {
                    exchange.setException((Throwable)e);
                    this.consumer.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)e);
                }
            }
            if (!this.consumer.getEndpoint().isAutoAck()) {
                LOG.trace("Acknowledging receipt [delivery_tag={}]", (Object)deliveryTag);
                this.channel.basicAck(deliveryTag, false);
            }
        }
        if (exchange.isFailed()) {
            if (this.consumer.getEndpoint().isTransferException() && exchange.getPattern().isOutCapable()) {
                msg.setBody((Object)exchange.getException());
                exchange.setOut(msg);
                exchange.getOut().setHeader("CamelRabbitmqCorrelationId", exchange.getIn().getHeader("CamelRabbitmqCorrelationId"));
                try {
                    this.consumer.getEndpoint().publishExchangeToChannel(exchange, this.channel, properties.getReplyTo());
                }
                catch (RuntimeCamelException e) {
                    this.consumer.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)e);
                }
                if (!this.consumer.getEndpoint().isAutoAck()) {
                    LOG.trace("Acknowledging receipt when transferring exception [delivery_tag={}]", (Object)deliveryTag);
                    this.channel.basicAck(deliveryTag, false);
                }
            } else {
                boolean isRequeueHeaderSet = this.consumer.getEndpoint().isReQueue();
                try {
                    isRequeueHeaderSet = (Boolean)msg.getHeader("CamelRabbitmqRequeue", (Object)isRequeueHeaderSet, Boolean.TYPE);
                    LOG.trace("Consumer requeue property is overridden using the message header requeue property as: {}", (Object)isRequeueHeaderSet);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                if (deliveryTag != 0L && !this.consumer.getEndpoint().isAutoAck()) {
                    LOG.trace("Rejecting receipt [delivery_tag={}] with requeue={}", (Object)deliveryTag, (Object)isRequeueHeaderSet);
                    if (isRequeueHeaderSet) {
                        this.channel.basicReject(deliveryTag, true);
                    } else {
                        this.channel.basicReject(deliveryTag, false);
                    }
                }
            }
        }
    }

    protected void doStart() throws Exception {
        if (this.channel == null) {
            throw new IOException("The RabbitMQ channel is not open");
        }
        this.tag = this.channel.basicConsume(this.consumer.getEndpoint().getQueue(), this.consumer.getEndpoint().isAutoAck(), this.consumer.getEndpoint().getConsumerTag(), false, this.consumer.getEndpoint().isExclusiveConsumer(), null, (Consumer)this);
    }

    protected void doStop() throws Exception {
        if (this.channel == null) {
            return;
        }
        if (this.tag != null && this.isChannelOpen()) {
            this.channel.basicCancel(this.tag);
        }
        try {
            this.lock.acquire();
            if (this.isChannelOpen()) {
                this.channel.close();
            }
        }
        catch (TimeoutException e) {
            LOG.error("Timeout occurred");
            throw e;
        }
        catch (InterruptedException e1) {
            LOG.error("Thread Interrupted!");
        }
        finally {
            this.lock.release();
        }
    }

    public void handleConsumeOk(String consumerTag) {
        this.consumerTag = consumerTag;
    }

    public String getConsumerTag() {
        return this.consumerTag;
    }

    public void handleCancelOk(String consumerTag) {
        LOG.debug("Received cancelOk signal on the rabbitMQ channel");
    }

    public void handleCancel(String consumerTag) throws IOException {
        LOG.debug("Received cancel signal on the rabbitMQ channel.");
        try {
            this.channel.basicCancel(this.tag);
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.consumer.getEndpoint().declareExchangeAndQueue(this.channel);
        try {
            this.start();
        }
        catch (Exception e) {
            throw new IOException("Error starting consumer", e);
        }
    }

    private boolean doReconnect() {
        if (this.isStopping()) {
            return true;
        }
        try {
            this.reconnect();
            return true;
        }
        catch (Exception e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Unable to obtain a RabbitMQ channel. Will try again. Caused by: {}.", (Object)e.getMessage());
            } else {
                LOG.warn("Unable to obtain a RabbitMQ channel. Will try again. Caused by: {}. Stacktrace logged at DEBUG logging level.", (Object)e.getMessage());
            }
            return false;
        }
    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        LOG.info("Received shutdown signal on the rabbitMQ channel");
        if (sig.isInitiatedByApplication()) {
            LOG.debug("Nothing to do because the consumer closed the connection");
            return;
        }
        Integer networkRecoveryInterval = this.consumer.getEndpoint().getNetworkRecoveryInterval();
        long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? (long)networkRecoveryInterval.intValue() : 100L;
        String taskName = "shutdown-handler";
        ScheduledExecutorService service = this.consumer.getEndpoint().createScheduledExecutor(taskName);
        BlockingTask task = (BlockingTask)Tasks.backgroundTask().withBudget((TimeBudget)Budgets.timeBudget().withUnlimitedDuration().withInterval(Duration.ofMillis(connectionRetryInterval)).build()).withScheduledExecutor(service).withName(taskName).build();
        task.run(this::doReconnect);
    }

    public void handleRecoverOk(String consumerTag) {
        LOG.debug("Received recover ok signal on the rabbitMQ channel");
    }

    public void reconnect() throws Exception {
        if (this.isChannelOpen()) {
            this.start();
            return;
        }
        if (this.channel != null && !this.channel.isOpen() && this.isAutomaticRecoveryEnabled()) {
            throw new IOException("Waiting for channel to re-open.");
        }
        if (this.channel == null || !this.isAutomaticRecoveryEnabled()) {
            LOG.info("Attempting to open a new rabbitMQ channel");
            Connection conn = this.consumer.getConnection();
            try {
                this.stop();
            }
            finally {
                this.channel = this.openChannel(conn);
                this.start();
            }
        }
    }

    private boolean isAutomaticRecoveryEnabled() {
        return this.consumer.getEndpoint().getAutomaticRecoveryEnabled() != null && this.consumer.getEndpoint().getAutomaticRecoveryEnabled() != false;
    }

    private boolean isChannelOpen() {
        return this.channel != null && this.channel.isOpen();
    }

    private Channel openChannel(Connection conn) throws IOException {
        LOG.trace("Creating channel...");
        Channel channel = conn.createChannel();
        LOG.debug("Created channel: {}", (Object)channel);
        if (this.consumer.getEndpoint().isPrefetchEnabled()) {
            channel.basicQos(this.consumer.getEndpoint().getPrefetchSize(), this.consumer.getEndpoint().getPrefetchCount(), this.consumer.getEndpoint().isPrefetchGlobal());
        }
        if (this.consumer.getEndpoint().isDeclare()) {
            try {
                this.consumer.getEndpoint().declareExchangeAndQueue(channel);
            }
            catch (IOException e) {
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    }
                    catch (Exception innerEx) {
                        e.addSuppressed(innerEx);
                    }
                }
                if (this.consumer.getEndpoint().isRecoverFromDeclareException()) {
                    throw e;
                }
                throw new RuntimeCamelException("Unrecoverable error when attempting to declare exchange or queue for " + this.consumer, (Throwable)e);
            }
        }
        return channel;
    }
}

