package org.apache.qpid.client;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/qpid-client-1.0-incubating-M3-615355.jar:org/apache/qpid/client/BasicMessageConsumer.class */
public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer {
    private static final Logger _logger;
    protected AMQConnection _connection;
    private String _messageSelector;
    private boolean _noLocal;
    private AMQDestination _destination;
    protected AMQShortString _consumerTag;
    protected int _channelId;
    protected final ArrayBlockingQueue _synchronousQueue;
    protected MessageFactoryRegistry _messageFactory;
    protected final AMQSession _session;
    protected AMQProtocolHandler _protocolHandler;
    private FieldTable _rawSelectorFieldTable;
    private int _prefetchHigh;
    private int _prefetchLow;
    private boolean _exclusive;
    private int _acknowledgeMode;
    private int _outstanding;
    private boolean _dups_ok_acknowledge_send;
    private Thread _receivingThread;
    private AMQShortString _queuename;
    private boolean _autoClose;
    private boolean _closeWhenNoMessages;
    private boolean _noConsume;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final AtomicBoolean _receiving = new AtomicBoolean(false);
    private final AtomicReference<MessageListener> _messageListener = new AtomicReference<>();
    private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<>();
    private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<>();
    private List<StackTraceElement> _closedStack = null;

    /* JADX INFO: Access modifiers changed from: protected */
    public BasicMessageConsumer(int i, AMQConnection aMQConnection, AMQDestination aMQDestination, String str, boolean z, MessageFactoryRegistry messageFactoryRegistry, AMQSession aMQSession, AMQProtocolHandler aMQProtocolHandler, FieldTable fieldTable, int i2, int i3, boolean z2, int i4, boolean z3, boolean z4) {
        this._channelId = i;
        this._connection = aMQConnection;
        this._messageSelector = str;
        this._noLocal = z;
        this._destination = aMQDestination;
        this._messageFactory = messageFactoryRegistry;
        this._session = aMQSession;
        this._protocolHandler = aMQProtocolHandler;
        this._rawSelectorFieldTable = fieldTable;
        this._prefetchHigh = i2;
        this._prefetchLow = i3;
        this._exclusive = z2;
        this._acknowledgeMode = i4;
        this._synchronousQueue = new ArrayBlockingQueue(i2, true);
        this._autoClose = z4;
        this._noConsume = z3;
        if (this._noConsume) {
            this._acknowledgeMode = 257;
        }
    }

    public AMQDestination getDestination() {
        return this._destination;
    }

    @Override // javax.jms.MessageConsumer
    public String getMessageSelector() throws JMSException {
        checkPreConditions();
        return this._messageSelector;
    }

    @Override // javax.jms.MessageConsumer
    public MessageListener getMessageListener() throws JMSException {
        checkPreConditions();
        return this._messageListener.get();
    }

    public int getAcknowledgeMode() {
        return this._acknowledgeMode;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isMessageListenerSet() {
        return this._messageListener.get() != null;
    }

    @Override // javax.jms.MessageConsumer
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        checkPreConditions();
        if (!this._session.getAMQConnection().started()) {
            this._messageListener.set(messageListener);
            this._session.setHasMessageListeners();
            if (_logger.isDebugEnabled()) {
                _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + this._destination);
                return;
            }
            return;
        }
        if (this._receiving.get()) {
            throw new IllegalStateException("Another thread is already receiving synchronously.");
        }
        if (!this._messageListener.compareAndSet(null, messageListener)) {
            throw new IllegalStateException("Attempt to alter listener while session is started.");
        }
        _logger.debug("Message listener set for destination " + this._destination);
        if (messageListener != null) {
            synchronized (this._session) {
                this._messageListener.set(messageListener);
                this._session.setHasMessageListeners();
                this._session.startDistpatcherIfNecessary();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void preApplicationProcessing(AbstractJMSMessage abstractJMSMessage) throws JMSException {
        if (this._session.getAcknowledgeMode() == 2) {
            this._unacknowledgedDeliveryTags.add(Long.valueOf(abstractJMSMessage.getDeliveryTag()));
        }
        this._session.setInRecovery(false);
    }

    private void acquireReceiving() throws JMSException {
        if (!this._receiving.compareAndSet(false, true)) {
            throw new IllegalStateException("Another thread is already receiving.");
        }
        if (isMessageListenerSet()) {
            throw new IllegalStateException("A listener has already been set.");
        }
        this._receivingThread = Thread.currentThread();
    }

    private void releaseReceiving() {
        this._receiving.set(false);
        this._receivingThread = null;
    }

    public FieldTable getRawSelectorFieldTable() {
        return this._rawSelectorFieldTable;
    }

    public int getPrefetch() {
        return this._prefetchHigh;
    }

    public int getPrefetchHigh() {
        return this._prefetchHigh;
    }

    public int getPrefetchLow() {
        return this._prefetchLow;
    }

    public boolean isNoLocal() {
        return this._noLocal;
    }

    public boolean isExclusive() {
        return this._exclusive;
    }

    public boolean isReceiving() {
        return this._receiving.get();
    }

    @Override // javax.jms.MessageConsumer
    public Message receive() throws JMSException {
        return receive(0L);
    }

    @Override // javax.jms.MessageConsumer
    public Message receive(long j) throws JMSException {
        checkPreConditions();
        acquireReceiving();
        this._session.startDistpatcherIfNecessary();
        try {
            try {
                if (closeOnAutoClose()) {
                    releaseReceiving();
                    return null;
                }
                AbstractJMSMessage returnMessageOrThrow = returnMessageOrThrow(getMessageFromQueue(j));
                if (returnMessageOrThrow != null) {
                    preApplicationProcessing(returnMessageOrThrow);
                    postDeliver(returnMessageOrThrow);
                }
                releaseReceiving();
                return returnMessageOrThrow;
            } catch (InterruptedException e) {
                _logger.warn("Interrupted: " + e);
                releaseReceiving();
                return null;
            }
        } catch (Throwable th) {
            releaseReceiving();
            throw th;
        }
    }

    public abstract Object getMessageFromQueue(long j) throws InterruptedException;

    private boolean closeOnAutoClose() throws JMSException {
        if (!isAutoClose() || !this._closeWhenNoMessages || !this._synchronousQueue.isEmpty()) {
            return false;
        }
        close(false);
        return true;
    }

    @Override // javax.jms.MessageConsumer
    public Message receiveNoWait() throws JMSException {
        checkPreConditions();
        acquireReceiving();
        this._session.startDistpatcherIfNecessary();
        try {
            try {
                if (closeOnAutoClose()) {
                    return null;
                }
                AbstractJMSMessage returnMessageOrThrow = returnMessageOrThrow(getMessageFromQueue(-1L));
                if (returnMessageOrThrow != null) {
                    preApplicationProcessing(returnMessageOrThrow);
                    postDeliver(returnMessageOrThrow);
                }
                releaseReceiving();
                return returnMessageOrThrow;
            } catch (InterruptedException e) {
                _logger.warn("Interrupted: " + e);
                releaseReceiving();
                return null;
            }
        } finally {
            releaseReceiving();
        }
    }

    private AbstractJMSMessage returnMessageOrThrow(Object obj) throws JMSException {
        if (!(obj instanceof Throwable)) {
            return (AbstractJMSMessage) obj;
        }
        JMSException jMSException = new JMSException("Message consumer forcibly closed due to error: " + obj);
        if (obj instanceof Exception) {
            jMSException.setLinkedException((Exception) obj);
        }
        throw jMSException;
    }

    @Override // org.apache.qpid.client.Closeable, javax.jms.Connection
    public void close() throws JMSException {
        close(true);
    }

    public void close(boolean z) throws JMSException {
        if (_logger.isInfoEnabled()) {
            _logger.info("Closing consumer:" + debugIdentity());
        }
        synchronized (this._connection.getFailoverMutex()) {
            if (!this._closed.getAndSet(true)) {
                if (_logger.isTraceEnabled()) {
                    if (this._closedStack != null) {
                        _logger.trace(((Object) this._consumerTag) + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
                        _logger.trace(((Object) this._consumerTag) + " previously:" + this._closedStack.toString());
                    } else {
                        this._closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
                    }
                }
                if (z) {
                    sendCancel();
                } else {
                    deregisterConsumer();
                }
                if (this._messageListener != null && this._receiving.get() && this._receivingThread != null) {
                    if (_logger.isInfoEnabled()) {
                        _logger.info("Interrupting thread: " + this._receivingThread);
                    }
                    this._receivingThread.interrupt();
                }
            }
        }
    }

    public abstract void sendCancel() throws JMSAMQException;

    /* JADX INFO: Access modifiers changed from: package-private */
    public void markClosed() {
        this._closed.set(true);
        if (_logger.isTraceEnabled()) {
            if (this._closedStack != null) {
                _logger.trace(((Object) this._consumerTag) + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
                _logger.trace(((Object) this._consumerTag) + " previously:" + this._closedStack.toString());
            } else {
                this._closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
            }
        }
        deregisterConsumer();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyMessage(UnprocessedMessage unprocessedMessage, int i) {
        boolean isDebugEnabled = _logger.isDebugEnabled();
        if (isDebugEnabled) {
            _logger.debug("notifyMessage called with message number " + unprocessedMessage.getDeliveryTag());
        }
        try {
            AbstractJMSMessage createJMSMessageFromUnprocessedMessage = createJMSMessageFromUnprocessedMessage(unprocessedMessage);
            if (isDebugEnabled) {
                _logger.debug("Message is of type: " + createJMSMessageFromUnprocessedMessage.getClass().getName());
            }
            createJMSMessageFromUnprocessedMessage.setConsumer(this);
            preDeliver(createJMSMessageFromUnprocessedMessage);
            notifyMessage(createJMSMessageFromUnprocessedMessage, i);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
            } else {
                _logger.error("Caught exception (dump follows) - ignoring...", (Throwable) e);
            }
        }
    }

    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> unprocessedMessage) throws Exception;

    public void notifyMessage(AbstractJMSMessage abstractJMSMessage, int i) {
        try {
            if (isMessageListenerSet()) {
                preApplicationProcessing(abstractJMSMessage);
                getMessageListener().onMessage(abstractJMSMessage);
                postDeliver(abstractJMSMessage);
            } else {
                this._synchronousQueue.put(abstractJMSMessage);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
            } else {
                _logger.error("reNotification : Caught exception (dump follows) - ignoring...", (Throwable) e);
            }
        }
    }

    void preDeliver(AbstractJMSMessage abstractJMSMessage) {
        switch (this._acknowledgeMode) {
            case 2:
                abstractJMSMessage.setAMQSession(this._session);
                return;
            case 258:
                this._session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), false);
                return;
            default:
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void postDeliver(AbstractJMSMessage abstractJMSMessage) throws JMSException {
        abstractJMSMessage.setJMSDestination(this._destination);
        switch (this._acknowledgeMode) {
            case 0:
                if (isNoConsume()) {
                    this._session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), false);
                    return;
                } else {
                    this._receivedDeliveryTags.add(Long.valueOf(abstractJMSMessage.getDeliveryTag()));
                    return;
                }
            case 1:
                if (this._session.isInRecovery()) {
                    return;
                }
                this._session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), false);
                return;
            case 2:
                if (isNoConsume()) {
                    this._session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), false);
                    return;
                }
                return;
            case 3:
                int i = this._outstanding + 1;
                this._outstanding = i;
                if (i >= this._prefetchHigh) {
                    this._dups_ok_acknowledge_send = true;
                }
                if (this._outstanding <= this._prefetchLow) {
                    this._dups_ok_acknowledge_send = false;
                }
                if (!this._dups_ok_acknowledge_send || this._session.isInRecovery()) {
                    return;
                }
                this._session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), true);
                return;
            default:
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void acknowledgeLastDelivered() {
        long j;
        if (this._receivedDeliveryTags.isEmpty()) {
            return;
        }
        long longValue = this._receivedDeliveryTags.poll().longValue();
        while (true) {
            j = longValue;
            if (this._receivedDeliveryTags.isEmpty()) {
                break;
            } else {
                longValue = this._receivedDeliveryTags.poll().longValue();
            }
        }
        if (!$assertionsDisabled && !this._receivedDeliveryTags.isEmpty()) {
            throw new AssertionError();
        }
        this._session.acknowledgeMessage(j, true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyError(Throwable th) {
        this._closed.set(true);
        if (_logger.isTraceEnabled()) {
            if (this._closedStack != null) {
                _logger.trace(((Object) this._consumerTag) + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
                _logger.trace(((Object) this._consumerTag) + " previously" + this._closedStack.toString());
            } else {
                this._closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
            }
        }
        if (!isMessageListenerSet() && this._synchronousQueue.offer(th)) {
            _logger.debug("Passed exception to synchronous queue for propagation to receive()");
        }
        deregisterConsumer();
    }

    private void deregisterConsumer() {
        this._session.deregisterConsumer(this);
    }

    public AMQShortString getConsumerTag() {
        return this._consumerTag;
    }

    public void setConsumerTag(AMQShortString aMQShortString) {
        this._consumerTag = aMQShortString;
    }

    public AMQSession getSession() {
        return this._session;
    }

    private void checkPreConditions() throws JMSException {
        checkNotClosed();
        if (this._session == null || this._session.isClosed()) {
            throw new IllegalStateException("Invalid Session");
        }
    }

    public void acknowledge() {
        if (isClosed()) {
            throw new IllegalStateException("Consumer is closed");
        }
        Iterator<Long> it = this._unacknowledgedDeliveryTags.iterator();
        while (it.hasNext()) {
            this._session.acknowledgeMessage(it.next().longValue(), false);
            it.remove();
        }
    }

    public void clearUnackedMessages() {
        this._unacknowledgedDeliveryTags.clear();
    }

    public boolean isAutoClose() {
        return this._autoClose;
    }

    public boolean isNoConsume() {
        return this._noConsume;
    }

    public void closeWhenNoMessages(boolean z) {
        this._closeWhenNoMessages = z;
        if (this._closeWhenNoMessages && this._synchronousQueue.isEmpty() && this._receiving.get() && this._messageListener != null) {
            this._receivingThread.interrupt();
        }
    }

    public void rollback() {
        clearUnackedMessages();
        if (!this._receivedDeliveryTags.isEmpty()) {
            _logger.debug("Rejecting received messages in _receivedDTs (RQ)");
        }
        rollbackReceivedMessages();
        if (this._synchronousQueue.size() > 0) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Rejecting the messages(" + this._synchronousQueue.size() + ") in _syncQueue (PRQ)for consumer with tag:" + ((Object) this._consumerTag));
            }
            Iterator it = this._synchronousQueue.iterator();
            while (it.hasNext()) {
                Object next = it.next();
                if (next instanceof AbstractJMSMessage) {
                    this._session.rejectMessage((AbstractJMSMessage) next, true);
                    if (_logger.isTraceEnabled()) {
                        _logger.trace("Rejected message:" + ((AbstractJMSMessage) next).getDeliveryTag());
                    }
                    it.remove();
                } else {
                    _logger.error("Queue contained a :" + next.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
                    it.remove();
                }
            }
            if (this._synchronousQueue.size() != 0) {
                _logger.warn("Queue was not empty after rejecting all messages Remaining:" + this._synchronousQueue.size());
                rollback();
            }
            clearReceiveQueue();
        }
    }

    protected void rollbackReceivedMessages() {
        while (!this._receivedDeliveryTags.isEmpty()) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Rejecting the messages(" + this._receivedDeliveryTags.size() + ") in _receivedDTs (RQ)for consumer with tag:" + ((Object) this._consumerTag));
            }
            Long poll = this._receivedDeliveryTags.poll();
            if (poll != null) {
                if (_logger.isTraceEnabled()) {
                    _logger.trace("Rejecting tag from _receivedDTs:" + poll);
                }
                this._session.rejectMessage(poll.longValue(), true);
            }
        }
        if (this._receivedDeliveryTags.isEmpty() || !_logger.isDebugEnabled()) {
            return;
        }
        _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection");
    }

    public String debugIdentity() {
        return String.valueOf(this._consumerTag);
    }

    public void clearReceiveQueue() {
        this._synchronousQueue.clear();
    }

    public void start() {
    }

    public void stop() {
    }

    public AMQShortString getQueuename() {
        return this._queuename;
    }

    public void setQueuename(AMQShortString aMQShortString) {
        this._queuename = aMQShortString;
    }

    public void addBindingKey(AMQDestination aMQDestination, String str) throws AMQException {
        this._session.addBindingKey(this, aMQDestination, str);
    }

    static {
        $assertionsDisabled = !BasicMessageConsumer.class.desiredAssertionStatus();
        _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
    }
}
