package org.apache.synapse.message.store.impl.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.message.MessageConsumer;
import org.apache.synapse.message.store.impl.commons.MessageConverter;
import org.apache.synapse.message.store.impl.commons.StorableMessage;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:WEB-INF/lib/synapse-core-2.1.7-wso2v88.jar:org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer.class */
public class RabbitMQConsumer implements MessageConsumer {
    private static final Log logger = LogFactory.getLog(RabbitMQConsumer.class.getName());
    private Connection connection;
    private Channel channel;
    private RabbitMQStore store;
    private String queueName;
    private String idString;
    private boolean isInitialized;
    private boolean isReceiveError;
    private CachedMessage cachedMessage;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/synapse-core-2.1.7-wso2v88.jar:org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer$CachedMessage.class */
    public final class CachedMessage {
        private GetResponse message;
        private MessageContext mc;
        private String id;

        private CachedMessage() {
            this.message = null;
            this.mc = null;
            this.id = "";
        }

        public CachedMessage setMessage(GetResponse getResponse) {
            this.message = getResponse;
            return this;
        }

        public boolean ack() {
            if (this.message == null || RabbitMQConsumer.this.channel == null || !RabbitMQConsumer.this.channel.isOpen()) {
                return false;
            }
            try {
                RabbitMQConsumer.this.channel.basicAck(this.message.getEnvelope().getDeliveryTag(), false);
                return true;
            } catch (IOException e) {
                RabbitMQConsumer.logger.error(getId() + " cannot ack last read message. Error:" + e.getLocalizedMessage(), e);
                return false;
            }
        }

        public GetResponse getMessage() {
            return this.message;
        }

        public CachedMessage setMc(MessageContext messageContext) {
            this.mc = messageContext;
            return this;
        }

        public CachedMessage setId(String str) {
            this.id = str;
            return this;
        }

        public String getId() {
            return this.id;
        }
    }

    public RabbitMQConsumer(RabbitMQStore rabbitMQStore) {
        if (rabbitMQStore == null) {
            logger.error("Cannot initialize.");
            return;
        }
        this.store = rabbitMQStore;
        this.cachedMessage = new CachedMessage();
        this.isReceiveError = false;
        this.isInitialized = true;
    }

    @Override // org.apache.synapse.message.MessageConsumer
    public MessageContext receive() {
        if (!checkConnection()) {
            if (!reconnect()) {
                if (!logger.isDebugEnabled()) {
                    return null;
                }
                logger.debug(getId() + " cannot receive message from store. Can not reconnect.");
                return null;
            }
            logger.info(getId() + " reconnected to store.");
            this.isReceiveError = false;
        }
        if (this.channel != null) {
            if (!this.channel.isOpen() && !setChannel()) {
                logger.info(getId() + " unable to create the channel.");
                return null;
            }
        } else if (!setChannel()) {
            logger.info(getId() + " unable to create the channel.");
            return null;
        }
        try {
            GetResponse basicGet = this.channel.basicGet(this.queueName, false);
            if (basicGet == null) {
                return null;
            }
            StorableMessage storableMessage = null;
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(basicGet.getBody());
            ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
            try {
                storableMessage = (StorableMessage) objectInputStream.readObject();
            } catch (ClassNotFoundException e) {
                logger.error(getId() + "unable to read the stored message" + e);
                this.channel.basicAck(basicGet.getEnvelope().getDeliveryTag(), false);
            }
            byteArrayInputStream.close();
            objectInputStream.close();
            org.apache.axis2.context.MessageContext newAxis2Mc = this.store.newAxis2Mc();
            MessageContext messageContext = MessageConverter.toMessageContext(storableMessage, newAxis2Mc, this.store.newSynapseMc(newAxis2Mc));
            updateCache(basicGet, messageContext, null, false);
            if (logger.isDebugEnabled()) {
                logger.debug(getId() + " Received MessageId:" + basicGet.getProps().getMessageId());
            }
            return messageContext;
        } catch (ShutdownSignalException e2) {
            logger.error(getId() + " connection error when receiving messages" + e2);
            return null;
        } catch (IOException e3) {
            logger.error(getId() + " connection error when receiving messages" + e3);
            return null;
        }
    }

    @Override // org.apache.synapse.message.MessageConsumer
    public boolean ack() {
        boolean ack = this.cachedMessage.ack();
        if (ack) {
            this.store.dequeued();
        }
        return ack;
    }

    @Override // org.apache.synapse.message.MessageConsumer
    public boolean cleanup() {
        if (logger.isDebugEnabled()) {
            logger.debug(getId() + " cleaning up...");
        }
        if (!this.store.cleanup(this.connection, true)) {
            return false;
        }
        this.connection = null;
        return true;
    }

    @Override // org.apache.synapse.message.MessageConsumer
    public boolean isAlive() {
        return true;
    }

    public RabbitMQConsumer setConnection(Connection connection) {
        this.connection = connection;
        return this;
    }

    public void setQueueName(String str) {
        this.queueName = str;
    }

    public boolean setChannel() {
        if (this.connection == null || !this.connection.isOpen()) {
            return false;
        }
        try {
            this.channel = this.connection.createChannel();
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    @Override // org.apache.synapse.message.MessageConsumer
    public void setId(int i) {
        this.idString = PropertyAccessor.PROPERTY_KEY_PREFIX + this.store.getName() + "-C-" + i + PropertyAccessor.PROPERTY_KEY_SUFFIX;
    }

    @Override // org.apache.synapse.message.MessageConsumer
    public String getId() {
        return this.idString;
    }

    public Connection getConnection() {
        return this.connection;
    }

    private boolean checkConnection() {
        if (this.connection == null) {
            if (!logger.isDebugEnabled()) {
                return false;
            }
            logger.debug(getId() + " cannot proceed. RabbitMQ Connection is null.");
            return false;
        }
        if (this.connection.isOpen()) {
            return true;
        }
        if (!logger.isDebugEnabled()) {
            return false;
        }
        logger.debug(getId() + " cannot proceed. RabbitMQ Connection is closed.");
        return false;
    }

    private boolean reconnect() {
        RabbitMQConsumer rabbitMQConsumer = (RabbitMQConsumer) this.store.getConsumer();
        if (rabbitMQConsumer.getConnection() == null) {
            if (!logger.isDebugEnabled()) {
                return false;
            }
            logger.debug(getId() + " could not reconnect to the broker.");
            return false;
        }
        setConnection(rabbitMQConsumer.getConnection());
        if (logger.isDebugEnabled()) {
            logger.debug(getId() + " ===> " + rabbitMQConsumer.getId());
        }
        this.idString = rabbitMQConsumer.getId();
        return true;
    }

    private void updateCache(GetResponse getResponse, MessageContext messageContext, String str, boolean z) throws IOException {
        this.isReceiveError = z;
        this.cachedMessage.setMessage(getResponse);
        this.cachedMessage.setMc(messageContext);
        this.cachedMessage.setId(str);
    }
}
