/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.inbound.endpoint.protocol.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.Properties;
import org.apache.axiom.om.OMException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQAckStates;
import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQConnectionFactory;
import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQException;
import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQInjectHandler;
import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQMessage;
import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQUtils;
import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.States;

public class RabbitMQConnectionConsumer {
    private static final Log log = LogFactory.getLog(RabbitMQConnectionConsumer.class);
    private RabbitMQConnectionFactory rabbitMQConnectionFactory;
    private Properties rabbitMQProperties;
    private static final int STATE_STOPPED = 0;
    private static final int STATE_STARTED = 1;
    private static final int STATE_PAUSED = 2;
    private static final int STATE_SHUTTING_DOWN = 3;
    private static final int STATE_FAILURE = 4;
    private static final int STATE_FAULTY = 5;
    private volatile int workerState = 0;
    private String inboundName;
    private Connection connection = null;
    private Channel channel = null;
    private boolean autoAck = false;
    private QueueingConsumer queueingConsumer;
    private String queueName;
    private String routeKey;
    private String exchangeName;
    private Hashtable<String, String> rabbitMQProps = new Hashtable();
    private RabbitMQInjectHandler injectHandler;
    private String consumerTagString;
    private volatile boolean connected = false;
    private volatile boolean idle = false;

    public RabbitMQConnectionConsumer(RabbitMQConnectionFactory rabbitMQConnectionFactory, Properties rabbitMQProperties, RabbitMQInjectHandler injectHandler) {
        this.rabbitMQConnectionFactory = rabbitMQConnectionFactory;
        this.rabbitMQProperties = rabbitMQProperties;
        this.injectHandler = injectHandler;
        for (String propertyName : rabbitMQProperties.stringPropertyNames()) {
            this.rabbitMQProps.put(propertyName, rabbitMQProperties.getProperty(propertyName));
        }
    }

    public void execute() {
        try {
            this.workerState = 1;
            this.initConsumer();
            while (this.workerState == 1) {
                try {
                    this.startConsumer();
                }
                catch (ShutdownSignalException sse) {
                    if (sse.isInitiatedByApplication()) continue;
                    log.error((Object)("RabbitMQ Listener of the inbound " + this.inboundName + " was disconnected"), (Throwable)sse);
                    this.waitForConnection();
                }
                catch (OMException e) {
                    log.error((Object)"Invalid Message Format while consuming the message", (Throwable)e);
                }
                catch (IOException e) {
                    log.error((Object)("RabbitMQ Listener of the inbound " + this.inboundName + " was disconnected"), (Throwable)e);
                    this.waitForConnection();
                }
            }
        }
        catch (IOException e) {
            this.handleException("Error initializing consumer for inbound " + this.inboundName, e);
        }
        finally {
            this.closeConnection();
            this.workerState = 0;
        }
    }

    private void waitForConnection() throws IOException {
        int retryInterval = this.rabbitMQConnectionFactory.getRetryInterval();
        int retryCountMax = this.rabbitMQConnectionFactory.getRetryCount();
        int retryCount = 0;
        while (!(this.workerState != 1 || this.connection.isOpen() || retryCountMax != -1 && retryCount >= retryCountMax)) {
            ++retryCount;
            log.info((Object)("Attempting to reconnect to RabbitMQ Broker for the inbound " + this.inboundName + " in " + retryInterval + " ms"));
            try {
                Thread.sleep(retryInterval);
            }
            catch (InterruptedException e) {
                log.error((Object)("Error while trying to reconnect to RabbitMQ Broker for the inbound " + this.inboundName), (Throwable)e);
            }
        }
        if (this.connection.isOpen()) {
            log.info((Object)("Successfully reconnected to RabbitMQ Broker for the inbound " + this.inboundName));
            this.initConsumer();
        } else {
            log.error((Object)("Could not reconnect to the RabbitMQ Broker for the inbound " + this.inboundName + ". Connection is closed."));
            this.workerState = 5;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startConsumer() throws ShutdownSignalException, IOException {
        this.connection = this.getConnection();
        if (this.channel == null || !this.channel.isOpen()) {
            this.channel = this.connection.createChannel();
            log.debug((Object)("Channel is not open. Creating a new channel for inbound " + this.inboundName));
        }
        if (this.queueingConsumer == null) {
            this.workerState = 0;
            return;
        }
        while (this.isActive()) {
            block23: {
                boolean mediationError;
                block21: {
                    RabbitMQAckStates ackState;
                    RabbitMQMessage message;
                    block22: {
                        try {
                            if (!this.channel.isOpen()) {
                                this.channel = this.queueingConsumer.getChannel();
                            }
                            message = this.getConsumerDelivery(this.queueingConsumer);
                        }
                        catch (ConsumerCancelledException | ShutdownSignalException | InterruptedException e) {
                            log.warn((Object)"Exception occurred while consuming the message", e);
                            continue;
                        }
                        if (message == null) break block23;
                        this.idle = false;
                        ackState = RabbitMQAckStates.REJECT_AND_REQUEUE;
                        try {
                            States states = this.injectHandler.invokeAndReturnAckState(message, this.inboundName);
                            ackState = states.ackState;
                            mediationError = states.mediationError;
                            if (this.autoAck) break block21;
                            if (ackState != RabbitMQAckStates.ACK) break block22;
                        }
                        catch (Throwable throwable) {
                            if (!this.autoAck) {
                                if (ackState == RabbitMQAckStates.ACK) {
                                    try {
                                        this.channel.basicAck(message.getDeliveryTag(), false);
                                    }
                                    catch (IOException e) {
                                        log.error((Object)"Error while acknowledging the message", (Throwable)e);
                                    }
                                } else {
                                    try {
                                        this.channel.basicReject(message.getDeliveryTag(), ackState == RabbitMQAckStates.REJECT_AND_REQUEUE);
                                    }
                                    catch (IOException e) {
                                        log.error((Object)"Error while rejecting the unacked message", (Throwable)e);
                                    }
                                }
                            }
                            throw throwable;
                        }
                        try {
                            this.channel.basicAck(message.getDeliveryTag(), false);
                        }
                        catch (IOException e) {
                            log.error((Object)"Error while acknowledging the message", (Throwable)e);
                        }
                        break block21;
                    }
                    try {
                        this.channel.basicReject(message.getDeliveryTag(), ackState == RabbitMQAckStates.REJECT_AND_REQUEUE);
                    }
                    catch (IOException e) {
                        log.error((Object)"Error while rejecting the unacked message", (Throwable)e);
                    }
                }
                if (!mediationError) continue;
                try {
                    Thread.sleep(2000L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                continue;
            }
            this.idle = true;
        }
    }

    private void initConsumer() throws IOException {
        String qos;
        if (log.isDebugEnabled()) {
            log.debug((Object)("Initializing consumer for inbound " + this.inboundName));
        }
        this.connection = this.getConnection();
        this.channel = this.connection.createChannel();
        this.queueName = this.rabbitMQProperties.getProperty("rabbitmq.queue.name");
        this.routeKey = this.rabbitMQProperties.getProperty("rabbitmq.queue.routing.key");
        this.exchangeName = this.rabbitMQProperties.getProperty("rabbitmq.exchange.name");
        String autoAckStringValue = this.rabbitMQProperties.getProperty("rabbitmq.queue.auto.ack");
        if (autoAckStringValue != null) {
            try {
                this.autoAck = Boolean.parseBoolean(autoAckStringValue);
            }
            catch (Exception e) {
                log.debug((Object)"Format error in rabbitmq.queue.auto.ack parameter");
            }
        }
        if (StringUtils.isEmpty((String)this.queueName)) {
            this.queueName = this.inboundName;
            log.info((Object)("No queue name is specified for " + this.inboundName + ". inbound factory name will be used as queue name"));
        }
        if (this.routeKey == null) {
            log.info((Object)"No routing key specified. Using queue name as the routing key.");
            this.routeKey = this.queueName;
        }
        if (!StringUtils.isEmpty((String)this.queueName)) {
            RabbitMQUtils.declareQueue(this.connection, this.queueName, this.rabbitMQProps);
        }
        if (!StringUtils.isEmpty((String)this.exchangeName)) {
            RabbitMQUtils.declareExchange(this.connection, this.exchangeName, this.rabbitMQProps);
            if (!this.channel.isOpen()) {
                this.channel = this.connection.createChannel();
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Channel is not open. Creating a new channel for inbound " + this.inboundName));
                }
            }
            this.channel.queueBind(this.queueName, this.exchangeName, this.routeKey);
            log.debug((Object)("Bind queue '" + this.queueName + "' to exchange '" + this.exchangeName + "' with route key '" + this.routeKey + "'"));
        }
        if (!this.channel.isOpen()) {
            this.channel = this.connection.createChannel();
            log.debug((Object)("Channel is not open. Creating a new channel for inbound " + this.inboundName));
        }
        if ((qos = this.rabbitMQProperties.getProperty("rabbitmq.channel.consumer.qos")) != null && !qos.isEmpty()) {
            try {
                this.channel.basicQos(Integer.parseInt(qos));
            }
            catch (NumberFormatException e) {
                log.warn((Object)("Unable to parse given QoS value, " + qos + " as an integer. Therefore using channel without QoS."));
            }
        }
        this.queueingConsumer = new QueueingConsumer(this.channel);
        this.consumerTagString = this.rabbitMQProperties.getProperty("rabbitmq.consumer.tag");
        if (this.consumerTagString != null) {
            this.channel.basicConsume(this.queueName, this.autoAck, this.consumerTagString, (Consumer)this.queueingConsumer);
            log.debug((Object)("Start consuming queue '" + this.queueName + "' with consumer tag '" + this.consumerTagString + "' for inbound " + this.inboundName));
        } else {
            this.consumerTagString = this.channel.basicConsume(this.queueName, this.autoAck, (Consumer)this.queueingConsumer);
            log.debug((Object)("Start consuming queue '" + this.queueName + "' with consumer tag '" + this.consumerTagString + "' for inbound " + this.inboundName));
        }
    }

    private RabbitMQMessage getConsumerDelivery(QueueingConsumer consumer) throws ShutdownSignalException, InterruptedException, ConsumerCancelledException {
        RabbitMQMessage message = new RabbitMQMessage();
        log.debug((Object)("Waiting for next delivery from queue for inbound " + this.inboundName));
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery != null) {
            AMQP.BasicProperties properties = delivery.getProperties();
            Map headers = properties.getHeaders();
            message.setBody(delivery.getBody());
            message.setDeliveryTag(delivery.getEnvelope().getDeliveryTag());
            message.setReplyTo(properties.getReplyTo());
            message.setMessageId(properties.getMessageId());
            String contentType = properties.getContentType();
            if (contentType == null) {
                contentType = this.rabbitMQProperties.getProperty("rabbitmq.message.content.type");
            }
            message.setContentType(contentType);
            message.setContentEncoding(properties.getContentEncoding());
            message.setCorrelationId(properties.getCorrelationId());
            if (headers != null) {
                message.setHeaders(headers);
                if (headers.get("SOAP_ACTION") != null) {
                    message.setSoapAction(headers.get("SOAP_ACTION").toString());
                }
            }
        } else {
            log.debug((Object)("Queue delivery item is null for inbound " + this.inboundName));
            return null;
        }
        return message;
    }

    private void closeConnection() {
        if (this.connection != null && this.connection.isOpen()) {
            try {
                this.connection.close();
                log.info((Object)("RabbitMQ connection closed for inbound " + this.inboundName));
            }
            catch (IOException e) {
                log.error((Object)("Error while closing RabbitMQ connection for inbound " + this.inboundName), (Throwable)e);
            }
            finally {
                this.connection = null;
            }
        }
    }

    private Connection createConnection() throws IOException {
        Connection connection = null;
        try {
            connection = this.rabbitMQConnectionFactory.createConnection();
            log.info((Object)("RabbitMQ connection created for inbound " + this.inboundName));
        }
        catch (Exception e) {
            this.handleException("Error while creating RabbitMQ connection for inbound " + this.inboundName, e);
        }
        return connection;
    }

    private Connection getConnection() throws IOException {
        if (this.connection == null) {
            this.connection = this.createConnection();
            this.setConnected(true);
        }
        return this.connection;
    }

    private boolean isActive() {
        return this.workerState == 1;
    }

    public void setConnected(boolean connected) {
        this.connected = connected;
    }

    public String getInboundName() {
        return this.inboundName;
    }

    public void setInboundName(String inboundName) {
        this.inboundName = inboundName;
    }

    protected void requestShutdown() {
        this.workerState = 3;
        this.closeConnection();
    }

    private void handleException(String msg, Exception e) {
        log.error((Object)msg, (Throwable)e);
        throw new RabbitMQException(msg, e);
    }
}

