/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.jms.client;

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.DelayedReceiver;
import com.rabbitmq.jms.client.MessageListenerConsumer;
import com.rabbitmq.jms.client.RMQMessage;
import com.rabbitmq.jms.client.RMQSession;
import com.rabbitmq.jms.util.AbortableHolder;
import com.rabbitmq.jms.util.AbortedException;
import com.rabbitmq.jms.util.EntryExitManager;
import com.rabbitmq.jms.util.TimeTracker;
import com.rabbitmq.jms.util.Util;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RMQMessageConsumer
implements MessageConsumer,
QueueReceiver,
TopicSubscriber {
    private final Logger logger = LoggerFactory.getLogger(RMQMessageConsumer.class);
    private static final int DEFAULT_BATCHING_SIZE = 5;
    private static final long STOP_TIMEOUT_MS = 1000L;
    private final RMQDestination destination;
    private final RMQSession session;
    private final String uuidTag;
    private final String messageSelector;
    private final AtomicReference<MessageListenerConsumer> listenerConsumer = new AtomicReference();
    private final EntryExitManager receiveManager = new EntryExitManager();
    private final AbortableHolder abortables = new AbortableHolder();
    private volatile boolean closed = false;
    private volatile boolean closing = false;
    private volatile MessageListener messageListener;
    private volatile boolean durable = false;
    private volatile boolean noLocal = false;
    private final DelayedReceiver delayedReceiver;
    private final boolean autoAck;
    private final AtomicInteger numberOfReceives = new AtomicInteger(0);
    private final boolean requeueOnMessageListenerException;

    RMQMessageConsumer(RMQSession session, RMQDestination destination, String uuidTag, boolean paused, String messageSelector, boolean requeueOnMessageListenerException) {
        this.session = session;
        this.destination = destination;
        this.uuidTag = uuidTag;
        this.delayedReceiver = new DelayedReceiver(5, this);
        this.messageSelector = messageSelector;
        if (!paused) {
            this.receiveManager.openGate();
        }
        this.autoAck = session.isAutoAck();
        this.requeueOnMessageListenerException = requeueOnMessageListenerException;
    }

    public Queue getQueue() throws JMSException {
        return this.destination;
    }

    public String getMessageSelector() throws JMSException {
        return this.messageSelector;
    }

    public MessageListener getMessageListener() throws JMSException {
        return this.messageListener;
    }

    boolean messageListenerIsSet() {
        return null != this.messageListener;
    }

    private void removeListenerConsumer() {
        MessageListenerConsumer listConsumer = this.listenerConsumer.getAndSet(null);
        if (listConsumer != null) {
            this.abortables.remove(listConsumer);
            listConsumer.stop();
        }
    }

    public void setMessageListener(MessageListener messageListener) throws JMSException {
        if (messageListener == this.messageListener) {
            this.logger.info("MessageListener({}) already set", (Object)messageListener);
            return;
        }
        if (!this.session.aSyncAllowed()) {
            throw new IllegalStateException("A MessageListener cannot be set if receive() is outstanding on a session. (See JMS 1.1 \u00a74.4.6.)");
        }
        this.logger.trace("setting MessageListener({})", (Object)messageListener);
        this.removeListenerConsumer();
        this.messageListener = messageListener;
        this.setNewListenerConsumer(messageListener);
    }

    private void setNewListenerConsumer(MessageListener messageListener) throws IllegalStateException {
        if (messageListener != null) {
            MessageListenerConsumer mlConsumer = new MessageListenerConsumer(this, this.getSession().getChannel(), messageListener, TimeUnit.MILLISECONDS.toNanos(this.session.getConnection().getTerminationTimeout()), this.requeueOnMessageListenerException);
            if (this.listenerConsumer.compareAndSet(null, mlConsumer)) {
                this.abortables.add(mlConsumer);
                if (!this.getSession().getConnection().isStopped()) {
                    mlConsumer.start();
                }
            } else {
                mlConsumer.abort();
                throw new IllegalStateException(String.format("MessageListener concurrently set on Consumer [%s].", this));
            }
        }
    }

    public Message receive() throws JMSException {
        if (this.closed || this.closing) {
            throw new IllegalStateException("Consumer is closed or closing.");
        }
        this.logger.trace("receive (wait forever)");
        return this.receive(new TimeTracker());
    }

    public Message receive(long timeout) throws JMSException {
        if (this.closed || this.closing) {
            throw new IllegalStateException("Consumer is closed or closing.");
        }
        this.logger.trace("receive(timeout={}ms)", (Object)timeout);
        return this.receive(timeout == 0L ? new TimeTracker() : new TimeTracker(timeout, TimeUnit.MILLISECONDS));
    }

    final boolean isAutoAck() {
        return this.autoAck;
    }

    void basicConsume(Consumer consumer, String consTag) throws IOException {
        String name = this.rmqQueueName();
        this.logger.debug("consuming from queue '{}' with tag '{}'", (Object)name, (Object)consTag);
        this.getSession().getChannel().basicConsume(name, false, consTag, this.noLocal, false, null, consumer);
    }

    static final String newConsumerTag() {
        return Util.generateUUID("jms-consumer-");
    }

    String rmqQueueName() {
        if (this.destination.isQueue()) {
            return this.destination.getAmqpQueueName();
        }
        return this.getUUIDTag();
    }

    int getNumberOfReceives() {
        return this.numberOfReceives.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private RMQMessage receive(TimeTracker tt) throws JMSException {
        if (!this.session.syncAllowed()) {
            throw new IllegalStateException("A session may not receive() when a MessageListener is set. (See JMS 1.1 \u00a74.4.6.)");
        }
        this.numberOfReceives.incrementAndGet();
        try {
            GetResponse resp;
            block15: {
                if (!this.receiveManager.enter(tt)) {
                    RMQMessage rMQMessage = null;
                    return rMQMessage;
                }
                try {
                    resp = this.delayedReceiver.get(tt);
                    if (resp != null) break block15;
                    RMQMessage rMQMessage = null;
                    this.receiveManager.exit();
                    return rMQMessage;
                }
                catch (Throwable throwable) {
                    try {
                        this.receiveManager.exit();
                        throw throwable;
                    }
                    catch (AbortedException e) {
                        RMQMessage rMQMessage = null;
                        return rMQMessage;
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        RMQMessage rMQMessage = null;
                        return rMQMessage;
                    }
                }
            }
            this.dealWithAcknowledgements(this.isAutoAck(), resp.getEnvelope().getDeliveryTag());
            RMQMessage rMQMessage = RMQMessage.convertMessage(this.session, this.destination, resp);
            this.receiveManager.exit();
            return rMQMessage;
        }
        finally {
            this.numberOfReceives.decrementAndGet();
        }
    }

    void dealWithAcknowledgements(boolean ack, long dtag) {
        if (ack) {
            this.session.explicitAck(dtag);
        } else {
            this.session.unackedMessageReceived(dtag);
        }
    }

    public Message receiveNoWait() throws JMSException {
        this.logger.trace("receive without waiting");
        if (this.closed || this.closing) {
            throw new IllegalStateException("Consumer is closed or closing.");
        }
        return this.receive(TimeTracker.ZERO);
    }

    public void close() throws JMSException {
        this.getSession().consumerClose(this);
    }

    boolean isClosed() {
        return this.closed;
    }

    void internalClose() throws JMSException {
        this.logger.trace("close consumer({})", (Object)this);
        this.closing = true;
        this.receiveManager.closeGate();
        this.receiveManager.abortWaiters();
        this.delayedReceiver.close();
        this.removeListenerConsumer();
        this.abortables.abort();
        this.closed = true;
        this.closing = false;
    }

    RMQDestination getDestination() {
        return this.destination;
    }

    RMQSession getSession() {
        return this.session;
    }

    private String getUUIDTag() {
        return this.uuidTag;
    }

    public Topic getTopic() throws JMSException {
        if (this.getDestination().isQueue()) {
            throw new JMSException("Destination is not of type Topic");
        }
        return this.getDestination();
    }

    public boolean getNoLocal() throws JMSException {
        return this.getNoLocalNoException();
    }

    private boolean getNoLocalNoException() {
        return this.noLocal;
    }

    void pause() throws InterruptedException {
        this.receiveManager.closeGate();
        this.receiveManager.waitToClear(new TimeTracker(1000L, TimeUnit.MILLISECONDS));
        this.abortables.stop();
    }

    void resume() throws JMSException {
        this.abortables.start();
        this.receiveManager.openGate();
    }

    public boolean isDurable() {
        return this.durable;
    }

    void setDurable(boolean durable) {
        this.durable = durable;
    }

    void setNoLocal(boolean noLocal) {
        this.noLocal = noLocal;
    }

    GetResponse getFromRabbitQueue() {
        String qN = this.rmqQueueName();
        try {
            return this.getSession().getChannel().basicGet(qN, false);
        }
        catch (Exception e) {
            if (!(e instanceof ShutdownSignalException) && !(e.getCause() instanceof ShutdownSignalException)) {
                this.logger.error("basicGet for queue '{}' threw unexpected exception", (Object)qN, (Object)e);
            }
            return null;
        }
    }
}

