/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDispatcherMultipleConsumers {
    private final boolean allowOutOfOrderDelivery;
    private final StickyKeyConsumerSelector selector;
    private final boolean recentlyJoinedConsumerTrackingRequired;
    private boolean skipNextReplayToTriggerLookAhead = false;
    private final KeySharedMode keySharedMode;
    private final LinkedHashMap<Consumer, Position> recentlyJoinedConsumers;
    @Nullable
    private Position lastSentPosition;
    private final LongPairRangeSet<Position> individuallySentPositions;
    private static final LongPairRangeSet.LongPairConsumer<Position> positionRangeConverter = PositionFactory::create;
    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

    /**
     * Creates a Key_Shared dispatcher.
     *
     * <p>The tracking structures {@code recentlyJoinedConsumers} and {@code individuallySentPositions} are only
     * allocated when the key-shared mode is {@code AUTO_SPLIT} and out-of-order delivery is not allowed;
     * otherwise both fields are {@code null}.
     *
     * @param topic forwarded to the superclass constructor
     * @param cursor forwarded to the superclass constructor
     * @param subscription forwarded to the superclass constructor
     * @param conf broker configuration, consulted to pick the selector implementation for {@code AUTO_SPLIT}
     * @param ksm key-shared metadata providing the mode and the out-of-order delivery flag
     * @throws IllegalArgumentException if the key-shared mode is neither {@code AUTO_SPLIT} nor {@code STICKY}
     */
    PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
            Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
        super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
        this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
        this.keySharedMode = ksm.getKeySharedMode();
        this.recentlyJoinedConsumerTrackingRequired =
                this.keySharedMode == KeySharedMode.AUTO_SPLIT && !this.allowOutOfOrderDelivery;
        if (this.recentlyJoinedConsumerTrackingRequired) {
            this.recentlyJoinedConsumers = new LinkedHashMap<>();
            this.individuallySentPositions = new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);
        } else {
            this.recentlyJoinedConsumers = null;
            this.individuallySentPositions = null;
        }
        switch (this.keySharedMode) {
            case AUTO_SPLIT:
                if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(
                            conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
                } else {
                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
                }
                break;
            case STICKY:
                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
                break;
            default:
                throw new IllegalArgumentException("Invalid key-shared mode: " + this.keySharedMode);
        }
    }

    /**
     * Exposes the sticky-key consumer selector that was chosen in the constructor.
     *
     * @return the selector used by this dispatcher
     */
    @VisibleForTesting
    public StickyKeyConsumerSelector getSelector() {
        return selector;
    }

    /**
     * Adds a consumer to the dispatcher and then to the sticky-key selector.
     *
     * <p>If the dispatcher is already closed, the consumer is disconnected and an already-completed future is
     * returned. If the selector fails to add the consumer, the consumer is removed again from
     * {@code consumerSet} and {@code consumerList} and the failure is propagated through the returned future.
     *
     * <p>On success, when recently-joined-consumer tracking is required and a last sent position is known,
     * that position is stored on the consumer. The consumer is additionally put into
     * {@code recentlyJoinedConsumers} when it is not the only consumer and the cursor reports more than one
     * entry since the first not-acked message.
     *
     * @param consumer the consumer to add
     * @return a future that completes once the consumer has been registered
     */
    @Override
    public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
        if (IS_CLOSED_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
            consumer.disconnect();
            return CompletableFuture.completedFuture(null);
        }
        return super.addConsumer(consumer)
                .thenCompose(__ -> selector.addConsumer(consumer).handle((result, ex) -> {
                    if (ex == null) {
                        return result;
                    }
                    // Roll back the registration done by the superclass before propagating the failure.
                    synchronized (this) {
                        consumerSet.removeAll(consumer);
                        consumerList.remove(consumer);
                    }
                    throw FutureUtil.wrapToCompletionException(ex);
                }))
                .thenRun(() -> {
                    synchronized (this) {
                        if (!recentlyJoinedConsumerTrackingRequired) {
                            return;
                        }
                        Position lastSentPositionWhenJoining = updateIfNeededAndGetLastSentPosition();
                        if (lastSentPositionWhenJoining == null) {
                            return;
                        }
                        consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
                        if (recentlyJoinedConsumers != null
                                && consumerList.size() > 1
                                && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1L) {
                            recentlyJoinedConsumers.put(consumer, lastSentPositionWhenJoining);
                        }
                    }
                });
    }

    /**
     * Removes a consumer from the selector and then from the dispatcher.
     *
     * <p>When recently-joined-consumer tracking is required, the consumer is also dropped from
     * {@code recentlyJoinedConsumers}. With exactly one consumer left that map is cleared; with none left,
     * {@code lastSentPosition} is reset to {@code null} and {@code individuallySentPositions} is cleared.
     * A new read is triggered if further recently joined consumers could be released or if the redelivery
     * queue is not empty.
     *
     * @param consumer the consumer to remove
     * @throws BrokerServiceException as declared by the overridden method
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        // The selector is updated before delegating to the superclass.
        selector.removeConsumer(consumer);
        super.removeConsumer(consumer);
        if (!recentlyJoinedConsumerTrackingRequired) {
            return;
        }
        recentlyJoinedConsumers.remove(consumer);
        if (consumerList.size() == 1) {
            recentlyJoinedConsumers.clear();
        } else if (consumerList.isEmpty()) {
            lastSentPosition = null;
            individuallySentPositions.clear();
        }
        boolean releasedRecentlyJoined = removeConsumersFromRecentJoinedConsumers();
        if (releasedRecentlyJoined || !redeliveryMessages.isEmpty()) {
            readMoreEntries();
        }
    }

    /**
     * Dispatches a batch of read entries to the consumers selected by sticky key hash.
     *
     * <p>Processing order:
     * <ol>
     *   <li>An empty batch returns {@code true}. With no consumers the entries are released, the cursor is
     *       rewound and {@code false} is returned.</li>
     *   <li>When out-of-order delivery is not allowed and the first position in the replay queue is before
     *       {@code minReplayedPosition}, the read is discarded: entries of a normal read are added to the
     *       replay queue and released, entries of a replay read are just released. {@code skipNextBackoff}
     *       is set and {@code true} is returned.</li>
     *   <li>When tracking is required, {@code lastSentPosition} is refreshed and, if still unknown,
     *       initialised from the cursor together with {@code individuallySentPositions}.</li>
     *   <li>Entries are grouped per consumer, recorded as sent, filtered and handed to each consumer.</li>
     *   <li>{@code lastSentPosition} is advanced over ranges of individually sent positions that are
     *       contiguous with it.</li>
     * </ol>
     *
     * @param readType whether the entries come from a normal read or a replay read
     * @param entries the entries that were read
     * @return {@code true} when the batch was empty, the read was discarded, look-ahead was requested, or the
     *         per-consumer filtering reported zero entries in total; {@code false} when there were no
     *         consumers or at least one entry was counted as sent
     */
    @Override
    protected synchronized boolean trySendMessagesToConsumers(
            PersistentDispatcherMultipleConsumers.ReadType readType, List<Entry> entries) {
        lastNumberOfEntriesProcessed = 0;
        if (entries.isEmpty()) {
            return true;
        }
        if (consumerSet.isEmpty()) {
            entries.forEach(Entry::release);
            cursor.rewind();
            return false;
        }

        final boolean normalRead = readType == PersistentDispatcherMultipleConsumers.ReadType.Normal;
        final boolean replayRead = readType == PersistentDispatcherMultipleConsumers.ReadType.Replay;

        if (!allowOutOfOrderDelivery) {
            Optional<Position> firstReplayPosition = getFirstPositionInReplay();
            if (firstReplayPosition.isPresent()) {
                Position replayPosition = firstReplayPosition.get();
                if (minReplayedPosition != null && replayPosition.compareTo(minReplayedPosition) < 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this read and retry with readMoreEntries.",
                                name, replayPosition, minReplayedPosition, readType);
                    }
                    if (normalRead) {
                        for (Entry discarded : entries) {
                            long stickyKeyHash = getStickyKeyHash(discarded);
                            addMessageToReplay(discarded.getLedgerId(), discarded.getEntryId(), stickyKeyHash);
                            discarded.release();
                        }
                    } else if (replayRead) {
                        entries.forEach(Entry::release);
                    }
                    skipNextBackoff = true;
                    return true;
                }
            }
        }

        if (recentlyJoinedConsumerTrackingRequired) {
            // Pick up any forward movement of the mark-delete position first.
            updateIfNeededAndGetLastSentPosition();
            if (lastSentPosition == null && cursor.getMarkDeletedPosition() != null) {
                // Seed the individually-sent ranges from the ranges the cursor hands to the callback.
                lastSentPosition = ((ManagedCursorImpl) cursor)
                        .processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(range -> {
                            Position lower = (Position) range.lowerEndpoint();
                            Position upper = (Position) range.upperEndpoint();
                            individuallySentPositions.addOpenClosed(lower.getLedgerId(), lower.getEntryId(),
                                    upper.getLedgerId(), upper.getEntryId());
                            return true;
                        });
            }
        }

        MutableBoolean triggerLookAhead = new MutableBoolean();
        Map<Consumer, List<Entry>> entriesByConsumerForDispatching =
                filterAndGroupEntriesForDispatching(entries, readType, triggerLookAhead);
        // readMoreEntries() is invoked by whichever send future finishes last.
        AtomicInteger remainingConsumersToFinishSending =
                new AtomicInteger(entriesByConsumerForDispatching.size());

        long totalMessagesSent = 0L;
        long totalBytesSent = 0L;
        long totalEntries = 0L;
        long totalEntriesProcessed = 0L;

        for (Map.Entry<Consumer, List<Entry>> grouped : entriesByConsumerForDispatching.entrySet()) {
            Consumer consumer = grouped.getKey();
            List<Entry> entriesForConsumer = grouped.getValue();
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} with messages num {}, read type is {}",
                        name, consumer.consumerName(), entriesForConsumer.size(), readType);
            }

            ManagedLedger managedLedger = cursor.getManagedLedger();
            for (Entry entry : entriesForConsumer) {
                if (replayRead) {
                    redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
                }
                if (!recentlyJoinedConsumerTrackingRequired) {
                    continue;
                }
                Position position = entry.getPosition();
                boolean alreadyTracked =
                        (lastSentPosition != null && position.compareTo(lastSentPosition) <= 0)
                                || individuallySentPositions.contains(position.getLedgerId(),
                                        position.getEntryId());
                if (alreadyTracked) {
                    continue;
                }
                // Record the single position as the half-open range (previous, position].
                Position previousPosition = managedLedger.getPreviousPosition(position);
                individuallySentPositions.addOpenClosed(previousPosition.getLedgerId(),
                        previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
            }

            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
            EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size());
            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForConsumer.size());
            totalEntries += filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo,
                    batchIndexesAcks, cursor, replayRead, consumer);
            totalEntriesProcessed += entriesForConsumer.size();
            consumer.sendMessages(entriesForConsumer, batchSizes, batchIndexesAcks,
                    sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(),
                    sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker()).addListener(future -> {
                        if (future.isDone() && remainingConsumersToFinishSending.decrementAndGet() == 0) {
                            readMoreEntries();
                        }
                    });
            TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this,
                    -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
            totalMessagesSent += sendMessageInfo.getTotalMessages();
            totalBytesSent += sendMessageInfo.getTotalBytes();
        }

        if (recentlyJoinedConsumerTrackingRequired && lastSentPosition != null) {
            ManagedLedger managedLedger = cursor.getManagedLedger();
            com.google.common.collect.Range<Position> firstRange = individuallySentPositions.firstRange();

            // Drop ranges that lie entirely at or before the last sent position.
            if (firstRange != null && firstRange.upperEndpoint().compareTo(lastSentPosition) <= 0) {
                individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
                        lastSentPosition.getEntryId());
                firstRange = individuallySentPositions.firstRange();
            }

            // The first range is usable when it starts at or before the last sent position, or when the
            // ledger reports no entries between the last sent position and the start of the range.
            if (firstRange != null
                    && (firstRange.lowerEndpoint().compareTo(lastSentPosition) <= 0
                    || managedLedger.getNumberOfEntries(com.google.common.collect.Range.openClosed(
                            lastSentPosition, firstRange.lowerEndpoint())) <= 0L)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Found a position range to last sent: {}", name, firstRange);
                }
                Position newLastSentPosition = firstRange.upperEndpoint();
                Position positionAfterNewLastSent = managedLedger.getNextValidPosition(newLastSentPosition);
                Position lastConfirmedEntrySnapshot = managedLedger.getLastConfirmedEntry();
                if (lastConfirmedEntrySnapshot != null) {
                    // Keep extending while the next valid position is itself inside a tracked range.
                    while (positionAfterNewLastSent.compareTo(lastConfirmedEntrySnapshot) <= 0
                            && individuallySentPositions.contains(positionAfterNewLastSent.getLedgerId(),
                                    positionAfterNewLastSent.getEntryId())) {
                        com.google.common.collect.Range<Position> containing =
                                individuallySentPositions.rangeContaining(
                                        positionAfterNewLastSent.getLedgerId(),
                                        positionAfterNewLastSent.getEntryId());
                        newLastSentPosition = containing.upperEndpoint();
                        positionAfterNewLastSent = managedLedger.getNextValidPosition(newLastSentPosition);
                    }
                }
                if (lastSentPosition.compareTo(newLastSentPosition) < 0) {
                    lastSentPosition = newLastSentPosition;
                }
                individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
                        lastSentPosition.getEntryId());
            }
        }

        lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
        acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

        if (triggerLookAhead.booleanValue()) {
            skipNextReplayToTriggerLookAhead = true;
            skipNextBackoff = true;
            return true;
        }
        return totalEntries == 0L;
    }

    /**
     * Tells whether the redelivery queue currently holds fewer entries than the effective look-ahead limit.
     *
     * @return {@code true} if {@code redeliveryMessages.size()} is strictly below the limit
     */
    private boolean isReplayQueueSizeBelowLimit() {
        int queuedForReplay = redeliveryMessages.size();
        return queuedForReplay < getEffectiveLookAheadLimit();
    }

    /**
     * Computes the effective look-ahead limit for the current number of consumers in {@code consumerList}.
     *
     * @return the limit produced by the static overload for this dispatcher's configuration
     */
    private int getEffectiveLookAheadLimit() {
        return getEffectiveLookAheadLimit(serviceConfig, consumerList.size());
    }

    /**
     * Derives the look-ahead limit from the broker configuration and the number of consumers.
     *
     * <p>If the per-consumer threshold is positive, the limit is that threshold multiplied by
     * {@code consumerCount}, capped by the per-subscription threshold when that one is positive and smaller.
     * If the per-consumer threshold is not positive, the per-subscription threshold is used as is.
     * When the result is not positive, the limit falls back to the smaller of the max-unacked-messages limit
     * per subscription and {@code consumerCount} times the max-unacked-messages limit per consumer, where a
     * non-positive value of either counts as {@link Integer#MAX_VALUE}.
     *
     * <p>Products are computed with saturation so that large configured values cannot wrap around: a wrapped
     * negative product would bypass a configured per-subscription cap, and a wrapped positive product would
     * yield an arbitrarily small limit.
     *
     * @param serviceConfig broker configuration supplying the thresholds
     * @param consumerCount number of consumers to scale the per-consumer values by
     * @return the effective look-ahead limit
     */
    static int getEffectiveLookAheadLimit(ServiceConfiguration serviceConfig, int consumerCount) {
        int perConsumerLimit = serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer();
        int perSubscriptionLimit = serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription();
        int effectiveLimit;
        if (perConsumerLimit <= 0) {
            effectiveLimit = perSubscriptionLimit;
        } else {
            effectiveLimit = multiplySaturated(perConsumerLimit, consumerCount);
            if (perSubscriptionLimit > 0 && perSubscriptionLimit < effectiveLimit) {
                effectiveLimit = perSubscriptionLimit;
            }
        }
        if (effectiveLimit <= 0) {
            // Look-ahead thresholds yield no usable limit: fall back to the max-unacked-messages settings.
            int maxUnackedMessagesPerSubscription = serviceConfig.getMaxUnackedMessagesPerSubscription();
            if (maxUnackedMessagesPerSubscription <= 0) {
                maxUnackedMessagesPerSubscription = Integer.MAX_VALUE;
            }
            int maxUnackedMessagesByConsumers =
                    multiplySaturated(consumerCount, serviceConfig.getMaxUnackedMessagesPerConsumer());
            if (maxUnackedMessagesByConsumers <= 0) {
                maxUnackedMessagesByConsumers = Integer.MAX_VALUE;
            }
            effectiveLimit = Math.min(maxUnackedMessagesPerSubscription, maxUnackedMessagesByConsumers);
        }
        return effectiveLimit;
    }

    /** Multiplies two ints in 64-bit arithmetic and clamps the product to the {@code int} range. */
    private static int multiplySaturated(int a, int b) {
        long product = (long) a * b;
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, product));
    }

    /**
     * Assigns each entry to the consumer selected for its sticky key hash and decides whether it can be
     * dispatched now.
     *
     * <p>An entry is dispatched when a consumer was selected, that consumer still has permits left in this
     * batch and {@link #canDispatchEntry} accepts it. Every other entry is added to the replay queue and
     * released. Afterwards, if look-ahead is allowed and nothing could be dispatched,
     * {@code triggerLookAhead} is set when either a consumer blocked only by a hash in the replay queue still
     * has permits (normal reads only), or some consumer that received no entry in this batch has permits.
     *
     * @param entries the entries to distribute
     * @param readType whether the entries come from a normal read or a replay read
     * @param triggerLookAhead output flag, set to true when reading further ahead could make progress
     * @return the dispatchable entries grouped by target consumer
     */
    private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entry> entries,
            PersistentDispatcherMultipleConsumers.ReadType readType, MutableBoolean triggerLookAhead) {
        final boolean normalRead = readType == PersistentDispatcherMultipleConsumers.ReadType.Normal;
        Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
        // Remaining permits per consumer for this batch, initialised lazily on first use.
        Map<Consumer, MutableInt> permitsForConsumer = new HashMap<>();
        boolean hasRecentlyJoinedConsumers = hasRecentlyJoinedConsumers();
        Map<Consumer, Position> maxLastSentPositionCache =
                hasRecentlyJoinedConsumers ? new HashMap<>() : null;
        boolean lookAheadAllowed = isReplayQueueSizeBelowLimit();
        final boolean trackHashBlocking = lookAheadAllowed && normalRead;
        Set<Consumer> blockedByHashConsumers = trackHashBlocking ? new HashSet<>() : null;
        Set<Consumer> consumersForEntriesForLookaheadCheck = lookAheadAllowed ? new HashSet<>() : null;

        for (Entry entry : entries) {
            int stickyKeyHash = getStickyKeyHash(entry);
            Consumer consumer = selector.select(stickyKeyHash);
            MutableBoolean blockedByHash = null;
            boolean dispatchEntry = false;
            if (consumer != null) {
                if (lookAheadAllowed) {
                    consumersForEntriesForLookaheadCheck.add(consumer);
                }
                Position maxLastSentPosition = null;
                if (hasRecentlyJoinedConsumers) {
                    maxLastSentPosition = maxLastSentPositionCache.computeIfAbsent(consumer,
                            __ -> resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer, readType));
                }
                if (trackHashBlocking) {
                    blockedByHash = new MutableBoolean(false);
                }
                MutableInt permits = permitsForConsumer.computeIfAbsent(consumer,
                        k -> new MutableInt(getAvailablePermits(consumer)));
                if (permits.intValue() > 0
                        && canDispatchEntry(entry, readType, stickyKeyHash, maxLastSentPosition, blockedByHash)) {
                    permits.decrement();
                    dispatchEntry = true;
                }
            }

            if (!dispatchEntry) {
                if (blockedByHash != null && blockedByHash.isTrue()) {
                    blockedByHashConsumers.add(consumer);
                }
                // Not dispatchable right now: queue for replay and give the buffer back.
                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
                entry.release();
                continue;
            }
            entriesGroupedByConsumer.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
        }

        if (lookAheadAllowed && entriesGroupedByConsumer.isEmpty()) {
            if (normalRead) {
                for (Consumer blocked : blockedByHashConsumers) {
                    boolean receivesNothing = !entriesGroupedByConsumer.containsKey(blocked);
                    if (receivesNothing && permitsForConsumer.get(blocked).intValue() > 0) {
                        triggerLookAhead.setTrue();
                        break;
                    }
                }
            }
            if (!triggerLookAhead.booleanValue()) {
                for (Consumer candidate : getConsumers()) {
                    boolean sawNoEntry = !consumersForEntriesForLookaheadCheck.contains(candidate);
                    if (sawNoEntry && getAvailablePermits(candidate) > 0) {
                        triggerLookAhead.setTrue();
                        break;
                    }
                }
            }
        }
        return entriesGroupedByConsumer;
    }

    /**
     * Decides whether a single entry may be dispatched to its selected consumer.
     *
     * @param entry the entry under consideration
     * @param readType whether the entry comes from a normal read or a replay read
     * @param stickyKeyHash the sticky key hash of the entry
     * @param maxLastSentPosition upper position bound for the consumer, or {@code null} for no bound
     * @param blockedByHash optional output flag; set to true when the entry is rejected because its hash is
     *                      present in the redelivery queue
     * @return {@code false} if the entry lies beyond {@code maxLastSentPosition}, or if this is a normal read
     *         and the redelivery queue contains the same sticky key hash; {@code true} otherwise
     */
    private boolean canDispatchEntry(Entry entry, PersistentDispatcherMultipleConsumers.ReadType readType,
            int stickyKeyHash, Position maxLastSentPosition, MutableBoolean blockedByHash) {
        boolean beyondPositionBound =
                maxLastSentPosition != null && entry.getPosition().compareTo(maxLastSentPosition) > 0;
        if (beyondPositionBound) {
            return false;
        }
        boolean hashQueuedForReplay = readType == PersistentDispatcherMultipleConsumers.ReadType.Normal
                && redeliveryMessages.containsStickyKeyHash(stickyKeyHash);
        if (!hashQueuedForReplay) {
            return true;
        }
        if (blockedByHash != null) {
            blockedByHash.setTrue();
        }
        return false;
    }

    /**
     * Creates a fresh {@link ReplayPositionFilter} for selecting positions to replay.
     *
     * @return a new filter instance with its own per-consumer permit bookkeeping
     */
    @Override
    protected Predicate<Position> createFilterForReplay() {
        Predicate<Position> replayFilter = new ReplayPositionFilter();
        return replayFilter;
    }

    /**
     * Looks up the position bound that applies to a consumer while it is listed in
     * {@code recentlyJoinedConsumers}.
     *
     * <p>Consumers whose recorded position is at or before the mark-delete position are purged from the map
     * first. For replay reads the bound is lowered to the value of the first map entry (the map is
     * insertion-ordered) when that value is smaller.
     *
     * @param consumer the consumer to look up
     * @param readType whether the bound is needed for a normal read or a replay read
     * @return the position bound, or {@code null} if tracking is disabled or the consumer is not listed
     */
    private Position resolveMaxLastSentPositionForRecentlyJoinedConsumer(Consumer consumer,
            PersistentDispatcherMultipleConsumers.ReadType readType) {
        if (recentlyJoinedConsumers == null) {
            return null;
        }
        removeConsumersFromRecentJoinedConsumers();
        Position ownBound = recentlyJoinedConsumers.get(consumer);
        if (ownBound == null || readType != PersistentDispatcherMultipleConsumers.ReadType.Replay) {
            return ownBound;
        }
        // The consumer is present in the map, so the map has at least one value.
        Position firstBound = recentlyJoinedConsumers.values().iterator().next();
        if (firstBound != null && firstBound.compareTo(ownBound) < 0) {
            return firstBound;
        }
        return ownBound;
    }

    /**
     * Schedules, on the topic ordered executor, a purge of recently joined consumers and triggers a new read
     * if at least one consumer was removed from {@code recentlyJoinedConsumers}.
     */
    @Override
    public void markDeletePositionMoveForward() {
        topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
            synchronized (this) {
                if (!hasRecentlyJoinedConsumers()) {
                    return;
                }
                if (removeConsumersFromRecentJoinedConsumers()) {
                    readMoreEntries();
                }
            }
        });
    }

    /**
     * Tells whether any consumer is currently listed in {@code recentlyJoinedConsumers}.
     *
     * @return {@code true} if the map is non-null and non-empty
     */
    private boolean hasRecentlyJoinedConsumers() {
        boolean nothingTracked = MapUtils.isEmpty(recentlyJoinedConsumers);
        return !nothingTracked;
    }

    /**
     * Removes leading entries of {@code recentlyJoinedConsumers} whose recorded position is at or before the
     * cursor's mark-delete position. Iteration stops at the first entry whose position is after it.
     *
     * @return {@code true} if at least one consumer was removed
     */
    private boolean removeConsumersFromRecentJoinedConsumers() {
        if (MapUtils.isEmpty(recentlyJoinedConsumers)) {
            return false;
        }
        Iterator<Map.Entry<Consumer, Position>> remaining = recentlyJoinedConsumers.entrySet().iterator();
        Position mdp = cursor.getMarkDeletedPosition();
        if (mdp == null) {
            return false;
        }
        boolean removedAny = false;
        while (remaining.hasNext()) {
            Map.Entry<Consumer, Position> candidate = remaining.next();
            if (candidate.getValue().compareTo(mdp) > 0) {
                break;
            }
            remaining.remove();
            removedAny = true;
        }
        return removedAny;
    }

    /**
     * Moves {@code lastSentPosition} forward to the cursor's mark-delete position when that position is
     * ahead of it, then returns it. Nothing is changed while {@code lastSentPosition} is still unknown.
     *
     * @return the (possibly updated) last sent position, or {@code null} if it is not known yet
     */
    @Nullable
    private synchronized Position updateIfNeededAndGetLastSentPosition() {
        if (lastSentPosition != null) {
            Position mdp = cursor.getMarkDeletedPosition();
            if (mdp != null && mdp.compareTo(lastSentPosition) > 0) {
                lastSentPosition = mdp;
            }
        }
        return lastSentPosition;
    }

    /**
     * Reports whether replay is allowed and consumes the one-shot
     * {@code skipNextReplayToTriggerLookAhead} flag.
     *
     * @return {@code false} exactly once after the flag was set, {@code true} otherwise
     */
    @Override
    protected synchronized boolean canReplayMessages() {
        boolean replayAllowed = !skipNextReplayToTriggerLookAhead;
        // The flag is one-shot: it is always clear after this call.
        skipNextReplayToTriggerLookAhead = false;
        return replayAllowed;
    }

    /**
     * Estimates how many more entries can be dispatched to a consumer.
     *
     * <p>The result is 0 when the consumer's connection is not active. Otherwise it is the consumer's
     * non-negative permit count, additionally capped (when the consumer has a positive max-unacked-messages
     * setting) by the remaining unacked-message headroom divided by the average messages per entry,
     * rounded up.
     *
     * @param c the consumer to inspect
     * @return the number of entries that may still be dispatched, never negative
     */
    private int getAvailablePermits(Consumer c) {
        if (!c.cnx().isActive()) {
            return 0;
        }
        int availablePermits = Math.max(c.getAvailablePermits(), 0);
        if (availablePermits <= 0 || c.getMaxUnackedMessages() <= 0) {
            return availablePermits;
        }
        int unackedHeadroom = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
        if (unackedHeadroom == 0) {
            return 0;
        }
        int avgMessagesPerEntry = Math.max(c.getAvgMessagesPerEntry(), 1);
        // Integer ceiling division: headroom in messages converted to a number of entries.
        int estimatedRemainingPermits = (unackedHeadroom + avgMessagesPerEntry - 1) / avgMessagesPerEntry;
        return Math.min(availablePermits, estimatedRemainingPermits);
    }

    /**
     * Tells whether neither a normal read nor a replay read is currently pending.
     *
     * @return {@code true} if both {@code havePendingRead} and {@code havePendingReplayRead} are false
     */
    @Override
    protected boolean doesntHavePendingRead() {
        return !(havePendingRead || havePendingReplayRead);
    }

    /**
     * Decides whether a normal (non-replay) read may be issued.
     *
     * @return {@code true} if the replay queue is below the look-ahead limit and at least one non-null,
     *         non-blocked consumer has available permits
     */
    @Override
    protected boolean isNormalReadAllowed() {
        if (!isReplayQueueSizeBelowLimit()) {
            return false;
        }
        for (Consumer consumer : consumerList) {
            boolean canReceive =
                    consumer != null && !consumer.isBlocked() && getAvailablePermits(consumer) > 0;
            if (canReceive) {
                return true;
            }
        }
        return false;
    }

    /**
     * Computes how many entries a read may fetch: the look-ahead limit minus the current size of the
     * redelivery queue, but never less than 1.
     *
     * @return the read limit, at least 1
     */
    @Override
    protected int getMaxEntriesReadLimit() {
        int remainingLookAhead = getEffectiveLookAheadLimit() - redeliveryMessages.size();
        return Math.max(remainingLookAhead, 1);
    }

    /**
     * Reacts to a skipped normal read by logging at debug level and rescheduling the read with a backoff.
     */
    @Override
    protected void handleNormalReadNotAllowed() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Skipping read for the topic since normal read isn't allowed. Rescheduling a read with a backoff.",
                    topic.getName(), getSubscriptionName());
        }
        reScheduleReadWithBackoff();
    }

    /**
     * Identifies the subscription type handled by this dispatcher.
     *
     * @return always {@code Key_Shared}
     */
    @Override
    public CommandSubscribe.SubType getType() {
        final CommandSubscribe.SubType subscriptionType = CommandSubscribe.SubType.Key_Shared;
        return subscriptionType;
    }

    /**
     * Asks the cursor to replay the given positions, with this dispatcher as the read callback and
     * {@code ReadType.Replay} as the callback context. The last argument passed to the cursor is
     * {@code true}.
     *
     * @param positions the positions to replay
     * @return the set returned by the cursor for this replay request
     */
    @Override
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
        AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback) this;
        Object context = PersistentDispatcherMultipleConsumers.ReadType.Replay;
        return cursor.asyncReplayEntries(positions, callback, context, true);
    }

    /**
     * Returns the key-shared mode taken from the key-shared metadata at construction time.
     *
     * @return the key-shared mode of this dispatcher
     */
    public KeySharedMode getKeySharedMode() {
        return keySharedMode;
    }

    /**
     * Returns the out-of-order delivery flag taken from the key-shared metadata at construction time.
     *
     * @return {@code true} if out-of-order delivery is allowed
     */
    public boolean isAllowOutOfOrderDelivery() {
        return allowOutOfOrderDelivery;
    }

    /**
     * Compares the given key-shared metadata with this dispatcher's settings.
     *
     * @param ksm the metadata to compare against
     * @return {@code true} if both the key-shared mode and the out-of-order delivery flag match
     */
    public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
        if (ksm.getKeySharedMode() != keySharedMode) {
            return false;
        }
        return ksm.isAllowOutOfOrderDelivery() == allowOutOfOrderDelivery;
    }

    /**
     * Returns the internal map of recently joined consumers and the position recorded when each joined.
     * The map itself is returned, not a copy.
     *
     * @return the tracking map, or {@code null} when recently-joined-consumer tracking is not required
     */
    public LinkedHashMap<Consumer, Position> getRecentlyJoinedConsumers() {
        return recentlyJoinedConsumers;
    }

    /**
     * Returns the string form of {@code lastSentPosition}.
     *
     * @return {@code lastSentPosition.toString()}, or {@code null} if no last sent position is known
     */
    public synchronized String getLastSentPosition() {
        return lastSentPosition == null ? null : lastSentPosition.toString();
    }

    /**
     * Returns the raw {@code lastSentPosition} field. This accessor is not synchronized.
     *
     * @return the last sent position, possibly {@code null}
     */
    @VisibleForTesting
    public Position getLastSentPositionField() {
        return lastSentPosition;
    }

    /**
     * Returns the string form of {@code individuallySentPositions}.
     *
     * @return {@code individuallySentPositions.toString()}, or {@code null} if the range set was not created
     */
    public synchronized String getIndividuallySentPositions() {
        return individuallySentPositions == null ? null : individuallySentPositions.toString();
    }

    /**
     * Returns the raw {@code individuallySentPositions} range set. This accessor is not synchronized.
     *
     * @return the range set, or {@code null} when recently-joined-consumer tracking is not required
     */
    @VisibleForTesting
    public LongPairRangeSet<Position> getIndividuallySentPositionsField() {
        return individuallySentPositions;
    }

    /**
     * Returns the key hash ranges per consumer as reported by the selector.
     *
     * @return the selector's consumer-to-hash-ranges mapping
     */
    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
        return selector.getConsumerKeyHashRanges();
    }

    /**
     * Stateful filter that decides which positions from the redelivery queue may be replayed.
     *
     * <p>Each instance keeps its own count of remaining permits per consumer, so that no more positions are
     * accepted for a consumer than it had permits when first seen by this filter. When recently joined
     * consumers exist at construction time, the per-consumer position bound is cached as well.
     */
    private class ReplayPositionFilter implements Predicate<Position> {
        // Remaining permits per consumer, initialised lazily from getAvailablePermits.
        private final Map<Consumer, MutableInt> availablePermitsMap = new HashMap<>();
        // Null when there were no recently joined consumers when the filter was created.
        private final Map<Consumer, Position> maxLastSentPositionCache =
                hasRecentlyJoinedConsumers() ? new HashMap<>() : null;

        /**
         * Tests whether the given position may be replayed now.
         *
         * @param position a position from the redelivery queue
         * @return {@code true} if out-of-order delivery is allowed, if no sticky key hash is recorded for the
         *         position, or if the selected consumer has permits left and the position is not beyond the
         *         consumer's position bound; {@code false} otherwise
         */
        @Override
        public boolean test(Position position) {
            if (isAllowOutOfOrderDelivery()) {
                return true;
            }
            Long stickyKeyHash = redeliveryMessages.getHash(position.getLedgerId(), position.getEntryId());
            if (stickyKeyHash == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] replay of entry at position {} doesn't contain sticky key hash.",
                            name, position);
                }
                return true;
            }
            Consumer consumer = selector.select(stickyKeyHash.intValue());
            if (consumer == null) {
                return false;
            }
            MutableInt availablePermits = availablePermitsMap.computeIfAbsent(consumer,
                    k -> new MutableInt(getAvailablePermits(consumer)));
            if (availablePermits.intValue() <= 0) {
                return false;
            }
            Position maxLastSentPosition = null;
            if (maxLastSentPositionCache != null) {
                maxLastSentPosition = maxLastSentPositionCache.computeIfAbsent(consumer,
                        __ -> resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer,
                                PersistentDispatcherMultipleConsumers.ReadType.Replay));
            }
            if (maxLastSentPosition != null && position.compareTo(maxLastSentPosition) > 0) {
                return false;
            }
            // Accepting the position consumes one permit of the consumer for this filter instance.
            availablePermits.decrement();
            return true;
        }
    }
}

