package com.rabbitmq.jms.client;

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.util.AbortableHolder;
import com.rabbitmq.jms.util.AbortedException;
import com.rabbitmq.jms.util.EntryExitManager;
import com.rabbitmq.jms.util.RMQJMSException;
import com.rabbitmq.jms.util.TimeTracker;
import com.rabbitmq.jms.util.Util;
import jakarta.jms.IllegalStateException;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.Queue;
import jakarta.jms.QueueReceiver;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/jms/client/RMQMessageConsumer.class */
/**
 * JMS consumer implementation that serves as {@link MessageConsumer}, {@link QueueReceiver}
 * and {@link TopicSubscriber} at the same time.
 *
 * <p>Messages are obtained in one of two ways:
 * <ul>
 *   <li>synchronously, through the {@code receive} family of methods, which goes through the
 *       {@link EntryExitManager} gate and the {@code DelayedReceiver};</li>
 *   <li>asynchronously, through a {@link MessageListener}, for which a
 *       {@code MessageListenerConsumer} is created and registered.</li>
 * </ul>
 */
public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber {
    /** Destination name identifying the direct reply-to pseudo-queue; see {@link #amqpAutoAck()}. */
    private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";
    /** First argument handed to the {@code DelayedReceiver} constructor (presumably its batch size; confirm there). */
    private static final int DEFAULT_BATCHING_SIZE = 5;
    /** Upper bound, in milliseconds, that {@link #pause()} waits for in-flight receives to clear. */
    private static final long STOP_TIMEOUT_MS = 1000;
    /** Destination this consumer reads from. */
    private final RMQDestination destination;
    /** Session that owns this consumer. */
    private final RMQSession session;
    /** Tag used as the RabbitMQ queue name when the destination is not a queue. */
    private final String uuidTag;
    /** Selector string reported by {@link #getMessageSelector()}. */
    private final String messageSelector;
    /** Currently registered listener, or {@code null} when none is set. */
    private volatile MessageListener messageListener;
    /** Helper used by the synchronous receive path. */
    private final DelayedReceiver delayedReceiver;
    /** Value of {@code session.isAutoAck()} captured at construction time. */
    private final boolean autoAck;
    /** Forwarded to every {@code MessageListenerConsumer} created by this consumer. */
    private final boolean requeueOnTimeout;
    /** Forwarded to every {@code MessageListenerConsumer} created by this consumer. */
    private final boolean requeueOnMessageListenerException;
    /** Forwarded to message conversion and to listener consumers. */
    private final ReceivingContextConsumer receivingContextConsumer;
    private final Logger logger = LoggerFactory.getLogger(RMQMessageConsumer.class);
    /** Holds the active listener consumer, if any; swapped atomically. */
    private final AtomicReference<MessageListenerConsumer> listenerConsumer = new AtomicReference<>();
    /** Gate that synchronous receivers must pass through. */
    private final EntryExitManager receiveManager = new EntryExitManager();
    /** Collection of objects that are started, stopped and aborted together with this consumer. */
    private final AbortableHolder abortables = new AbortableHolder();
    /** Set once {@link #internalClose()} has finished aborting the registered abortables. */
    private volatile boolean closed = false;
    /** Set while {@link #internalClose()} is in progress. */
    private volatile boolean closing = false;
    private volatile boolean durable = false;
    private volatile boolean noLocal = false;
    /** Callbacks invoked at the end of {@link #internalClose()}. */
    private final List<ClosedListener> closedListeners = new CopyOnWriteArrayList<>();
    /** Number of synchronous receive calls currently in progress. */
    private final AtomicInteger numberOfReceives = new AtomicInteger(0);

    /** Callback notified when a consumer has gone through {@link #internalClose()}. */
    @FunctionalInterface
    /* loaded from: input_file:com/rabbitmq/jms/client/RMQMessageConsumer$ClosedListener.class */
    interface ClosedListener {
        /**
         * Invoked with the consumer that was closed.
         *
         * @param rMQMessageConsumer the consumer that was closed
         */
        void closed(RMQMessageConsumer rMQMessageConsumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a consumer bound to the given session and destination.
     *
     * @param rMQSession owning session; its auto-ack setting is captured here
     * @param rMQDestination destination to consume from
     * @param str UUID tag; used as the queue name when the destination is not a queue
     * @param z when {@code false} the receive gate is opened immediately; when {@code true} it
     *     stays closed until {@link #resume()} is called (presumably the "paused" flag; confirm
     *     against callers)
     * @param str2 message selector reported by {@link #getMessageSelector()}
     * @param z2 value for {@code requeueOnMessageListenerException}
     * @param receivingContextConsumer forwarded to message conversion and listener consumers
     * @param z3 value for {@code requeueOnTimeout}; only allowed when {@code z2} is {@code true}
     * @throws IllegalArgumentException if {@code z3} is {@code true} while {@code z2} is {@code false}
     */
    public RMQMessageConsumer(RMQSession rMQSession, RMQDestination rMQDestination, String str, boolean z, String str2, boolean z2, ReceivingContextConsumer receivingContextConsumer, boolean z3) {
        if (z3 && !z2) {
            throw new IllegalArgumentException("requeueOnTimeout can be true only if requeueOnMessageListenerException is true as well");
        }
        this.session = rMQSession;
        this.destination = rMQDestination;
        this.uuidTag = str;
        this.delayedReceiver = new DelayedReceiver(DEFAULT_BATCHING_SIZE, this);
        this.messageSelector = str2;
        if (!z) {
            this.receiveManager.openGate();
        }
        this.autoAck = rMQSession.isAutoAck();
        this.requeueOnMessageListenerException = z2;
        this.receivingContextConsumer = receivingContextConsumer;
        this.requeueOnTimeout = z3;
    }

    /**
     * Returns the destination as a {@link Queue}. No check is made that the destination
     * actually is a queue.
     */
    public Queue getQueue() throws JMSException {
        return this.destination;
    }

    /** Returns the message selector given at construction time. */
    public String getMessageSelector() throws JMSException {
        return this.messageSelector;
    }

    /** Returns the currently registered listener, or {@code null} if there is none. */
    public MessageListener getMessageListener() throws JMSException {
        return this.messageListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Tells whether a {@link MessageListener} is currently registered. */
    public boolean messageListenerIsSet() {
        return this.messageListener != null;
    }

    /** Detaches the active listener consumer, if any, unregisters it from the abortables and stops it. */
    private void removeListenerConsumer() {
        MessageListenerConsumer previous = this.listenerConsumer.getAndSet(null);
        if (previous != null) {
            this.abortables.remove(previous);
            previous.stop();
        }
    }

    /**
     * Registers, replaces or (with {@code null}) removes the asynchronous message listener.
     *
     * <p>Setting the listener that is already registered is a no-op. Otherwise any existing
     * listener consumer is stopped and removed before the new one is installed.
     *
     * @param messageListener the listener to install, or {@code null} to remove the current one
     * @throws JMSException if the session does not currently allow asynchronous delivery, if a
     *     listener is being set concurrently, or if installing the new listener consumer fails
     */
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        if (messageListener == this.messageListener) {
            this.logger.info("MessageListener({}) already set", messageListener);
            return;
        }
        if (!this.session.aSyncAllowed()) {
            throw new IllegalStateException("A MessageListener cannot be set if receive() is outstanding on a session. (See JMS 1.1 §4.4.6.)");
        }
        this.logger.trace("setting MessageListener({})", messageListener);
        removeListenerConsumer();
        this.messageListener = messageListener;
        try {
            setNewListenerConsumer(messageListener);
        } catch (JMSException e) {
            // JMS exceptions must be caught first: they are propagated untouched, and only
            // the remaining exception types are wrapped below (same policy as resume()).
            throw e;
        } catch (Exception e) {
            throw new RMQJMSException(e);
        }
    }

    /**
     * Creates and installs a listener consumer for the given listener; does nothing for {@code null}.
     * The new consumer is started only if the connection is not stopped.
     *
     * @throws Exception if another listener consumer was installed concurrently, or if creating or
     *     starting the consumer fails
     */
    private void setNewListenerConsumer(MessageListener messageListener) throws Exception {
        if (messageListener == null) {
            return;
        }
        long terminationTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(this.session.getConnection().getTerminationTimeout());
        MessageListenerConsumer candidate = new MessageListenerConsumer(this, getSession().getChannel(), messageListener, terminationTimeoutNanos, this.requeueOnMessageListenerException, this.receivingContextConsumer, this.requeueOnTimeout);
        if (!this.listenerConsumer.compareAndSet(null, candidate)) {
            // Somebody else installed a consumer in the meantime: discard ours.
            candidate.abort();
            throw new IllegalStateException(String.format("MessageListener concurrently set on Consumer [%s].", this));
        }
        this.abortables.add(candidate);
        if (!getSession().getConnection().isStopped()) {
            candidate.start();
        }
    }

    /**
     * Receives the next message, with no time limit.
     *
     * @return the message, or {@code null} if the receive was aborted or interrupted
     * @throws JMSException if the consumer is closed or closing, or synchronous receive is not allowed
     */
    public Message receive() throws JMSException {
        failIfClosedOrClosing();
        this.logger.trace("receive (wait forever)");
        return receive(new TimeTracker());
    }

    /**
     * Receives the next message within the given timeout.
     *
     * @param j timeout in milliseconds; {@code 0} means no time limit
     * @return the message, or {@code null} if none arrived in time or the receive was aborted or interrupted
     * @throws JMSException if the consumer is closed or closing, or synchronous receive is not allowed
     */
    public Message receive(long j) throws JMSException {
        failIfClosedOrClosing();
        this.logger.trace("receive(timeout={}ms)", Long.valueOf(j));
        TimeTracker tracker = (j == 0) ? new TimeTracker() : new TimeTracker(j, TimeUnit.MILLISECONDS);
        return receive(tracker);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the auto-ack setting captured from the session at construction time. */
    public final boolean isAutoAck() {
        return this.autoAck;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts an AMQP consumer on this consumer's queue using the session channel.
     *
     * @param consumer the callback object handed to the channel
     * @param str the consumer tag
     * @throws IOException if the channel operation fails
     */
    public void basicConsume(Consumer consumer, String str) throws IOException {
        String queueName = rmqQueueName();
        this.logger.debug("consuming from queue '{}' with tag '{}'", queueName, str);
        getSession().getChannel().basicConsume(queueName, amqpAutoAck(), str, this.noLocal, false, (Map<String, Object>) null, consumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Generates a fresh consumer tag with the {@code jms-consumer-} prefix. */
    public static final String newConsumerTag() {
        return Util.generateUUID("jms-consumer-");
    }

    /** Returns the AMQP queue name for queue destinations, and this consumer's UUID tag otherwise. */
    String rmqQueueName() {
        if (this.destination.isQueue()) {
            return this.destination.getAmqpQueueName();
        }
        return getUUIDTag();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the number of synchronous receive calls currently in progress. */
    public int getNumberOfReceives() {
        return this.numberOfReceives.get();
    }

    /**
     * Core of the synchronous receive methods.
     *
     * <p>The in-progress counter is incremented on entry and decremented exactly once on every
     * exit path. Likewise, each successful {@code enter} on the receive gate is paired with
     * exactly one {@code exit}.
     *
     * @param timeTracker time budget for entering the gate and fetching a message
     * @return the converted message, or {@code null} if the gate could not be entered, no message
     *     was obtained, or the call was aborted or interrupted
     * @throws JMSException if the session does not currently allow synchronous receive, or if
     *     message handling fails
     */
    private RMQMessage receive(TimeTracker timeTracker) throws JMSException {
        if (!this.session.syncAllowed()) {
            throw new IllegalStateException("A session may not receive() when a MessageListener is set. (See JMS 1.1 §4.4.6.)");
        }
        this.numberOfReceives.incrementAndGet();
        try {
            if (!this.receiveManager.enter(timeTracker)) {
                return null;
            }
            try {
                GetResponse response = this.delayedReceiver.get(timeTracker);
                if (response == null) {
                    return null;
                }
                long deliveryTag = response.getEnvelope().getDeliveryTag();
                dealWithAcknowledgements(isAutoAck(), deliveryTag);
                this.session.addUncommittedTag(deliveryTag);
                return RMQMessage.convertMessage(this.session, this.destination, response, this.receivingContextConsumer);
            } finally {
                // Single exit for the single successful enter above.
                this.receiveManager.exit();
            }
        } catch (AbortedException e) {
            // An aborted receive is reported to the caller as "no message".
            return null;
        } catch (InterruptedException e) {
            // Restore the interrupt status for the caller before reporting "no message".
            Thread.currentThread().interrupt();
            return null;
        } finally {
            this.numberOfReceives.decrementAndGet();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands the delivery tag to the session: as an explicit acknowledgement when {@code z} is
     * {@code true}, otherwise as a received-but-unacknowledged message.
     *
     * @param z whether the message is acknowledged right away
     * @param j the delivery tag
     */
    public void dealWithAcknowledgements(boolean z, long j) {
        if (z) {
            this.session.explicitAck(j);
        } else {
            this.session.unackedMessageReceived(j);
        }
    }

    /**
     * Receives the next message using {@code TimeTracker.ZERO} as the time budget.
     *
     * @return the message, or {@code null} if none was obtained
     * @throws JMSException if the consumer is closed or closing, or synchronous receive is not allowed
     */
    public Message receiveNoWait() throws JMSException {
        this.logger.trace("receive without waiting");
        failIfClosedOrClosing();
        return receive(TimeTracker.ZERO);
    }

    /** Asks the owning session to close this consumer. */
    public void close() throws JMSException {
        getSession().consumerClose(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Tells whether {@link #internalClose()} has completed its shutdown steps. */
    public boolean isClosed() {
        return this.closed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Performs the actual shutdown: closes the receive gate, aborts waiting receivers, closes the
     * delayed receiver, removes the listener consumer and aborts all registered abortables.
     * The closed listeners are notified in every case, even when shutdown fails.
     *
     * @throws JMSException if aborting the registered abortables fails
     */
    public void internalClose() throws JMSException {
        try {
            this.logger.trace("close consumer({})", this);
            this.closing = true;
            this.receiveManager.closeGate();
            this.receiveManager.abortWaiters();
            this.delayedReceiver.close();
            removeListenerConsumer();
            try {
                this.abortables.abort();
                this.closed = true;
                this.closing = false;
            } catch (JMSException e) {
                // JMS exceptions are propagated untouched; anything else is wrapped below.
                throw e;
            } catch (Exception e) {
                throw new RMQJMSException(e);
            }
        } finally {
            this.closedListeners.forEach(closedListener -> {
                try {
                    closedListener.closed(this);
                } catch (Exception e3) {
                    // A failing listener must not prevent the remaining ones from being notified.
                    this.logger.warn("Exception while calling consumer closing listener: {}", e3.getMessage());
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the destination this consumer reads from. */
    public RMQDestination getDestination() {
        return this.destination;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the session that owns this consumer. */
    public RMQSession getSession() {
        return this.session;
    }

    /** Returns the UUID tag given at construction time. */
    private String getUUIDTag() {
        return this.uuidTag;
    }

    /**
     * Returns the destination as a {@link Topic}.
     *
     * @throws JMSException if the destination is a queue
     */
    public Topic getTopic() throws JMSException {
        if (getDestination().isQueue()) {
            throw new JMSException("Destination is not of type Topic");
        }
        return getDestination();
    }

    /** Returns the current no-local flag. */
    public boolean getNoLocal() throws JMSException {
        return getNoLocalNoException();
    }

    /** Same as {@link #getNoLocal()} but without a checked exception in the signature. */
    private boolean getNoLocalNoException() {
        return this.noLocal;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the receive gate, waits up to {@code STOP_TIMEOUT_MS} milliseconds for in-flight
     * receives to clear, then stops the registered abortables. The outcome of the wait is not
     * inspected.
     *
     * @throws Exception if waiting or stopping fails
     */
    public void pause() throws Exception {
        this.receiveManager.closeGate();
        this.receiveManager.waitToClear(new TimeTracker(STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        this.abortables.stop();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts the registered abortables and then reopens the receive gate.
     *
     * @throws JMSException if starting fails; non-JMS exceptions are wrapped in {@code RMQJMSException}
     */
    public void resume() throws JMSException {
        try {
            this.abortables.start();
            this.receiveManager.openGate();
        } catch (JMSException e) {
            throw e;
        } catch (Exception e2) {
            throw new RMQJMSException(e2);
        }
    }

    /** Returns the current durable flag. */
    public boolean isDurable() {
        return this.durable;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Sets the durable flag. */
    public void setDurable(boolean z) {
        this.durable = z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Sets the no-local flag, which is passed to the channel by {@link #basicConsume(Consumer, String)}. */
    public void setNoLocal(boolean z) {
        this.noLocal = z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Polls this consumer's queue once with {@code basicGet}, passing {@code false} as the
     * second argument.
     *
     * @return the response from the channel, or {@code null} if the call threw. Failures caused by
     *     a {@link ShutdownSignalException} are ignored silently; any other failure is logged at
     *     error level.
     */
    public GetResponse getFromRabbitQueue() {
        String queueName = rmqQueueName();
        try {
            return getSession().getChannel().basicGet(queueName, false);
        } catch (Exception e) {
            boolean causedByShutdown = (e instanceof ShutdownSignalException) || (e.getCause() instanceof ShutdownSignalException);
            if (!causedByShutdown) {
                this.logger.error("basicGet for queue '{}' threw unexpected exception", queueName, e);
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the auto-ack flag passed to the channel when consuming: {@code true} only for the
     * direct reply-to destination.
     */
    public boolean amqpAutoAck() {
        return isDirectReplyTo();
    }

    /** Tells whether the destination is an AMQP destination named {@code amq.rabbitmq.reply-to}. */
    private boolean isDirectReplyTo() {
        return this.destination.isAmqp() && DIRECT_REPLY_TO.equals(this.destination.getDestinationName());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Registers a callback to be notified at the end of {@link #internalClose()}. */
    public void addClosedListener(ClosedListener closedListener) {
        this.closedListeners.add(closedListener);
    }

    /**
     * Guard shared by the public receive methods.
     *
     * @throws IllegalStateException if this consumer is closed or in the process of closing
     */
    private void failIfClosedOrClosing() throws IllegalStateException {
        if (this.closed || this.closing) {
            throw new IllegalStateException("Consumer is closed or closing.");
        }
    }
}
