package org.apache.qpid.client;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/client/BasicMessageConsumer.class */
public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer {
    /** Class logger; assigned in the static initializer at the bottom of the class. */
    private static final Logger _logger;
    protected final AMQConnection _connection;
    protected final String _messageSelector;
    private final boolean _noLocal;
    protected AMQDestination _destination;
    protected int _consumerTag;
    protected final int _channelId;
    protected final MessageFactoryRegistry _messageFactory;
    protected final AMQSession _session;
    protected final AMQProtocolHandler _protocolHandler;
    private final FieldTable _arguments;
    private final int _prefetchHigh;
    private final int _prefetchLow;
    protected boolean _exclusive;
    /** Acknowledge mode; the constructor forces this to 257 when the consumer is created as no-consume. */
    protected final int _acknowledgeMode;
    // NOTE(review): _outstanding and _dups_ok_acknowledge_send are never read or written in this class.
    private int _outstanding;
    private boolean _dups_ok_acknowledge_send;
    /** Highest delivery tag acknowledged cumulatively by acknowledgeDelivered(). */
    private long _lastAcked;
    /** Thread currently inside a synchronous receive; set by acquireReceiving(), cleared by releaseReceiving(). */
    private Thread _receivingThread;
    private AMQShortString _queuename;
    private final boolean _autoClose;
    private final boolean _noConsume;
    /** Compiler-generated assertion switch (decompiler artefact); assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** True while a thread holds the synchronous-receive claim. */
    private final AtomicBoolean _receiving = new AtomicBoolean(false);
    /** Asynchronous listener; a null value means no listener is set. The reference itself is never null. */
    private final AtomicReference<MessageListener> _messageListener = new AtomicReference<>();
    /** Delivery tags drained by getLastDelivered() and acknowledgeDelivered(); nothing in this class adds to it. */
    private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<>();
    /** Tags acknowledged individually by acknowledgeDelivered() that lie beyond the contiguous _lastAcked mark. */
    private final SortedSet<Long> _previouslyAcked = new TreeSet();
    /** Monitor held for the whole of acknowledgeDelivered(). */
    private final Object _commitLock = new Object();
    /** Stack captured the first time the consumer is closed while debug logging is enabled. */
    private List<StackTraceElement> _closedStack = null;
    /**
     * Hand-off queue for synchronous receivers. Besides AbstractJMSMessage instances it can also
     * hold a CloseConsumerMessage (see notifyCloseMessage) or a Throwable (see notifyError).
     */
    protected final BlockingQueue _synchronousQueue = new LinkedBlockingQueue();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the supplied collaborators and settings on the new consumer.
     *
     * @param i channel id
     * @param aMQConnection owning connection
     * @param aMQDestination destination consumed from
     * @param str message selector
     * @param z no-local flag
     * @param messageFactoryRegistry message factory registry
     * @param aMQSession owning session
     * @param aMQProtocolHandler protocol handler
     * @param fieldTable consumer arguments
     * @param i2 prefetch high mark
     * @param i3 prefetch low mark
     * @param z2 exclusive flag
     * @param i4 requested acknowledge mode; ignored when {@code z3} is true
     * @param z3 no-consume flag; when true the acknowledge mode is forced to 257
     * @param z4 auto-close flag
     */
    public BasicMessageConsumer(int i, AMQConnection aMQConnection, AMQDestination aMQDestination, String str, boolean z, MessageFactoryRegistry messageFactoryRegistry, AMQSession aMQSession, AMQProtocolHandler aMQProtocolHandler, FieldTable fieldTable, int i2, int i3, boolean z2, int i4, boolean z3, boolean z4) {
        // Collaborators.
        _connection = aMQConnection;
        _session = aMQSession;
        _protocolHandler = aMQProtocolHandler;
        _messageFactory = messageFactoryRegistry;

        // What is consumed, and how.
        _channelId = i;
        _destination = aMQDestination;
        _messageSelector = str;
        _arguments = fieldTable;
        _noLocal = z;
        _exclusive = z2;
        _prefetchHigh = i2;
        _prefetchLow = i3;

        // Lifecycle flags.
        _autoClose = z4;
        _noConsume = z3;

        // A no-consume consumer always uses mode 257, whatever the caller asked for.
        _acknowledgeMode = z3 ? 257 : i4;
    }

    /** @return the destination this consumer was created for */
    public AMQDestination getDestination() {
        return _destination;
    }

    /**
     * @return the selector given at construction time
     * @throws JMSException if the consumer or its session fails the precondition check
     */
    public String getMessageSelector() throws JMSException {
        checkPreConditions();
        final String selector = _messageSelector;
        return selector;
    }

    /**
     * @return the currently registered listener, or {@code null} when none is set
     * @throws JMSException if the consumer or its session fails the precondition check
     */
    public MessageListener getMessageListener() throws JMSException {
        checkPreConditions();
        final MessageListener current = _messageListener.get();
        return current;
    }

    /** @return the acknowledge mode fixed by the constructor */
    public int getAcknowledgeMode() {
        return _acknowledgeMode;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return {@code true} when a non-null listener is currently registered */
    public boolean isMessageListenerSet() {
        final MessageListener current = _messageListener.get();
        return current != null;
    }

    /**
     * Registers (or, while the connection is stopped, replaces) the asynchronous listener.
     *
     * <p>While the connection is not started the listener is simply stored. Once it is started,
     * a listener may only be installed when none is present and no thread is inside a
     * synchronous receive; messages already sitting in the synchronous queue are then
     * re-dispatched through {@code notifyMessage} while holding the session monitor.
     *
     * @param messageListener listener to install, may be {@code null}
     * @throws JMSException if the preconditions fail, a synchronous receive is in progress,
     *         or a listener is already installed on a started connection
     */
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        checkPreConditions();

        final boolean connectionStarted = _session.getAMQConnection().started();
        if (connectionStarted) {
            if (_receiving.get()) {
                throw new IllegalStateException("Another thread is already receiving synchronously.");
            }
            final boolean installed = _messageListener.compareAndSet(null, messageListener);
            if (!installed) {
                throw new IllegalStateException("Attempt to alter listener while session is started.");
            }
            _logger.debug("Message listener set for destination " + _destination);
            if (messageListener == null) {
                return;
            }
            synchronized (_session) {
                _messageListener.set(messageListener);
                _session.setHasMessageListeners();
                _session.startDispatcherIfNecessary();
                // Hand anything queued for synchronous receivers to the new listener.
                for (Object queued = _synchronousQueue.poll(); queued != null; queued = _synchronousQueue.poll()) {
                    notifyMessage((AbstractJMSMessage) queued);
                }
            }
            return;
        }

        // Connection stopped: the listener can be swapped freely.
        _messageListener.set(messageListener);
        _session.setHasMessageListeners();
        if (_logger.isDebugEnabled()) {
            _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Bookkeeping performed just before a message is handed to application code.
     *
     * <p>If the session's acknowledge mode is 2 the delivery tag is recorded as unacknowledged.
     * The session's in-recovery flag is then cleared and {@code preDeliver} is invoked.
     *
     * @param abstractJMSMessage the message about to be delivered
     * @throws JMSException declared for subclasses and callers; nothing here throws it directly
     */
    public void preApplicationProcessing(AbstractJMSMessage abstractJMSMessage) throws JMSException {
        final boolean sessionModeIsTwo = _session.getAcknowledgeMode() == 2;
        if (sessionModeIsTwo) {
            _session.addUnacknowledgedMessage(abstractJMSMessage.getDeliveryTag());
        }
        _session.setInRecovery(false);
        preDeliver(abstractJMSMessage);
    }

    /**
     * Claims the synchronous-receive slot for the calling thread.
     *
     * <p>While the connection is failing over, a non-blocking caller ({@code z == true}) gives
     * up immediately; a blocking caller waits until failover has finished.
     *
     * @param z {@code true} to return {@code false} instead of waiting during failover
     * @return {@code true} when the claim was taken, {@code false} only when {@code z} is
     *         true and the connection is failing over
     * @throws JMSException (an {@code IllegalStateException}) if another thread already
     *         holds the claim or a message listener is set
     * @throws InterruptedException if interrupted while waiting for failover to finish
     */
    private boolean acquireReceiving(boolean z) throws JMSException, InterruptedException {
        if (_connection.isFailingOver()) {
            if (z) {
                return false;
            }
            _connection.blockUntilNotFailingOver();
        }
        if (!_receiving.compareAndSet(false, true)) {
            throw new IllegalStateException("Another thread is already receiving.");
        }
        if (isMessageListenerSet()) {
            // Give the claim back before failing. Callers invoke this method outside their
            // try/finally, so nothing else would ever reset the flag and every later
            // receive() would fail with "Another thread is already receiving."
            _receiving.set(false);
            throw new IllegalStateException("A listener has already been set.");
        }
        _receivingThread = Thread.currentThread();
        return true;
    }

    /** Gives up the synchronous-receive claim: clears the flag, then forgets the receiving thread. */
    private void releaseReceiving() {
        _receiving.set(false);
        _receivingThread = null;
    }

    /** @return the argument table supplied at construction time */
    public FieldTable getArguments() {
        return _arguments;
    }

    /** @return the prefetch high mark (same value as {@code getPrefetchHigh()}) */
    public int getPrefetch() {
        return _prefetchHigh;
    }

    /** @return the prefetch high mark supplied at construction time */
    public int getPrefetchHigh() {
        return _prefetchHigh;
    }

    /** @return the prefetch low mark supplied at construction time */
    public int getPrefetchLow() {
        return _prefetchLow;
    }

    /** @return the no-local flag supplied at construction time */
    public boolean isNoLocal() {
        return _noLocal;
    }

    /** @return the current value of the exclusive flag */
    public boolean isExclusive() {
        return _exclusive;
    }

    /** @return {@code true} while some thread holds the synchronous-receive claim */
    public boolean isReceiving() {
        final boolean claimed = _receiving.get();
        return claimed;
    }

    /**
     * Receives the next message, waiting indefinitely; equivalent to {@code receive(0)}.
     *
     * @return the next message, or {@code null} as described for {@code receive(long)}
     * @throws JMSException as thrown by {@code receive(long)}
     */
    public Message receive() throws JMSException {
        final long waitWithoutTimeout = 0L;
        return receive(waitWithoutTimeout);
    }

    /**
     * Synchronously receives the next message.
     *
     * <p>The timeout follows {@code getMessageFromQueue}: positive waits up to that many
     * milliseconds, zero blocks until something arrives, negative does not wait.
     *
     * @param j timeout in milliseconds
     * @return the received message, or {@code null} when nothing arrived in time, the wait
     *         was interrupted, or a close marker was dequeued
     * @throws JMSException if the preconditions fail, another receiver or a listener is
     *         active, or an error object was queued for this consumer
     */
    public Message receive(long j) throws JMSException {
        checkPreConditions();

        // Remember whether this thread actually owns the receiving claim. If the acquire was
        // interrupted we never took it, so we must not release it later: doing so could clear
        // a claim held by a different thread.
        boolean acquired = false;
        try {
            acquired = acquireReceiving(false);
        } catch (InterruptedException e) {
            _logger.warn("Interrupted acquire: " + e);
            if (isClosed()) {
                return null;
            }
        }

        _session.startDispatcherIfNecessary();
        try {
            AbstractJMSMessage message = returnMessageOrThrow(getMessageFromQueue(j));
            if (message != null) {
                preApplicationProcessing(message);
                postDeliver(message);
            }
            return message;
        } catch (InterruptedException e) {
            // close(boolean) interrupts the receiving thread to wake it up. The interrupt
            // status is deliberately not restored, as in the original code.
            _logger.warn("Interrupted: " + e);
            return null;
        } finally {
            if (acquired) {
                releaseReceiving();
            }
        }
    }

    /**
     * Takes the next entry from the synchronous queue.
     *
     * @param j timeout in milliseconds: positive waits up to that long, negative polls
     *          without waiting, zero blocks until an entry is available
     * @return the dequeued entry, or {@code null} when a timed or immediate poll found nothing
     * @throws InterruptedException if interrupted while waiting
     */
    public Object getMessageFromQueue(long j) throws InterruptedException {
        if (j > 0) {
            return _synchronousQueue.poll(j, TimeUnit.MILLISECONDS);
        }
        if (j < 0) {
            return _synchronousQueue.poll();
        }
        return _synchronousQueue.take();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Subclass-specific receive used for browsing.
     * NOTE(review): exact semantics are defined by the implementations, which are not in this file.
     *
     * @return the received message; presumably {@code null} when none is available (confirm in subclasses)
     * @throws JMSException on failure
     */
    public abstract Message receiveBrowse() throws JMSException;

    /**
     * Receives a message only if one is immediately available.
     *
     * @return the message, or {@code null} when the queue is empty, the connection is
     *         failing over, the call was interrupted, or a close marker was dequeued
     * @throws JMSException if the preconditions fail, another receiver or a listener is
     *         active, or an error object was queued for this consumer
     */
    public Message receiveNoWait() throws JMSException {
        checkPreConditions();

        boolean acquired;
        try {
            acquired = acquireReceiving(true);
        } catch (InterruptedException e) {
            return null;
        }
        if (!acquired) {
            // Failover in progress: do not wait for it.
            return null;
        }

        _session.startDispatcherIfNecessary();
        try {
            AbstractJMSMessage message = returnMessageOrThrow(getMessageFromQueue(-1L));
            if (message != null) {
                preApplicationProcessing(message);
                postDeliver(message);
            }
            return message;
        } catch (InterruptedException e) {
            _logger.warn("Interrupted: " + e);
            return null;
        } finally {
            // Release exactly once. The previous code released twice on the interrupted
            // path, and the second release could clear a claim another thread had taken
            // in between.
            releaseReceiving();
        }
    }

    /**
     * Interprets an entry taken from the synchronous queue.
     *
     * @param obj a message, a {@code CloseConsumerMessage}, a {@code Throwable}, or {@code null}
     * @return the entry cast to a message; {@code null} for a {@code null} entry or after a
     *         close marker (which also marks this consumer closed and deregisters it)
     * @throws JMSException wrapping the entry when it is a {@code Throwable}
     */
    private AbstractJMSMessage returnMessageOrThrow(Object obj) throws JMSException {
        if (obj instanceof Throwable) {
            final Throwable cause = (Throwable) obj;
            final JMSException failure = new JMSException("Message consumer forcibly closed due to error: " + obj);
            failure.initCause(cause);
            if (cause instanceof Exception) {
                failure.setLinkedException((Exception) cause);
            }
            throw failure;
        }

        if (obj instanceof CloseConsumerMessage) {
            _closed.set(true);
            deregisterConsumer();
            return null;
        }

        return (AbstractJMSMessage) obj;
    }

    /**
     * Closes this consumer; equivalent to {@code close(true)}.
     *
     * @throws JMSException as thrown by {@code close(boolean)}
     */
    @Override // org.apache.qpid.client.Closeable
    public void close() throws JMSException {
        final boolean sendClose = true;
        close(sendClose);
    }

    /**
     * Closes this consumer. Only the first call has any effect beyond logging.
     *
     * <p>With {@code z == true}, {@code sendCancel()} is invoked under the connection's
     * failover mutex unless the session is closed and not closing. With {@code z == false}
     * the consumer is only deregistered from the session. Finally, a thread blocked in a
     * synchronous receive is interrupted so it can return.
     *
     * @param z whether to run the cancel path rather than just deregistering
     * @throws JMSException if the cancel fails or is interrupted by failover
     */
    public void close(boolean z) throws JMSException {
        if (_logger.isInfoEnabled()) {
            _logger.info("Closing consumer:" + debugIdentity());
        }
        if (_closed.getAndSet(true)) {
            return;
        }
        _closing.set(true);

        if (_logger.isDebugEnabled()) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            if (_closedStack != null) {
                _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
            } else {
                _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
            }
        }

        if (z) {
            synchronized (_connection.getFailoverMutex()) {
                try {
                    if (!_session.isClosed() || _session.isClosing()) {
                        sendCancel();
                    }
                } catch (AMQException e) {
                    throw new JMSAMQException("Error closing consumer: " + e, e);
                } catch (FailoverException e2) {
                    throw new JMSAMQException("FailoverException interrupted basic cancel.", e2);
                }
            }
        } else {
            deregisterConsumer();
        }

        // The old "_messageListener == null" test was dead code: the field is a final,
        // always-initialised AtomicReference. Only the receiving flag matters here.
        if (!_receiving.get()) {
            return;
        }
        // Read the thread once into a local. The receiver sets the field after raising the
        // flag and clears it after lowering the flag, so it can be null at this point;
        // dereferencing the field directly could throw NullPointerException.
        final Thread receiver = _receivingThread;
        if (receiver == null) {
            return;
        }
        if (_logger.isInfoEnabled()) {
            _logger.info("Interrupting thread: " + receiver);
        }
        receiver.interrupt();
    }

    /**
     * Protocol-specific cancel step, called by {@code close(boolean)} while holding the
     * connection's failover mutex.
     * NOTE(review): presumably issues the broker-side consumer cancel; confirm in subclasses.
     *
     * @throws AMQException on failure; rethrown by {@code close(boolean)} as a JMS exception
     * @throws FailoverException if failover interrupts the operation
     */
    abstract void sendCancel() throws AMQException, FailoverException;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Flags the consumer as closed and deregisters it from the session, without running the
     * cancel path. When debug logging is on, the calling stack is either remembered (first
     * close) or logged next to the previously remembered one.
     */
    public void markClosed() {
        _closed.set(true);

        if (_logger.isDebugEnabled()) {
            final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
            final List<StackTraceElement> callers = Arrays.asList(frames).subList(3, frames.length - 1);
            if (_closedStack == null) {
                _closedStack = callers;
            } else {
                _logger.debug(_consumerTag + " markClosed():" + callers);
                _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
            }
        }

        deregisterConsumer();
    }

    /**
     * Queues a close marker for the synchronous receiver. When a message listener is set the
     * marker is dropped with a warning instead.
     *
     * @param closeConsumerMessage the marker to enqueue
     */
    public void notifyCloseMessage(CloseConsumerMessage closeConsumerMessage) {
        if (!isMessageListenerSet()) {
            try {
                _synchronousQueue.put(closeConsumerMessage);
            } catch (InterruptedException e) {
                _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,but we shouldn't have close yet");
            }
            return;
        }
        _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Entry point for a raw, unprocessed delivery. Close markers are routed to
     * {@code notifyCloseMessage}; anything else is converted to a JMS message and passed to
     * {@code notifyMessage(AbstractJMSMessage)}. Failures are logged and not propagated.
     *
     * @param u the unprocessed delivery
     */
    public void notifyMessage(U u) {
        if (u instanceof CloseConsumerMessage) {
            notifyCloseMessage((CloseConsumerMessage) u);
        } else {
            try {
                final AbstractJMSMessage converted = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), u);
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Message is of type: " + converted.getClass().getName());
                }
                notifyMessage(converted);
            } catch (InterruptedException interrupted) {
                _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
            } catch (Exception failure) {
                _logger.error("Caught exception (dump follows) - ignoring...", failure);
            }
        }
    }

    /**
     * Converts a raw delivery into a JMS message. Called from {@code notifyMessage(U)}, which
     * logs and swallows any exception thrown here.
     *
     * @param aMQMessageDelegateFactory the delegate factory obtained from the session
     * @param u the unprocessed delivery
     * @return the JMS message; {@code notifyMessage(U)} dereferences it when debug logging is
     *         enabled, so it is expected to be non-null
     * @throws Exception on any conversion failure
     */
    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory aMQMessageDelegateFactory, U u) throws Exception;

    /**
     * Delivers a JMS message: straight to the listener when one is set (surrounded by the
     * pre- and post-delivery hooks), otherwise onto the synchronous queue. Failures are
     * logged and not propagated.
     *
     * @param abstractJMSMessage the message to deliver
     */
    public void notifyMessage(AbstractJMSMessage abstractJMSMessage) {
        try {
            if (!isMessageListenerSet()) {
                _synchronousQueue.put(abstractJMSMessage);
                return;
            }
            preApplicationProcessing(abstractJMSMessage);
            getMessageListener().onMessage(abstractJMSMessage);
            postDeliver(abstractJMSMessage);
        } catch (InterruptedException interrupted) {
            _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
        } catch (Exception failure) {
            _logger.error("reNotification : Caught exception (dump follows) - ignoring...", failure);
        }
    }

    /**
     * Acknowledge-mode specific work done before the application sees the message.
     *
     * <ul>
     *   <li>mode 0: acknowledge immediately for a no-consume consumer, otherwise record the
     *       tag as delivered and mark the session dirty;</li>
     *   <li>mode 2: attach the session to the message;</li>
     *   <li>mode 258: acknowledge immediately;</li>
     *   <li>any other mode: nothing.</li>
     * </ul>
     * NOTE(review): 0 and 2 match the standard JMS {@code SESSION_TRANSACTED} and
     * {@code CLIENT_ACKNOWLEDGE} values; 258 is presumably a Qpid pre-acknowledge extension.
     *
     * @param abstractJMSMessage the message about to be delivered
     */
    void preDeliver(AbstractJMSMessage abstractJMSMessage) {
        final int mode = _acknowledgeMode;
        if (mode == 0) {
            if (isNoConsume()) {
                _session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), false);
            } else {
                _session.addDeliveredMessage(abstractJMSMessage.getDeliveryTag());
                _session.markDirty();
            }
        } else if (mode == 2) {
            abstractJMSMessage.setAMQSession(_session);
        } else if (mode == 258) {
            _session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Acknowledge-mode specific work done after the application has seen the message.
     *
     * <ul>
     *   <li>modes 1 and 3: acknowledge the message unless the session is in recovery;</li>
     *   <li>mode 2: acknowledge only for a no-consume consumer, then mark the session dirty;</li>
     *   <li>any other mode: nothing.</li>
     * </ul>
     *
     * @param abstractJMSMessage the message that was delivered
     * @throws JMSException declared for callers; nothing here throws it directly
     */
    public void postDeliver(AbstractJMSMessage abstractJMSMessage) throws JMSException {
        final int mode = _acknowledgeMode;
        if (mode == 1 || mode == 3) {
            if (!_session.isInRecovery()) {
                _session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), false);
            }
        } else if (mode == 2) {
            if (isNoConsume()) {
                _session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), false);
            }
            _session.markDirty();
        }
    }

    /**
     * Empties the received-delivery-tag queue and reports the last tag taken from it.
     *
     * @return the last tag polled, or {@code null} when the queue was empty on entry
     */
    Long getLastDelivered() {
        if (_receivedDeliveryTags.isEmpty()) {
            return null;
        }

        Long last = _receivedDeliveryTags.poll();
        while (!_receivedDeliveryTags.isEmpty()) {
            last = _receivedDeliveryTags.poll();
        }

        // Decompiled form of an assert that the queue is now empty.
        if (!$assertionsDisabled && !_receivedDeliveryTags.isEmpty()) {
            throw new AssertionError();
        }
        return last;
    }

    /**
     * Acknowledges every delivery tag currently waiting in {@code _receivedDeliveryTags}.
     *
     * <p>The tags are drained and sorted. Starting from {@code _lastAcked}, the contiguous run
     * of tags (drawn from the drained list and from {@code _previouslyAcked}) is acknowledged
     * with a single call whose second argument is {@code true}. Drained tags beyond a gap are
     * acknowledged one by one with {@code false} and remembered in {@code _previouslyAcked},
     * so a later call can fold them into the contiguous run. The whole operation runs under
     * {@code _commitLock}.
     */
    void acknowledgeDelivered() {
        synchronized (this._commitLock) {
            // Drain the pending tags into a local list and sort them ascending.
            ArrayList arrayList = new ArrayList();
            while (!this._receivedDeliveryTags.isEmpty()) {
                arrayList.add(this._receivedDeliveryTags.poll());
            }
            Collections.sort(arrayList);
            // j is the candidate new contiguous mark; j2 holds its value from the previous pass.
            long j = this._lastAcked;
            long j2 = -1;
            // Repeat until a full pass over both collections no longer advances j.
            while (j2 != j) {
                j2 = j;
                // Consume tags from the drained list while each equals j + 1.
                Iterator it = arrayList.iterator();
                while (it.hasNext() && ((Long) it.next()).longValue() == j + 1) {
                    it.remove();
                    j++;
                }
                // Likewise consume tags that an earlier call acknowledged individually.
                Iterator<Long> it2 = this._previouslyAcked.iterator();
                while (it2.hasNext() && it2.next().longValue() == j + 1) {
                    it2.remove();
                    j++;
                }
            }
            // One acknowledge call (second argument true) for the whole contiguous run, if it grew.
            if (j != this._lastAcked) {
                this._session.acknowledgeMessage(j, true);
                this._lastAcked = j;
            }
            // Whatever remains lies beyond a gap: acknowledge each tag on its own and remember it.
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                Long l = (Long) it3.next();
                this._session.acknowledgeMessage(l.longValue(), false);
                this._previouslyAcked.add(l);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the consumer because of an error. When no listener is set the error is offered
     * to the synchronous queue, where {@code returnMessageOrThrow} turns it into a
     * {@code JMSException}. The consumer is then deregistered from the session.
     *
     * @param th the error that closed the consumer
     */
    public void notifyError(Throwable th) {
        _closed.set(true);

        if (_logger.isDebugEnabled()) {
            final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
            final List<StackTraceElement> callers = Arrays.asList(frames).subList(3, frames.length - 1);
            if (_closedStack == null) {
                _closedStack = callers;
            } else {
                _logger.debug(_consumerTag + " notifyError():" + callers);
                _logger.debug(_consumerTag + " previously" + _closedStack.toString());
            }
        }

        if (!isMessageListenerSet()) {
            final boolean queued = _synchronousQueue.offer(th);
            if (queued) {
                _logger.debug("Passed exception to synchronous queue for propagation to receive()");
            }
        }

        deregisterConsumer();
    }

    /** Removes this consumer from its session's registry. */
    private void deregisterConsumer() {
        _session.deregisterConsumer(this);
    }

    /** @return the consumer tag last assigned through {@code setConsumerTag} */
    public int getConsumerTag() {
        return _consumerTag;
    }

    /** @param i the consumer tag to record */
    public void setConsumerTag(int i) {
        _consumerTag = i;
    }

    /** @return the session this consumer belongs to */
    public AMQSession getSession() {
        return _session;
    }

    /**
     * Verifies that the consumer is open and has a usable session.
     *
     * @throws JMSException if {@code checkNotClosed()} fails, or an
     *         {@code IllegalStateException} when the session is null or closed
     */
    private void checkPreConditions() throws JMSException {
        checkNotClosed();
        final boolean sessionUsable = _session != null && !_session.isClosed();
        if (!sessionUsable) {
            throw new IllegalStateException("Invalid Session");
        }
    }

    /** @return the auto-close flag supplied at construction time */
    public boolean isAutoClose() {
        return _autoClose;
    }

    /** @return {@code true} when the consumer was created no-consume or its destination is browse-only */
    public boolean isNoConsume() {
        if (_noConsume) {
            return true;
        }
        return _destination.isBrowseOnly();
    }

    /** Rolls back this consumer by rejecting everything still queued for synchronous receive. */
    public void rollback() {
        rollbackPendingMessages();
    }

    /**
     * Rejects every message waiting in the synchronous queue and empties the queue.
     *
     * <p>Messages are rejected through the session (second argument {@code true}) and
     * removed. Entries that are not messages are logged and removed. If entries are still
     * present afterwards, {@code rollback()} is invoked again before the queue is cleared.
     */
    public void rollbackPendingMessages() {
        if (_synchronousQueue.size() <= 0) {
            return;
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)for consumer with tag:" + _consumerTag);
        }

        final Iterator entries = _synchronousQueue.iterator();
        final int sizeBefore = _synchronousQueue.size();
        boolean removedAny = false;
        while (entries.hasNext()) {
            final Object entry = entries.next();
            if (entry instanceof AbstractJMSMessage) {
                final AbstractJMSMessage pending = (AbstractJMSMessage) entry;
                _session.rejectMessage(pending, true);
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Rejected message:" + pending.getDeliveryTag());
                }
            } else {
                _logger.error("Queue contained a :" + entry.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
            }
            entries.remove();
            removedAny = true;
        }

        if (removedAny && sizeBefore == _synchronousQueue.size()) {
            _logger.error("Queue had content removed but didn't change in size." + sizeBefore);
        }
        if (_synchronousQueue.size() != 0) {
            _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
            rollback();
        }
        clearReceiveQueue();
    }

    /** @return a log-friendly identity of the form {@code <consumerTag>[<identityHashCode>]} */
    public String debugIdentity() {
        final StringBuilder identity = new StringBuilder();
        identity.append(_consumerTag);
        identity.append("[");
        identity.append(System.identityHashCode(this));
        identity.append("]");
        return identity.toString();
    }

    /** Discards everything currently waiting in the synchronous queue. */
    public void clearReceiveQueue() {
        _synchronousQueue.clear();
    }

    /**
     * Removes all queued messages from the synchronous queue and returns their delivery tags
     * in queue order.
     *
     * <p>The queue can also hold a close marker or a {@code Throwable} (see
     * {@code notifyCloseMessage} and {@code notifyError}). Such entries carry no delivery tag.
     * They are left in place so a later {@code receive()} still sees them. Previously they
     * caused a {@code ClassCastException} that aborted the drain part-way through.
     *
     * @return the delivery tags of the removed messages; empty when there were none
     */
    public List<Long> drainReceiverQueueAndRetrieveDeliveryTags() {
        final Iterator<?> entries = _synchronousQueue.iterator();
        final List<Long> tags = new ArrayList<>(_synchronousQueue.size());
        while (entries.hasNext()) {
            final Object entry = entries.next();
            if (!(entry instanceof AbstractJMSMessage)) {
                // Control entry (close marker or error): nothing to report, keep it queued.
                continue;
            }
            tags.add(Long.valueOf(((AbstractJMSMessage) entry).getDeliveryTag()));
            entries.remove();
        }
        return tags;
    }

    /** @return the queue name last assigned through {@code setQueuename}, or {@code null} if none */
    public AMQShortString getQueuename() {
        return _queuename;
    }

    /** @param aMQShortString the queue name to record for this consumer */
    public void setQueuename(AMQShortString aMQShortString) {
        _queuename = aMQShortString;
    }

    /**
     * Asks the session to add a binding key for this consumer.
     *
     * @param aMQDestination destination passed through to the session
     * @param str binding key passed through to the session
     * @throws AMQException if the session call fails
     */
    public void addBindingKey(AMQDestination aMQDestination, String str) throws AMQException {
        _session.addBindingKey(this, aMQDestination, str);
    }

    /** Failover hook: discards everything waiting in the synchronous queue. */
    public void failedOverPre() {
        clearReceiveQueue();
    }

    /** Failover hook; intentionally does nothing in this base class. */
    public void failedOverPost() {
    }

    static {
        $assertionsDisabled = !BasicMessageConsumer.class.desiredAssertionStatus();
        _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
    }
}
