/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
import org.apache.camel.component.rabbitmq.ReplyToType;
import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory;
import org.apache.camel.component.rabbitmq.reply.ReplyManager;
import org.apache.camel.component.rabbitmq.reply.TemporaryQueueReplyManager;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitMQProducer
extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQProducer.class);
    private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-";
    private Connection conn;
    private ObjectPool<Channel> channelPool;
    private ExecutorService executorService;
    private int closeTimeout = 30000;
    private final AtomicBoolean started = new AtomicBoolean();
    private ReplyManager replyManager;

    public RabbitMQProducer(RabbitMQEndpoint endpoint) {
        super((Endpoint)endpoint);
    }

    public RabbitMQEndpoint getEndpoint() {
        return (RabbitMQEndpoint)super.getEndpoint();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> T execute(ChannelCallback<T> callback) throws Exception {
        Channel channel;
        try {
            channel = (Channel)this.channelPool.borrowObject();
        }
        catch (IllegalStateException e) {
            this.checkConnectionAndChannelPool();
            channel = (Channel)this.channelPool.borrowObject();
        }
        if (!channel.isOpen()) {
            LOG.warn("Got a closed channel from the pool. Invalidating and borrowing a new one from the pool.");
            this.channelPool.invalidateObject((Object)channel);
            this.checkConnectionAndChannelPool();
            this.attemptDeclaration();
            channel = (Channel)this.channelPool.borrowObject();
        }
        try {
            T t = callback.doWithChannel(channel);
            return t;
        }
        finally {
            this.channelPool.returnObject((Object)channel);
        }
    }

    private synchronized void openConnectionAndChannelPool() throws Exception {
        LOG.trace("Creating connection...");
        this.conn = this.getEndpoint().connect(this.executorService);
        LOG.debug("Created connection: {}", (Object)this.conn);
        LOG.trace("Creating channel pool...");
        int channelPoolMaxSize = this.getEndpoint().getChannelPoolMaxSize();
        long maxWait = this.getEndpoint().getChannelPoolMaxWait();
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxWait(Duration.ofMillis(maxWait));
        config.setMaxTotal(channelPoolMaxSize);
        this.channelPool = new GenericObjectPool((PooledObjectFactory)new PoolableChannelFactory(this.conn), config);
        this.attemptDeclaration();
    }

    private synchronized void attemptDeclaration() throws Exception {
        if (this.getEndpoint().isDeclare()) {
            this.execute(new ChannelCallback<Void>(){

                @Override
                public Void doWithChannel(Channel channel) throws Exception {
                    RabbitMQProducer.this.getEndpoint().declareExchangeAndQueue(channel);
                    return null;
                }
            });
        }
    }

    private synchronized void checkConnectionAndChannelPool() throws Exception {
        if (this.conn == null || !this.conn.isOpen()) {
            LOG.info("Reconnecting to RabbitMQ");
            try {
                this.closeConnectionAndChannel();
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.openConnectionAndChannelPool();
        }
    }

    protected void doStart() throws Exception {
        this.executorService = this.getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor((Object)this, "CamelRabbitMQProducer[" + this.getEndpoint().getQueue() + "]");
        try {
            this.openConnectionAndChannelPool();
        }
        catch (IOException e) {
            LOG.warn("Failed to create connection. It will attempt to connect again when publishing a message.", (Throwable)e);
        }
    }

    private synchronized void closeConnectionAndChannel() throws IOException {
        if (this.channelPool != null) {
            try {
                this.channelPool.close();
                this.channelPool = null;
            }
            catch (Exception e) {
                throw new IOException("Error closing channelPool", e);
            }
        }
        if (this.conn != null) {
            LOG.debug("Closing connection: {} with timeout: {} ms.", (Object)this.conn, (Object)this.closeTimeout);
            this.conn.close(this.closeTimeout);
            this.conn = null;
        }
    }

    protected void doStop() throws Exception {
        this.unInitReplyManager();
        this.closeConnectionAndChannel();
        if (this.executorService != null) {
            this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executorService);
            this.executorService = null;
        }
    }

    public boolean process(Exchange exchange, AsyncCallback callback) {
        if (!this.isRunAllowed()) {
            if (exchange.getException() == null) {
                exchange.setException((Throwable)new RejectedExecutionException());
            }
            callback.done(true);
            return true;
        }
        try {
            if (exchange.getPattern().isOutCapable()) {
                return this.processInOut(exchange, callback);
            }
            return this.processInOnly(exchange, callback);
        }
        catch (Exception e) {
            exchange.setException((Throwable)e);
            callback.done(true);
            return true;
        }
    }

    protected boolean processInOut(Exchange exchange, AsyncCallback callback) throws Exception {
        Message in = exchange.getIn();
        this.initReplyManager();
        long timeout = (Long)exchange.getIn().getHeader("CamelRabbitmqRequestTimeout", (Object)this.getEndpoint().getRequestTimeout(), Long.TYPE);
        String originalCorrelationId = (String)in.getHeader("CamelRabbitmqCorrelationId", String.class);
        String correlationId = GENERATED_CORRELATION_ID_PREFIX + this.getEndpoint().getCamelContext().getUuidGenerator().generateUuid();
        in.setHeader("CamelRabbitmqCorrelationId", (Object)correlationId);
        in.setHeader("CamelRabbitmqReplyTo", (Object)this.replyManager.getReplyTo());
        String exchangeName = (String)exchange.getIn().getHeader("CamelRabbitmqExchangeOverrideName");
        if (exchangeName == null || this.getEndpoint().isBridgeEndpoint()) {
            exchangeName = this.getEndpoint().getExchangeName();
        } else {
            LOG.debug("Overriding header: {} detected sending message to exchange: {}", (Object)"CamelRabbitmqExchangeOverrideName", (Object)exchangeName);
        }
        String key = (String)in.getHeader("CamelRabbitmqRoutingKey", String.class);
        if (key == null || this.getEndpoint().isBridgeEndpoint()) {
            String string = key = this.getEndpoint().getRoutingKey() == null ? "" : this.getEndpoint().getRoutingKey();
        }
        if (ObjectHelper.isEmpty((Object)key) && ObjectHelper.isEmpty((Object)exchangeName)) {
            throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + (Object)((Object)this.getEndpoint()));
        }
        LOG.debug("Registering reply for {}", (Object)correlationId);
        this.replyManager.registerReply(this.replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
        try {
            this.basicPublish(exchange, exchangeName, key);
        }
        catch (Exception e) {
            this.replyManager.cancelCorrelationId(correlationId);
            exchange.setException((Throwable)e);
            return true;
        }
        return false;
    }

    private boolean processInOnly(Exchange exchange, AsyncCallback callback) throws Exception {
        String exchangeName = (String)exchange.getIn().getHeader("CamelRabbitmqExchangeOverrideName");
        if (exchangeName == null || this.getEndpoint().isBridgeEndpoint()) {
            exchangeName = this.getEndpoint().getExchangeName();
        } else {
            LOG.debug("Overriding header: {} detected sending message to exchange: {}", (Object)"CamelRabbitmqExchangeOverrideName", (Object)exchangeName);
        }
        String key = (String)exchange.getIn().getHeader("CamelRabbitmqRoutingKey", String.class);
        if (key == null || this.getEndpoint().isBridgeEndpoint()) {
            String string = key = this.getEndpoint().getRoutingKey() == null ? "" : this.getEndpoint().getRoutingKey();
        }
        if (ObjectHelper.isEmpty((Object)key) && ObjectHelper.isEmpty((Object)exchangeName)) {
            throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + (Object)((Object)this.getEndpoint()));
        }
        this.basicPublish(exchange, exchangeName, key);
        callback.done(true);
        return true;
    }

    private void basicPublish(final Exchange camelExchange, String rabbitExchange, final String routingKey) throws Exception {
        if (this.channelPool == null) {
            this.checkConnectionAndChannelPool();
        }
        this.execute(new ChannelCallback<Void>(){

            @Override
            public Void doWithChannel(Channel channel) throws Exception {
                RabbitMQProducer.this.getEndpoint().publishExchangeToChannel(camelExchange, channel, routingKey);
                return null;
            }
        });
    }

    AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
        return this.getEndpoint().getMessageConverter().buildProperties(exchange);
    }

    public int getCloseTimeout() {
        return this.closeTimeout;
    }

    public void setCloseTimeout(int closeTimeout) {
        this.closeTimeout = closeTimeout;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void initReplyManager() {
        if (!this.started.get()) {
            RabbitMQProducer rabbitMQProducer = this;
            synchronized (rabbitMQProducer) {
                if (this.started.get()) {
                    return;
                }
                LOG.debug("Starting reply manager");
                ClassLoader current = Thread.currentThread().getContextClassLoader();
                ClassLoader ac = this.getEndpoint().getCamelContext().getApplicationContextClassLoader();
                try {
                    if (ac != null) {
                        Thread.currentThread().setContextClassLoader(ac);
                    }
                    if (this.getEndpoint().getReplyToType() != null && this.getEndpoint().getReplyTo() != null && this.getEndpoint().getReplyToType().equals(ReplyToType.Temporary.name())) {
                        throw new IllegalArgumentException("ReplyToType " + (Object)((Object)ReplyToType.Temporary) + " is not supported when replyTo " + this.getEndpoint().getReplyTo() + " is also configured.");
                    }
                    if (this.getEndpoint().getReplyTo() != null) {
                        throw new IllegalArgumentException("Specifying replyTo " + this.getEndpoint().getReplyTo() + " is currently not supported.");
                    }
                    this.replyManager = this.createReplyManager();
                    LOG.debug("Using RabbitMQReplyManager: {} to process replies from temporary queue", (Object)this.replyManager);
                }
                catch (Exception e) {
                    throw new FailedToCreateProducerException((Endpoint)this.getEndpoint(), (Throwable)e);
                }
                finally {
                    Thread.currentThread().setContextClassLoader(current);
                }
                this.started.set(true);
            }
        }
    }

    protected void unInitReplyManager() {
        try {
            if (this.replyManager != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Stopping RabbitMQReplyManager: {} from processing replies from: {}", (Object)this.replyManager, (Object)(this.getEndpoint().getReplyTo() != null ? this.getEndpoint().getReplyTo() : "temporary queue"));
                }
                ServiceHelper.stopService((Object)this.replyManager);
            }
        }
        catch (Exception e) {
            throw RuntimeCamelException.wrapRuntimeCamelException((Throwable)e);
        }
        finally {
            this.started.set(false);
        }
    }

    protected ReplyManager createReplyManager() {
        TemporaryQueueReplyManager replyManager = new TemporaryQueueReplyManager(this.getEndpoint().getCamelContext());
        replyManager.setEndpoint(this.getEndpoint());
        String name = "RabbitMQReplyManagerTimeoutChecker[" + this.getEndpoint().getExchangeName() + "]";
        ScheduledExecutorService replyManagerExecutorService = this.getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor((Object)name, name);
        replyManager.setScheduledExecutorService(replyManagerExecutorService);
        LOG.debug("Staring ReplyManager: {}", (Object)name);
        ServiceHelper.startService((Object)replyManager);
        return replyManager;
    }

    private static interface ChannelCallback<T> {
        public T doWithChannel(Channel var1) throws Exception;
    }
}

