package org.apache.qpid.server.queue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
import org.apache.qpid.util.MessageQueue;

/* loaded from: input_file:org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.class */
public class ConcurrentSelectorDeliveryManager implements DeliveryManager {
    /** Class logger; assigned in the static initializer at the bottom of the class. */
    private static final Logger _log;

    /**
     * When true, every content chunk of a message has {@code reduceToFit()} called on it as the
     * message is added to the queue. Presumably populated by {@code Configurator.configure(this)}
     * in the constructor - confirm against the Configurator implementation.
     */
    @Configured(path = "advanced.compressBufferOnQueue", defaultValue = "false")
    public boolean compressBufferOnQueue;
    /** Source of the subscriptions that messages are delivered to. */
    private final SubscriptionManager _subscriptions;
    /** The queue on whose behalf messages are dequeued and sent. */
    private final AMQQueue _queue;
    /** Decompiler-exposed assertion switch; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Main queue of entries waiting to be delivered. */
    private final MessageQueue<QueueEntry> _messages = new ConcurrentLinkedMessageQueueAtomicSize();
    /** Set (via compareAndSet) by processAsync when a Runner is scheduled; cleared by the Runner. */
    private final AtomicBoolean _processing = new AtomicBoolean();
    /** Set by startMovingMessages() and cleared by stopMovingMessages(); delivery loops stop while it is true. */
    private AtomicBoolean _movingMessages = new AtomicBoolean();
    /** Lock taken around queue inspection and mutation by most public operations. */
    private ReentrantLock _lock = new ReentrantLock();
    /** Running total of the sizes of the messages accounted to this manager. */
    private AtomicLong _totalMessageSize = new AtomicLong();
    /** Count of pending-resend messages; added to the main queue size in getMessageCount(). */
    private AtomicInteger _extraMessages = new AtomicInteger();
    /** Subscriptions that have been flagged as holding pending resend content. */
    private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet());
    /** Monitor held while pushing to the head of the queue, sending the next message and clearing. */
    private final Object _queueHeadLock = new Object();
    /** Name of the thread that last ran processQueue(); only recorded when debug logging is enabled. */
    private String _processingThreadName = "";
    /** Store context used when dequeuing expired messages and, in clearAllMessages(), releasing references. */
    private StoreContext _reapingStoreContext = new StoreContext();
    /** Identity-hash based tag returned by debugIdentity() and used as a log prefix. */
    private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")";
    /** The single asynchronous delivery task; also used as the monitor guarding the _processing hand-off. */
    final Runner _asyncDelivery = new Runner();

    /* loaded from: input_file:org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager$Runner.class */
    /**
     * Asynchronous delivery task. Repeatedly calls {@code processQueue()} until either there is
     * nothing left to deliver (no queued messages or no active subscribers) or a message move
     * is in progress.
     *
     * <p>The {@code _processing} flag is always cleared before the task finishes, including when
     * the loop stops because {@code _movingMessages} is set or {@code processQueue()} throws.
     * Otherwise {@code processAsync} could never schedule the task again, because its
     * {@code compareAndSet(false, true)} would keep failing.
     */
    private class Runner implements Runnable {
        private Runner() {
        }

        /**
         * Runs the delivery loop on the calling thread. The thread is renamed for the duration
         * of the run and its original name is restored on every exit path.
         */
        @Override // java.lang.Runnable
        public void run() {
            final ConcurrentSelectorDeliveryManager manager = ConcurrentSelectorDeliveryManager.this;
            final Thread worker = Thread.currentThread();
            final String originalName = worker.getName();
            worker.setName("CSDM-AsyncDelivery:" + originalName);
            boolean running = true;
            try {
                while (running && !manager._movingMessages.get()) {
                    manager.processQueue();
                    // Decide whether to stop under the same monitor processAsync uses, so a
                    // concurrent processAsync either sees _processing == true or reschedules us.
                    synchronized (manager._asyncDelivery) {
                        if (!manager.hasQueuedMessages() || !manager._subscriptions.hasActiveSubscribers()) {
                            running = false;
                            manager._processing.set(false);
                        }
                    }
                }
            } finally {
                if (running) {
                    // Left the loop without the normal hand-off (messages are being moved, or
                    // processQueue threw): release the flag so delivery can be restarted later.
                    synchronized (manager._asyncDelivery) {
                        manager._processing.set(false);
                    }
                }
                worker.setName(originalName);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a delivery manager for the given queue.
     *
     * @param subscriptionManager source of the subscriptions messages are delivered to
     * @param aMQQueue            the queue this manager delivers for
     */
    public ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptionManager, AMQQueue aMQQueue) {
        // Presumably fills in the @Configured fields (compressBufferOnQueue) - confirm in Configurator.
        Configurator.configure(this);
        this._subscriptions = subscriptionManager;
        this._queue = aMQQueue;
        if (compressBufferOnQueue) {
            _log.warn("Compressing Buffers on queue.");
        }
    }

    /**
     * Places an entry on the main message queue and adds its message size to the running total.
     *
     * @param queueEntry the entry to queue
     * @param z          when true the entry is pushed onto the head of the queue (under the
     *                   queue-head lock); otherwise it is offered to the queue normally
     * @return always {@code true}
     */
    private boolean addMessageToQueue(QueueEntry queueEntry, boolean z) {
        final AMQMessage payload = queueEntry.getMessage();
        if (this.compressBufferOnQueue) {
            // Trim every content chunk before the message sits on the queue.
            for (Iterator<ContentChunk> chunks = payload.getContentBodyIterator(); chunks.hasNext();) {
                chunks.next().reduceToFit();
            }
        }
        if (!z) {
            this._messages.offer(queueEntry);
        } else {
            synchronized (this._queueHeadLock) {
                this._messages.pushHead(queueEntry);
            }
        }
        this._totalMessageSize.addAndGet(payload.getSize());
        return true;
    }

    /**
     * Reports whether there is anything waiting to be delivered.
     *
     * @return {@code true} if the main queue is non-empty or at least one subscription has been
     *         flagged as holding pending resend content
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public boolean hasQueuedMessages() {
        this._lock.lock();
        try {
            // _hasContent is only consulted when the main queue is empty.
            return !this._messages.isEmpty() || !this._hasContent.isEmpty();
        } finally {
            this._lock.unlock();
        }
    }

    /**
     * @return the number of messages on the main queue plus the pending-resend count
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public int getQueueMessageCount() {
        final int total = getMessageCount();
        return total;
    }

    /**
     * @return size of the main queue plus the current value of the extra-message counter
     */
    private int getMessageCount() {
        final int queued = this._messages.size();
        final int extra = this._extraMessages.get();
        return queued + extra;
    }

    /**
     * @return the current value of the running total of message sizes
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public long getTotalMessageSize() {
        final long totalSize = _totalMessageSize.get();
        return totalSize;
    }

    /**
     * Returns the arrival time of the message at the head of the main queue.
     *
     * @return the head message's arrival time, or {@code Long.MAX_VALUE} when the queue is empty
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public long getOldestMessageArrival() {
        final QueueEntry head = (QueueEntry) this._messages.peek();
        return head != null ? head.getMessage().getArrivalTime() : Long.MAX_VALUE;
    }

    /**
     * Records that a subscription has gained or lost pending resend content, and keeps the
     * size and extra-message counters in step.
     *
     * @param z            {@code true} when content is being added, {@code false} when removed
     * @param subscription the subscription concerned
     * @param queueEntry   the entry added or removed; on removal {@code null} means the
     *                     subscription no longer has any pending content. It is dereferenced
     *                     unconditionally when {@code z} is true.
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void subscriberHasPendingResend(boolean z, Subscription subscription, QueueEntry queueEntry) {
        this._lock.lock();
        try {
            if (!z) {
                _log.debug("Queue has removing subscriber content");
                if (queueEntry != null) {
                    this._totalMessageSize.addAndGet(-queueEntry.getSize());
                    this._extraMessages.decrementAndGet();
                } else {
                    this._hasContent.remove(subscription);
                }
                return;
            }
            _log.debug("Queue has adding subscriber content");
            this._hasContent.add(subscription);
            this._totalMessageSize.addAndGet(queueEntry.getSize());
            this._extraMessages.incrementAndGet();
        } finally {
            this._lock.unlock();
        }
    }

    /**
     * Walks the main queue and removes every entry that reports itself expired, dequeuing it
     * with the reaping store context and subtracting its size from the running total.
     *
     * <p>The lock is released in a {@code finally} block so that a failed dequeue cannot leave
     * it held.
     *
     * @throws AMQException if dequeuing an expired entry fails
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void removeExpired() throws AMQException {
        this._lock.lock();
        try {
            Iterator<QueueEntry> it = this._messages.iterator();
            while (it.hasNext()) {
                QueueEntry queueEntry = it.next();
                if (queueEntry.expired()) {
                    this._totalMessageSize.addAndGet(-queueEntry.getSize());
                    this._queue.dequeue(this._reapingStoreContext, queueEntry);
                    it.remove();
                }
            }
        } finally {
            this._lock.unlock();
        }
    }

    /**
     * @return the current value of the asynchronous-processing flag
     */
    public boolean isProcessingAsync() {
        final boolean processing = _processing.get();
        return processing;
    }

    /**
     * Takes a snapshot of the main queue.
     *
     * <p>The lock is released in a {@code finally} block so an exception during iteration
     * cannot leave it held.
     *
     * @return a new list holding the entries currently on the main queue, in iteration order
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public List<QueueEntry> getMessages() {
        this._lock.lock();
        try {
            List<QueueEntry> snapshot = new ArrayList<QueueEntry>();
            for (QueueEntry queueEntry : this._messages) {
                snapshot.add(queueEntry);
            }
            return snapshot;
        } finally {
            this._lock.unlock();
        }
    }

    /**
     * Collects the queued entries whose message id lies in the inclusive range
     * {@code [j, j2]}.
     *
     * <p>The lock is released in a {@code finally} block so an exception while reading an
     * entry cannot leave it held.
     *
     * @param j  lowest message id wanted; must be positive
     * @param j2 highest message id wanted; must be positive
     * @return the matching entries in queue order, or {@code null} when either bound is not
     *         positive (kept for compatibility with existing callers)
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public List<QueueEntry> getMessages(long j, long j2) {
        if (j <= 0 || j2 <= 0) {
            return null;
        }
        // Largest number of entries the id range can contain; lets the scan stop early.
        long maxMatches = (j2 - j) + 1;
        this._lock.lock();
        try {
            List<QueueEntry> matches = new ArrayList<QueueEntry>();
            for (QueueEntry queueEntry : this._messages) {
                long messageId = queueEntry.getMessage().getMessageId().longValue();
                if (messageId >= j && messageId <= j2) {
                    matches.add(queueEntry);
                }
                if (matches.size() == maxMatches) {
                    break;
                }
            }
            return matches;
        } finally {
            this._lock.unlock();
        }
    }

    /**
     * Offers every entry currently on the main queue to the subscription, enqueuing for
     * pre-delivery those it declares an interest in.
     *
     * @param subscription the subscription whose pre-delivery queue is to be filled
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void populatePreDeliveryQueue(Subscription subscription) {
        if (_log.isDebugEnabled()) {
            _log.debug("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
        }
        final Iterator<QueueEntry> queued = this._messages.iterator();
        while (queued.hasNext()) {
            final QueueEntry candidate = queued.next();
            if (!subscription.hasInterest(candidate)) {
                continue;
            }
            subscription.enqueueForPreDelivery(candidate, false);
        }
    }

    /**
     * Serves a single synchronous "get": fetches the next available entry from the main queue
     * and writes it to the session as a get-ok response.
     *
     * @param aMQProtocolSession session whose output converter writes the response
     * @param aMQChannel         channel supplying the delivery tag, store context and
     *                           unacknowledged-message tracking
     * @param z                  {@code true} when the client will acknowledge the message;
     *                           {@code false} for no-ack mode
     * @return {@code false} when no message was available, {@code true} once one has been written
     * @throws AMQException if fetching, dequeuing or releasing the message reference fails
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public boolean performGet(AMQProtocolSession aMQProtocolSession, AMQChannel aMQChannel, boolean z) throws AMQException {
        QueueEntry nextMessage = getNextMessage();
        if (nextMessage == null) {
            return false;
        }
        if (!z) {
            // No-ack mode: dequeue before sending.
            // NOTE(review): setDeliveredToConsumer() is only reached on this no-ack path, and it
            // runs before the message is written. This looks like a decompilation artefact of a
            // try/finally that originally wrapped the whole send - confirm against the real source.
            try {
                if (_log.isDebugEnabled()) {
                    _log.debug("No ack mode so dequeuing message immediately: " + nextMessage.getMessage().getMessageId());
                }
                this._queue.dequeue(aMQChannel.getStoreContext(), nextMessage);
            } finally {
                nextMessage.setDeliveredToConsumer();
            }
        }
        // Tag allocation, unacked bookkeeping and the write happen under the channel monitor.
        synchronized (aMQChannel) {
            long nextDeliveryTag = aMQChannel.getNextDeliveryTag();
            if (z) {
                aMQChannel.addUnacknowledgedMessage(nextMessage, nextDeliveryTag, null);
            }
            aMQProtocolSession.getProtocolOutputConverter().writeGetOk(nextMessage.getMessage(), aMQChannel.getChannelId(), nextDeliveryTag, this._queue.getMessageCount());
        }
        this._totalMessageSize.addAndGet(-nextMessage.getSize());
        if (!z) {
            // Nothing will acknowledge this message later, so drop the reference now.
            nextMessage.getMessage().decrementReference(aMQChannel.getStoreContext());
        }
        return true;
    }

    /**
     * Raises the moving-messages flag; the delivery loops in this class stop while it is set.
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void startMovingMessages() {
        _movingMessages.set(true);
    }

    /**
     * Clears the moving-messages flag and, if the calling thread still holds the manager
     * lock, releases one hold on it.
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void stopMovingMessages() {
        _movingMessages.set(false);
        final ReentrantLock managerLock = this._lock;
        if (!managerLock.isHeldByCurrentThread()) {
            return;
        }
        managerLock.unlock();
    }

    /**
     * Removes entries that have been moved elsewhere: first from the pre-delivery queue of
     * every non-suspended, filtering subscription (only when there are active subscribers),
     * then from the main queue, subtracting the size of each entry actually removed there.
     *
     * @param list the entries that were moved
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void removeMovedMessages(List<QueueEntry> list) {
        if (this._subscriptions.hasActiveSubscribers()) {
            for (Subscription candidate : this._subscriptions.getSubscriptions()) {
                if (candidate.isSuspended() || !candidate.filtersMessages()) {
                    continue;
                }
                final Queue<QueueEntry> pending = candidate.getPreDeliveryQueue();
                for (QueueEntry moved : list) {
                    pending.remove(moved);
                }
            }
        }
        final Iterator<QueueEntry> movedEntries = list.iterator();
        while (movedEntries.hasNext()) {
            final QueueEntry moved = movedEntries.next();
            if (this._messages.remove(moved)) {
                this._totalMessageSize.addAndGet(-moved.getSize());
            }
        }
    }

    /**
     * Removes the entry at the head of the main queue, if any: dequeues it from the given
     * queue, subtracts its size from the running total and releases one message reference.
     *
     * <p>The lock is released in a {@code finally} block so a failed dequeue or reference
     * release cannot leave it held.
     *
     * @param storeContext store context used for the dequeue and the reference release
     * @param aMQQueue     queue from which the entry is dequeued
     * @throws AMQException if the dequeue or the reference release fails
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void removeAMessageFromTop(StoreContext storeContext, AMQQueue aMQQueue) throws AMQException {
        this._lock.lock();
        try {
            QueueEntry queueEntry = (QueueEntry) this._messages.poll();
            if (queueEntry != null) {
                aMQQueue.dequeue(storeContext, queueEntry);
                this._totalMessageSize.addAndGet(-queueEntry.getSize());
                queueEntry.getMessage().decrementReference(storeContext);
            }
        } finally {
            this._lock.unlock();
        }
    }

    /**
     * Drains the main queue: each available entry is polled, dequeued with the supplied
     * store context and has one message reference released using the reaping store context.
     * When the drain completes the running size total is reset to zero.
     *
     * <p>The lock is released in a {@code finally} block so a failure part-way through
     * cannot leave it held.
     *
     * @param storeContext store context used for the dequeue calls
     * @return the number of entries removed
     * @throws AMQException if fetching, dequeuing or releasing an entry fails
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public long clearAllMessages(StoreContext storeContext) throws AMQException {
        long removed = 0;
        this._lock.lock();
        try {
            synchronized (this._queueHeadLock) {
                QueueEntry nextMessage = getNextMessage();
                while (nextMessage != null) {
                    // getNextMessage() only peeks at a deliverable entry, so remove it here.
                    this._messages.poll();
                    this._queue.dequeue(storeContext, nextMessage);
                    nextMessage.getMessage().decrementReference(this._reapingStoreContext);
                    nextMessage = getNextMessage();
                    removed++;
                }
                this._totalMessageSize.set(0L);
            }
        } finally {
            this._lock.unlock();
        }
        return removed;
    }

    /**
     * Fetches the next available entry from the main queue on behalf of no subscription.
     *
     * @return the next entry, or {@code null} when none is available
     * @throws AMQException if cleaning up an expired entry fails
     */
    private QueueEntry getNextMessage() throws AMQException {
        final Subscription noSubscription = null;
        return getNextMessage(this._messages, noSubscription, false);
    }

    /**
     * Returns the entry at the head of {@code queue} once every head entry that
     * {@code purgeMessage} rejects has been removed.
     *
     * <p>Each rejected entry is polled off the queue. If its message has expired and the
     * entry had not already been taken, it is also dequeued with the reaping store context,
     * its size is subtracted from the running total and a message reference is released.
     *
     * @param queue        the queue to read from
     * @param subscription the subscription the entry is wanted for; may be {@code null}
     * @param z            passed through to {@code purgeMessage} as its purge-only switch
     * @return the first head entry that is not purged, or {@code null} if the queue runs out
     * @throws AMQException if dequeuing or releasing an expired entry fails
     */
    private QueueEntry getNextMessage(Queue<QueueEntry> queue, Subscription subscription, boolean z) throws AMQException {
        QueueEntry candidate = queue.peek();
        while (purgeMessage(candidate, subscription, z)) {
            final AMQMessage message = candidate.getMessage();
            final QueueEntry removed = queue.poll();
            // Hand-expanded form of: assert removed == candidate;
            if (!$assertionsDisabled && removed != candidate) {
                throw new AssertionError();
            }
            if (message.expired(this._queue) && !candidate.taken(subscription)) {
                this._totalMessageSize.addAndGet(-candidate.getSize());
                this._queue.dequeue(this._reapingStoreContext, candidate);
                message.decrementReference(this._reapingStoreContext);
                if (_log.isInfoEnabled()) {
                    _log.info(debugIdentity() + " Doing clean up of the main _message queue.");
                }
            }
            if (_log.isDebugEnabled()) {
                _log.debug("Removed taken message:" + message.debugIdentity());
            }
            candidate = queue.peek();
        }
        return candidate;
    }

    /**
     * Convenience form of the three-argument {@code purgeMessage} with the purge-only
     * switch turned off.
     *
     * @param queueEntry   the entry to test; may be {@code null}
     * @param subscription the subscription the entry is being considered for; may be {@code null}
     * @return whether the entry should be removed from the head of its queue
     * @throws AMQException propagated from the three-argument overload
     */
    private boolean purgeMessage(QueueEntry queueEntry, Subscription subscription) throws AMQException {
        final boolean purgeOnly = false;
        return purgeMessage(queueEntry, subscription, purgeOnly);
    }

    /**
     * Decides whether the given head entry should be removed from its queue rather than
     * handed out.
     *
     * <ul>
     *   <li>{@code null} entry: never purged.</li>
     *   <li>Expired entry: always purged.</li>
     *   <li>With a subscription: a candidate unless the subscription is a browser looking at
     *       an entry that is not taken.</li>
     *   <li>Without a subscription: always a candidate.</li>
     * </ul>
     * A candidate is purged only if it is already taken: checked with {@code isTaken()} when
     * {@code z} is true, otherwise with {@code taken(subscription)}.
     *
     * @param queueEntry   the entry to test; may be {@code null}
     * @param subscription the subscription concerned; may be {@code null}
     * @param z            purge-only switch selecting {@code isTaken()} over {@code taken(...)}
     * @return {@code true} if the entry should be removed from the queue
     * @throws AMQException declared by the original signature
     */
    private boolean purgeMessage(QueueEntry queueEntry, Subscription subscription, boolean z) throws AMQException {
        if (queueEntry == null) {
            return false;
        }
        if (queueEntry.expired()) {
            return true;
        }
        final boolean candidate;
        if (subscription == null) {
            // Result intentionally discarded; the call is kept exactly as the original made it.
            queueEntry.isTaken();
            candidate = true;
        } else {
            candidate = !subscription.isBrowser() || queueEntry.isTaken();
        }
        if (!candidate) {
            return false;
        }
        if (z) {
            return queueEntry.isTaken();
        }
        return queueEntry.taken(subscription);
    }

    /**
     * Sends the next available message to the subscription from whichever queue the
     * subscription selects via {@code getNextQueue}.
     *
     * <p>Changes from the previous revision:
     * <ul>
     *   <li>The fetched entry is kept in a variable visible to the {@code catch} block, so a
     *       message whose send fails is released. Previously that branch was dead code and
     *       the entry was never released.</li>
     *   <li>The running size total is adjusted only after the send has succeeded, so a
     *       failed send followed by a release does not leave the total too low.</li>
     *   <li>The post-send debug log no longer dereferences the polled entry when the queue
     *       returned {@code null}.</li>
     * </ul>
     *
     * @param subscription the subscription to deliver to
     * @param aMQQueue     used only in log output; the send itself uses this manager's queue
     */
    public void sendNextMessage(Subscription subscription, AMQQueue aMQQueue) {
        Queue<QueueEntry> nextQueue = subscription.getNextQueue(this._messages);
        if (_log.isDebugEnabled()) {
            _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(subscription) + ") from queue (" + System.identityHashCode(nextQueue) + ") AMQQueue (" + System.identityHashCode(aMQQueue) + ")");
        }
        if (nextQueue == null) {
            if (_log.isInfoEnabled()) {
                _log.info(debugIdentity() + subscription + ": asked to send messages but has none on given queue:" + aMQQueue);
            }
            return;
        }
        // Declared outside the try so the catch block can release it if the send fails.
        QueueEntry nextMessage = null;
        try {
            synchronized (this._queueHeadLock) {
                nextMessage = getNextMessage(nextQueue, subscription, false);
                if (nextMessage == null) {
                    if (_log.isDebugEnabled()) {
                        _log.debug(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(subscription) + ") from queue; (" + System.identityHashCode(nextQueue) + ")");
                    }
                    return;
                }
                if (_log.isDebugEnabled()) {
                    _log.debug(debugIdentity() + "Async Delivery Message :" + nextMessage + "(" + System.identityHashCode(nextMessage) + ") by :" + System.identityHashCode(this) + ") to :" + System.identityHashCode(subscription));
                }
                subscription.send(nextMessage, this._queue);
                if (nextQueue == this._messages) {
                    // Only messages leaving the main queue are subtracted here.
                    this._totalMessageSize.addAndGet(-nextMessage.getSize());
                }
                QueueEntry poll = nextQueue.poll();
                if (poll != nextMessage) {
                    _log.error("Just send message:" + nextMessage.getMessage().debugIdentity() + " BUT removed this from queue:" + poll);
                }
                if (_log.isDebugEnabled()) {
                    String removedIdentity = poll == null ? "null" : String.valueOf(poll.getMessage().debugIdentity());
                    _log.debug(debugIdentity() + "Async Delivered Message r:" + removedIdentity + "d:" + nextMessage + ") by :" + System.identityHashCode(this) + ") to :" + System.identityHashCode(subscription));
                }
                if (nextQueue == subscription.getResendQueue()) {
                    if (_log.isDebugEnabled()) {
                        _log.debug(debugIdentity() + "All messages sent from resendQueue for " + subscription);
                    }
                    if (nextQueue.isEmpty()) {
                        subscriberHasPendingResend(false, subscription, null);
                    }
                    this._extraMessages.decrementAndGet();
                } else if (nextQueue == subscription.getPreDeliveryQueue() && !subscription.isBrowser() && _log.isInfoEnabled()) {
                    // NOTE(review): the main-queue clean-up only runs when INFO logging is
                    // enabled, so behaviour depends on logger configuration. Left unchanged;
                    // confirm whether the clean-up should be unconditional.
                    cleanMainQueue(subscription);
                }
            }
        } catch (AMQException e) {
            if (nextMessage != null) {
                nextMessage.release();
            } else {
                _log.error(debugIdentity() + "Unable to release message as it is null. " + e, e);
            }
            _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
        }
    }

    /**
     * Purges already-taken entries from the head of the main queue by running
     * {@code getNextMessage} in purge-only mode. Failures are logged, not propagated.
     *
     * @param subscription the subscription on whose behalf the purge runs
     */
    private void cleanMainQueue(Subscription subscription) {
        try {
            getNextMessage(this._messages, subscription, true);
        } catch (AMQException e) {
            // Pass the exception itself so the stack trace is not lost.
            _log.warn("Problem during main queue purge:" + e.getMessage(), e);
        }
    }

    /**
     * Adds entries that were moved from another queue to the tail of the main queue, then
     * offers each of them to every subscription, enqueuing for pre-delivery those the
     * subscription declares an interest in.
     *
     * <p>The lock is released in a {@code finally} block so an exception thrown by
     * subscription code cannot leave it held.
     *
     * @param storeContext not used by this implementation
     * @param list         the entries to enqueue
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> list) {
        this._lock.lock();
        try {
            for (QueueEntry moved : list) {
                addMessageToQueue(moved, false);
            }
            for (Subscription subscription : this._subscriptions.getSubscriptions()) {
                for (QueueEntry queueEntry : list) {
                    if (subscription.hasInterest(queueEntry)) {
                        subscription.enqueueForPreDelivery(queueEntry, true);
                    }
                }
            }
        } finally {
            this._lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Delivery loop body: while there are queued messages, no move is in progress and the
     * previous pass found at least one non-suspended subscription, sends one message to
     * each non-suspended subscription in turn, holding that subscription's send lock.
     */
    public void processQueue() {
        if (_log.isDebugEnabled()) {
            // Remember which thread is delivering so currentStatus() can report it.
            this._processingThreadName = Thread.currentThread().getName();
            _log.debug(debugIdentity() + "Running process Queue." + currentStatus());
        }
        boolean deliveredToSomeone = this._subscriptions.hasActiveSubscribers();
        while (deliveredToSomeone && hasQueuedMessages() && !this._movingMessages.get()) {
            deliveredToSomeone = false;
            for (Subscription target : this._subscriptions.getSubscriptions()) {
                synchronized (target.getSendLock()) {
                    if (target.isSuspended()) {
                        continue;
                    }
                    sendNextMessage(target, this._queue);
                    deliveredToSomeone = true;
                }
            }
        }
        if (_log.isDebugEnabled()) {
            _log.debug(debugIdentity() + "Done process Queue." + currentStatus());
        }
    }

    /**
     * Delivers an incoming entry, either directly to the next subscriber or by queuing it.
     *
     * <p>The entry is queued when no subscriber is available, or when the chosen subscriber
     * does not filter messages and there are already queued messages. Entries whose publish
     * info is marked immediate are not queued. After queuing, the entry is offered to every
     * interested subscription's pre-delivery queue and asynchronous delivery is requested if
     * it is not already running. Otherwise the entry is sent straight to the subscriber
     * under its send lock; if it still has not been taken afterwards, delivery is retried
     * recursively.
     *
     * <p>Changes from the previous revision: the never-assigned local
     * {@code isHeldByCurrentThread} (read before assignment, which does not compile) is
     * replaced by a plain {@code return}, and the unlock that duplicated the {@code finally}
     * block is removed.
     *
     * @param storeContext   passed through unchanged on the recursive retry
     * @param aMQShortString passed through unchanged on the recursive retry
     * @param queueEntry     the entry to deliver
     * @param z              when true the entry is pushed to the head of the main queue and
     *                       pre-delivery queues rather than appended
     * @throws AMQException if the send or a nested delivery fails
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void deliver(StoreContext storeContext, AMQShortString aMQShortString, QueueEntry queueEntry, boolean z) throws AMQException {
        boolean isDebugEnabled = _log.isDebugEnabled();
        if (isDebugEnabled) {
            _log.debug(debugIdentity() + "deliver :first(" + z + ") :" + queueEntry);
        }
        this._lock.lock();
        try {
            Subscription nextSubscriber = this._subscriptions.nextSubscriber(queueEntry);
            if (nextSubscriber == null || (!nextSubscriber.filtersMessages() && hasQueuedMessages())) {
                if (isDebugEnabled) {
                    _log.debug(debugIdentity() + "Testing Message(" + queueEntry + ") for Queued Delivery:" + currentStatus());
                }
                if (!queueEntry.getMessage().getMessagePublishInfo().isImmediate()) {
                    addMessageToQueue(queueEntry, z);
                    // The message is on the queue; release the lock before touching subscriptions.
                    this._lock.unlock();
                    if (isDebugEnabled) {
                        _log.debug(debugIdentity() + "We have " + this._subscriptions.getSubscriptions().size() + " subscribers to give the message to:" + currentStatus());
                    }
                    for (Subscription subscription : this._subscriptions.getSubscriptions()) {
                        if (subscription.hasInterest(queueEntry)) {
                            if (isDebugEnabled) {
                                _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(queueEntry) + ") for PreDelivery for subscriber(" + System.identityHashCode(subscription) + ")");
                            }
                            subscription.enqueueForPreDelivery(queueEntry, z);
                        }
                    }
                    if (nextSubscriber != null && hasQueuedMessages() && !isProcessingAsync() && this._subscriptions.hasActiveSubscribers()) {
                        this._queue.deliverAsync();
                    }
                }
            } else {
                // Direct delivery is only expected when nothing is queued ahead of this message.
                if (nextSubscriber.filtersMessages()) {
                    if (nextSubscriber.getPreDeliveryQueue().size() > 0) {
                        _log.error("Direct delivery from PDQ with queued msgs:" + nextSubscriber.getPreDeliveryQueue().size());
                    }
                } else if (this._messages.size() > 0) {
                    _log.error("Direct delivery from MainQueue queued msgs:" + this._messages.size());
                }
                this._lock.unlock();
                synchronized (nextSubscriber.getSendLock()) {
                    if (!nextSubscriber.isSuspended()) {
                        if (isDebugEnabled) {
                            _log.debug(debugIdentity() + "Delivering Message:" + queueEntry.getMessage().debugIdentity() + " to(" + System.identityHashCode(nextSubscriber) + ") :" + nextSubscriber);
                        }
                        if (queueEntry.taken(nextSubscriber)) {
                            // taken(...) reported true, so the entry is not sent again here.
                            return;
                        }
                        nextSubscriber.send(queueEntry, this._queue);
                    } else if (isDebugEnabled) {
                        _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(nextSubscriber) + ") became suspended between nextSubscriber and send for message:" + queueEntry.getMessage().debugIdentity());
                    }
                    if (!queueEntry.isTaken()) {
                        if (isDebugEnabled) {
                            _log.debug(debugIdentity() + " Message(" + queueEntry.getMessage().debugIdentity() + ") has not been taken so recursing!: Subscriber:" + System.identityHashCode(nextSubscriber));
                        }
                        deliver(storeContext, aMQShortString, queueEntry, z);
                    } else if (isDebugEnabled) {
                        _log.debug(debugIdentity() + " Message(" + queueEntry.toString() + ") has been taken so disregarding deliver request to Subscriber:" + System.identityHashCode(nextSubscriber));
                    }
                }
            }
        } finally {
            // Some paths above release the lock early; only unlock if it is still held.
            if (this._lock.isHeldByCurrentThread()) {
                this._lock.unlock();
            }
        }
    }

    /**
     * @return the identity tag of this manager, used as a prefix in log messages
     */
    private String debugIdentity() {
        final String identity = id;
        return identity;
    }

    /**
     * Schedules the asynchronous delivery task on the executor, provided there are queued
     * messages, there are active subscribers and the processing flag could be switched from
     * false to true.
     *
     * @param executor the executor that will run the delivery task
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void processAsync(Executor executor) {
        if (_log.isDebugEnabled()) {
            _log.debug(debugIdentity() + "Processing Async." + currentStatus());
        }
        synchronized (this._asyncDelivery) {
            final boolean workAvailable = hasQueuedMessages() && this._subscriptions.hasActiveSubscribers();
            // Only the caller that flips the flag gets to schedule the task.
            if (!workAvailable || !this._processing.compareAndSet(false, true)) {
                return;
            }
            if (_log.isDebugEnabled()) {
                _log.debug(debugIdentity() + "Executing Async process.");
            }
            executor.execute(this._asyncDelivery);
        }
    }

    /**
     * Builds a one-line status summary for debug logging: main queue contents (head size and
     * remaining size), pending-resend bookkeeping, whether there are active subscribers and
     * whether asynchronous processing is flagged as running.
     *
     * @return the status text
     */
    private String currentStatus() {
        final StringBuilder status = new StringBuilder(" Queued:");
        status.append(this._messages.isEmpty() ? "Empty " : "Contains(H:M)");
        status.append("(").append(this._messages.headSize()).append(":");
        status.append(this._messages.size() - this._messages.headSize());
        status.append(")  Extra: ");
        status.append(this._hasContent.isEmpty() ? "Empty " : "Contains");
        status.append("(").append(this._hasContent.size()).append(":").append(this._extraMessages.get());
        status.append(")  Active:").append(this._subscriptions.hasActiveSubscribers());
        status.append(" Processing:");
        if (this._processing.get()) {
            status.append(" true : Processing Thread: ").append(this._processingThreadName);
        } else {
            status.append(" false");
        }
        return status.toString();
    }

    // Static initialisation: records whether assertions are disabled for this class (the
    // decompiler's expansion of what javac generates for assert statements) and creates the
    // class logger.
    static {
        $assertionsDisabled = !ConcurrentSelectorDeliveryManager.class.desiredAssertionStatus();
        _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
    }
}
