package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.class */
public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {
    // Copied from KeySharedMeta.isAllowOutOfOrderDelivery() in the constructor.
    private final boolean allowOutOfOrderDelivery;
    // Maps a sticky-key hash to a consumer; the concrete implementation is picked in the constructor
    // from the key-shared mode.
    private final StickyKeyConsumerSelector selector;
    // True only when the mode is AUTO_SPLIT and out-of-order delivery is not allowed (see constructor).
    // Gates all lastSentPosition / recentlyJoinedConsumers bookkeeping.
    private final boolean recentlyJoinedConsumerTrackingRequired;
    // One-shot flag: set by trySendMessagesToConsumers when look-ahead is wanted, read and reset by
    // canReplayMessages().
    private boolean skipNextReplayToTriggerLookAhead;
    // Copied from KeySharedMeta.getKeySharedMode() in the constructor.
    private final KeySharedMode keySharedMode;
    // Consumer -> last sent position recorded when that consumer joined, in insertion order.
    // Null when recentlyJoinedConsumerTrackingRequired is false.
    private final LinkedHashMap<Consumer, Position> recentlyJoinedConsumers;

    // Highest position up to which everything is considered sent; null until initialised and reset to
    // null when the last consumer is removed. Only maintained when tracking is required.
    @Nullable
    private Position lastSentPosition;
    // Ranges of positions sent beyond lastSentPosition. Null when tracking is not required.
    private final LongPairRangeSet<Position> individuallySentPositions;
    // Converts a (ledgerId, entryId) pair into a Position for the range set.
    private static final LongPairRangeSet.LongPairConsumer<Position> positionRangeConverter = PositionFactory::create;
    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

    /*
     * Decompiler rendering of the compiler-generated lookup table for switching on KeySharedMode
     * (originally the synthetic class PersistentStickyKeyDispatcherMultipleConsumers$1).
     * The table is indexed by enum ordinal and holds the case number: AUTO_SPLIT -> 1, STICKY -> 2.
     * Entries for constants that are absent at runtime stay 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode;

        static {
            $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode = new int[KeySharedMode.values().length];
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode[KeySharedMode.AUTO_SPLIT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant missing in the loaded enum: leave its slot unmapped.
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode[KeySharedMode.STICKY.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant missing in the loaded enum: leave its slot unmapped.
            }
        }
    }

    /**
     * Predicate over replay-queue positions that decides whether a position should be replayed now.
     *
     * <p>Each filter instance keeps its own per-consumer permit budget, seeded from
     * {@code getAvailablePermits(consumer)} the first time a consumer is selected and decremented for
     * every accepted position. When recently joined consumers exist at construction time, the
     * per-consumer maximum replayable position is cached as well.
     */
    private class ReplayPositionFilter implements Predicate<Position> {
        /** Remaining permits per consumer, tracked for the lifetime of this filter instance. */
        private final Map<Consumer, MutableInt> availablePermitsMap = new HashMap<>();
        /** Per-consumer position cap; {@code null} when no recently joined consumers existed at creation. */
        private final Map<Consumer, Position> maxLastSentPositionCache;

        private ReplayPositionFilter() {
            this.maxLastSentPositionCache = hasRecentlyJoinedConsumers() ? new HashMap<>() : null;
        }

        /**
         * @param position replay-queue position to evaluate
         * @return {@code true} if the position should be replayed; {@code false} if it has no assigned
         *         consumer, the consumer has no remaining permits, or the position lies beyond the cap
         *         resolved for a recently joined consumer
         */
        @Override
        public boolean test(Position position) {
            if (isAllowOutOfOrderDelivery()) {
                return true;
            }
            Long stickyKeyHash = redeliveryMessages.getHash(position.getLedgerId(), position.getEntryId());
            if (stickyKeyHash == null) {
                // No hash recorded for this position: let it through.
                if (log.isDebugEnabled()) {
                    log.debug("[{}] replay of entry at position {} doesn't contain sticky key hash.", name, position);
                }
                return true;
            }
            Consumer consumer = selector.select(stickyKeyHash.intValue());
            if (consumer == null) {
                return false;
            }
            MutableInt remainingPermits = this.availablePermitsMap.computeIfAbsent(consumer,
                    key -> new MutableInt(getAvailablePermits(consumer)));
            if (remainingPermits.intValue() <= 0) {
                return false;
            }
            Position maxLastSentPosition = null;
            if (this.maxLastSentPositionCache != null) {
                // A null result is not cached by computeIfAbsent, so it is re-resolved on the next call.
                maxLastSentPosition = this.maxLastSentPositionCache.computeIfAbsent(consumer,
                        key -> resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer,
                                PersistentDispatcherMultipleConsumers.ReadType.Replay));
            }
            if (maxLastSentPosition != null && position.compareTo(maxLastSentPosition) > 0) {
                return false;
            }
            remainingPermits.decrement();
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the dispatcher and selects the sticky-key consumer selector for the requested mode.
     *
     * <p>Recently-joined-consumer tracking (and the collections backing it) is enabled only for
     * {@code AUTO_SPLIT} with ordered delivery; otherwise those collections are {@code null}.
     *
     * @param persistentTopic      topic this dispatcher belongs to
     * @param managedCursor        cursor of the subscription
     * @param subscription         owning subscription
     * @param serviceConfiguration broker configuration, consulted for the consistent-hashing settings
     * @param keySharedMeta        key-shared settings supplied by the subscribing consumer
     * @throws IllegalArgumentException if the key-shared mode is neither AUTO_SPLIT nor STICKY
     */
    public PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic persistentTopic, ManagedCursor managedCursor, Subscription subscription, ServiceConfiguration serviceConfiguration, KeySharedMeta keySharedMeta) {
        super(persistentTopic, managedCursor, subscription, keySharedMeta.isAllowOutOfOrderDelivery());
        this.skipNextReplayToTriggerLookAhead = false;
        this.allowOutOfOrderDelivery = keySharedMeta.isAllowOutOfOrderDelivery();
        this.keySharedMode = keySharedMeta.getKeySharedMode();
        this.recentlyJoinedConsumerTrackingRequired =
                this.keySharedMode == KeySharedMode.AUTO_SPLIT && !this.allowOutOfOrderDelivery;
        this.recentlyJoinedConsumers = this.recentlyJoinedConsumerTrackingRequired ? new LinkedHashMap<>() : null;
        this.individuallySentPositions = this.recentlyJoinedConsumerTrackingRequired
                ? new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter)
                : null;
        // Switch on the enum itself rather than on a hand-maintained ordinal lookup table.
        switch (this.keySharedMode) {
            case AUTO_SPLIT:
                if (serviceConfiguration.isSubscriptionKeySharedUseConsistentHashing()) {
                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(
                            serviceConfiguration.getSubscriptionKeySharedConsistentHashingReplicaPoints());
                } else {
                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
                }
                break;
            case STICKY:
                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
                break;
            default:
                throw new IllegalArgumentException("Invalid key-shared mode: " + this.keySharedMode);
        }
    }

    /** Returns the sticky-key consumer selector chosen in the constructor. */
    @VisibleForTesting
    public StickyKeyConsumerSelector getSelector() {
        return selector;
    }

    /**
     * Registers a consumer with the parent dispatcher and then with the sticky-key selector.
     *
     * <p>If the dispatcher is already closed the consumer is disconnected and a completed future is
     * returned. If the selector rejects the consumer, it is removed again from the consumer
     * collections and the failure is propagated. On success, when tracking is required and a last sent
     * position is known, that position is stored on the consumer and, if other consumers exist and
     * more than one entry is outstanding, the consumer is recorded as recently joined.
     *
     * @param consumer consumer to add
     * @return future completed once registration and bookkeeping are done
     */
    @Override
    public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
        if (IS_CLOSED_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.name, consumer);
            consumer.disconnect();
            return CompletableFuture.completedFuture(null);
        }
        return super.addConsumer(consumer)
                .thenCompose(ignored -> this.selector.addConsumer(consumer).handle((result, failure) -> {
                    if (failure != null) {
                        // Roll back the registration done by the parent dispatcher.
                        synchronized (this) {
                            this.consumerSet.removeAll(consumer);
                            this.consumerList.remove(consumer);
                        }
                        throw FutureUtil.wrapToCompletionException(failure);
                    }
                    return result;
                }))
                .thenRun(() -> {
                    synchronized (this) {
                        if (!this.recentlyJoinedConsumerTrackingRequired) {
                            return;
                        }
                        Position lastSentWhenJoining = updateIfNeededAndGetLastSentPosition();
                        if (lastSentWhenJoining == null) {
                            return;
                        }
                        consumer.setLastSentPositionWhenJoining(lastSentWhenJoining);
                        if (this.recentlyJoinedConsumers != null
                                && this.consumerList.size() > 1
                                && this.cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
                            this.recentlyJoinedConsumers.put(consumer, lastSentWhenJoining);
                        }
                    }
                });
    }

    /**
     * Removes a consumer from the selector first and then from the parent dispatcher, and updates the
     * recently-joined bookkeeping when tracking is required.
     *
     * @param consumer consumer to remove
     * @throws BrokerServiceException propagated from the parent dispatcher
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        selector.removeConsumer(consumer);
        super.removeConsumer(consumer);
        if (!recentlyJoinedConsumerTrackingRequired) {
            return;
        }
        recentlyJoinedConsumers.remove(consumer);
        if (consumerList.size() == 1) {
            // A single remaining consumer has nobody to be ordered against.
            recentlyJoinedConsumers.clear();
        } else if (consumerList.isEmpty()) {
            // No consumers left: forget what has been sent.
            lastSentPosition = null;
            individuallySentPositions.clear();
        }
        boolean removedRecentlyJoined = removeConsumersFromRecentJoinedConsumers();
        if (removedRecentlyJoined || !redeliveryMessages.isEmpty()) {
            readMoreEntries();
        }
    }

    /**
     * Groups the given entries by the consumer selected for their sticky-key hash and sends each
     * group, updating the sent-position bookkeeping when recently-joined tracking is required.
     *
     * <p>Entries that cannot be dispatched are added to the replay queue and released by
     * {@link #filterAndGroupEntriesForDispatching}.
     *
     * @param readType whether the entries come from a normal read or a replay read
     * @param list     entries that were read
     * @return {@code true} when the list is empty, when the read is discarded because an earlier
     *         position was inserted into the replay queue, when look-ahead was requested, or when
     *         {@code filterEntriesForConsumer} counted no entries; {@code false} when there are no
     *         consumers or at least one entry was counted
     */
    @Override
    protected synchronized boolean trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType readType, List<Entry> list) {
        this.lastNumberOfEntriesProcessed = 0;
        long totalMessagesSent = 0;
        long totalBytesSent = 0;
        long totalEntries = 0;
        long totalEntriesProcessed = 0;
        if (list.isEmpty()) {
            return true;
        }
        if (this.consumerSet.isEmpty()) {
            // Nobody to deliver to: release the buffers and rewind the cursor.
            list.forEach(Entry::release);
            this.cursor.rewind();
            return false;
        }
        if (!this.allowOutOfOrderDelivery) {
            Optional<Position> firstPositionInReplay = getFirstPositionInReplay();
            if (firstPositionInReplay.isPresent()) {
                Position replayPosition = firstPositionInReplay.get();
                if (this.minReplayedPosition != null && replayPosition.compareTo(this.minReplayedPosition) < 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this read and retry with readMoreEntries.", this.name, replayPosition, this.minReplayedPosition, readType);
                    }
                    if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal) {
                        // Keep the entries reachable through the replay queue before dropping them.
                        list.forEach(discarded -> {
                            addMessageToReplay(discarded.getLedgerId(), discarded.getEntryId(), getStickyKeyHash(discarded));
                            discarded.release();
                        });
                    } else if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                        list.forEach(Entry::release);
                    }
                    this.skipNextBackoff = true;
                    return true;
                }
            }
        }
        if (this.recentlyJoinedConsumerTrackingRequired) {
            updateIfNeededAndGetLastSentPosition();
            if (this.lastSentPosition == null && this.cursor.getMarkDeletedPosition() != null) {
                // First use: seed lastSentPosition from the cursor and copy the ranges it reports.
                this.lastSentPosition = this.cursor.processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(range -> {
                    Position lower = range.lowerEndpoint();
                    Position upper = range.upperEndpoint();
                    this.individuallySentPositions.addOpenClosed(lower.getLedgerId(), lower.getEntryId(), upper.getLedgerId(), upper.getEntryId());
                    return true;
                });
            }
        }
        MutableBoolean triggerLookAhead = new MutableBoolean();
        Map<Consumer, List<Entry>> entriesByConsumer = filterAndGroupEntriesForDispatching(list, readType, triggerLookAhead);
        // Counts down as each consumer's send completes; the last completion triggers the next read.
        AtomicInteger pendingSends = new AtomicInteger(entriesByConsumer.size());
        for (Map.Entry<Consumer, List<Entry>> grouped : entriesByConsumer.entrySet()) {
            Consumer consumer = grouped.getKey();
            List<Entry> consumerEntries = grouped.getValue();
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} with messages num {}, read type is {}", this.name, consumer.consumerName(), consumerEntries.size(), readType);
            }
            ManagedLedger managedLedger = this.cursor.getManagedLedger();
            for (Entry sentEntry : consumerEntries) {
                if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                    this.redeliveryMessages.remove(sentEntry.getLedgerId(), sentEntry.getEntryId());
                }
                if (this.recentlyJoinedConsumerTrackingRequired) {
                    Position sentPosition = sentEntry.getPosition();
                    boolean beyondLastSent = this.lastSentPosition == null || sentPosition.compareTo(this.lastSentPosition) > 0;
                    if (beyondLastSent && !this.individuallySentPositions.contains(sentPosition.getLedgerId(), sentPosition.getEntryId())) {
                        // Record the single position as the range (previous, position].
                        Position previousPosition = managedLedger.getPreviousPosition(sentPosition);
                        this.individuallySentPositions.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(), sentPosition.getLedgerId(), sentPosition.getEntryId());
                    }
                }
            }
            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
            EntryBatchSizes batchSizes = EntryBatchSizes.get(consumerEntries.size());
            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(consumerEntries.size());
            totalEntries += filterEntriesForConsumer(consumerEntries, batchSizes, sendMessageInfo, batchIndexesAcks, this.cursor, readType == PersistentDispatcherMultipleConsumers.ReadType.Replay, consumer);
            totalEntriesProcessed += consumerEntries.size();
            consumer.sendMessages(consumerEntries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker()).addListener(future -> {
                if (future.isDone() && pendingSends.decrementAndGet() == 0) {
                    readMoreEntries();
                }
            });
            TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
            totalMessagesSent += sendMessageInfo.getTotalMessages();
            totalBytesSent += sendMessageInfo.getTotalBytes();
        }
        if (this.recentlyJoinedConsumerTrackingRequired && this.lastSentPosition != null) {
            advanceLastSentPositionOverSentRanges();
        }
        this.lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
        acquirePermitsForDeliveredMessages(this.topic, this.cursor, totalEntries, totalMessagesSent, totalBytesSent);
        if (triggerLookAhead.booleanValue()) {
            // Skip the next replay read (and its backoff) so that a normal read can look ahead.
            this.skipNextReplayToTriggerLookAhead = true;
            this.skipNextBackoff = true;
            return true;
        }
        return totalEntries == 0;
    }

    /**
     * Moves {@code lastSentPosition} forward across individually sent ranges that start at or directly
     * after it, then drops the ranges at or before the new value. Must be called with the dispatcher
     * lock held and with a non-null {@code lastSentPosition}.
     */
    private void advanceLastSentPositionOverSentRanges() {
        ManagedLedger managedLedger = this.cursor.getManagedLedger();
        Range<Position> firstRange = this.individuallySentPositions.firstRange();
        if (firstRange != null && firstRange.upperEndpoint().compareTo(this.lastSentPosition) <= 0) {
            // The first range is entirely behind lastSentPosition: prune and look again.
            this.individuallySentPositions.removeAtMost(this.lastSentPosition.getLedgerId(), this.lastSentPosition.getEntryId());
            firstRange = this.individuallySentPositions.firstRange();
        }
        if (firstRange == null) {
            return;
        }
        boolean reachesLastSent = firstRange.lowerEndpoint().compareTo(this.lastSentPosition) <= 0
                || managedLedger.getNumberOfEntries(Range.openClosed(this.lastSentPosition, firstRange.lowerEndpoint())) <= 0;
        if (!reachesLastSent) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Found a position range to last sent: {}", this.name, firstRange);
        }
        Position newLastSent = firstRange.upperEndpoint();
        Position nextValidPosition = managedLedger.getNextValidPosition(newLastSent);
        Position lastConfirmedEntry = managedLedger.getLastConfirmedEntry();
        if (lastConfirmedEntry != null) {
            // Chain through following ranges as long as the next valid position is already sent.
            while (nextValidPosition.compareTo(lastConfirmedEntry) <= 0
                    && this.individuallySentPositions.contains(nextValidPosition.getLedgerId(), nextValidPosition.getEntryId())) {
                newLastSent = this.individuallySentPositions.rangeContaining(nextValidPosition.getLedgerId(), nextValidPosition.getEntryId()).upperEndpoint();
                nextValidPosition = managedLedger.getNextValidPosition(newLastSent);
            }
        }
        if (this.lastSentPosition.compareTo(newLastSent) < 0) {
            this.lastSentPosition = newLastSent;
        }
        this.individuallySentPositions.removeAtMost(this.lastSentPosition.getLedgerId(), this.lastSentPosition.getEntryId());
    }

    /** Returns whether the replay queue currently holds fewer entries than the effective look-ahead limit. */
    private boolean isReplayQueueSizeBelowLimit() {
        int limit = getEffectiveLookAheadLimit();
        return redeliveryMessages.size() < limit;
    }

    /** Computes the effective look-ahead limit for this dispatcher's configuration and current consumer count. */
    private int getEffectiveLookAheadLimit() {
        int consumerCount = consumerList.size();
        return getEffectiveLookAheadLimit(serviceConfig, consumerCount);
    }

    /**
     * Computes the maximum replay-queue size up to which look-ahead reads are allowed.
     *
     * <p>The per-consumer threshold multiplied by the consumer count is used, capped by the
     * per-subscription threshold when that is positive. If the per-consumer threshold is not positive,
     * the per-subscription threshold is used alone. If the result is still not positive, the limit
     * falls back to the smaller of the max-unacked-messages limits (per subscription, and per consumer
     * multiplied by the consumer count), each treated as unlimited when not positive.
     *
     * <p>Products are computed with saturation so that a large consumer count or threshold cannot wrap
     * around to a small or negative limit.
     *
     * @param serviceConfiguration broker configuration providing the thresholds
     * @param i                    number of consumers
     * @return the effective limit
     */
    static int getEffectiveLookAheadLimit(ServiceConfiguration serviceConfiguration, int i) {
        int perConsumerLimit = serviceConfiguration.getKeySharedLookAheadMsgInReplayThresholdPerConsumer();
        int perSubscriptionLimit = serviceConfiguration.getKeySharedLookAheadMsgInReplayThresholdPerSubscription();
        int effectiveLimit;
        if (perConsumerLimit <= 0) {
            effectiveLimit = perSubscriptionLimit;
        } else {
            effectiveLimit = saturatedMultiply(perConsumerLimit, i);
            if (perSubscriptionLimit > 0 && perSubscriptionLimit < effectiveLimit) {
                effectiveLimit = perSubscriptionLimit;
            }
        }
        if (effectiveLimit <= 0) {
            // Look-ahead thresholds are disabled: fall back to the max-unacked-messages limits.
            int maxUnackedPerSubscription = serviceConfiguration.getMaxUnackedMessagesPerSubscription();
            if (maxUnackedPerSubscription <= 0) {
                maxUnackedPerSubscription = Integer.MAX_VALUE;
            }
            int maxUnackedByConsumers = saturatedMultiply(i, serviceConfiguration.getMaxUnackedMessagesPerConsumer());
            if (maxUnackedByConsumers <= 0) {
                maxUnackedByConsumers = Integer.MAX_VALUE;
            }
            effectiveLimit = Math.min(maxUnackedPerSubscription, maxUnackedByConsumers);
        }
        return effectiveLimit;
    }

    /** Multiplies two ints, clamping the result to the int range instead of wrapping on overflow. */
    private static int saturatedMultiply(int a, int b) {
        long product = (long) a * b;
        if (product > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        if (product < Integer.MIN_VALUE) {
            return Integer.MIN_VALUE;
        }
        return (int) product;
    }

    /**
     * Assigns each entry to the consumer selected for its sticky-key hash and decides whether it can
     * be dispatched now.
     *
     * <p>An entry is dispatched only if a consumer is selected, that consumer still has permits in
     * this batch, and {@link #canDispatchEntry} allows it. Every other entry is added to the replay
     * queue and released. When look-ahead is allowed and nothing could be dispatched,
     * {@code mutableBoolean} is set to true if some consumer with available permits could benefit
     * from reading further ahead.
     *
     * @param list           entries that were read
     * @param readType       whether the entries come from a normal read or a replay read
     * @param mutableBoolean out-parameter, set to true when look-ahead should be triggered
     * @return dispatchable entries grouped by target consumer
     */
    private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entry> list, PersistentDispatcherMultipleConsumers.ReadType readType, MutableBoolean mutableBoolean) {
        Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
        // Permit budget per consumer for this batch, seeded lazily from getAvailablePermits.
        Map<Consumer, MutableInt> permitsForConsumer = new HashMap<>();
        boolean recentlyJoinedPresent = hasRecentlyJoinedConsumers();
        Map<Consumer, Position> maxLastSentPositionCache = recentlyJoinedPresent ? new HashMap<>() : null;
        boolean lookAheadAllowed = isReplayQueueSizeBelowLimit();
        boolean trackBlockedByHash = lookAheadAllowed && readType == PersistentDispatcherMultipleConsumers.ReadType.Normal;
        // Consumers that had an entry held back because its hash is already in the replay queue.
        Set<Consumer> blockedByHashConsumers = trackBlockedByHash ? new HashSet<>() : null;
        // Consumers that were selected for at least one entry of this batch.
        Set<Consumer> consumersForEntries = lookAheadAllowed ? new HashSet<>() : null;

        for (Entry entry : list) {
            int stickyKeyHash = getStickyKeyHash(entry);
            Consumer consumer = this.selector.select(stickyKeyHash);
            MutableBoolean blockedByHash = null;
            boolean dispatchEntry = false;
            if (consumer != null) {
                if (lookAheadAllowed) {
                    consumersForEntries.add(consumer);
                }
                Position maxLastSentPosition = recentlyJoinedPresent
                        ? maxLastSentPositionCache.computeIfAbsent(consumer,
                                key -> resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer, readType))
                        : null;
                blockedByHash = trackBlockedByHash ? new MutableBoolean(false) : null;
                MutableInt permits = permitsForConsumer.computeIfAbsent(consumer,
                        key -> new MutableInt(getAvailablePermits(consumer)));
                if (permits.intValue() > 0
                        && canDispatchEntry(entry, readType, stickyKeyHash, maxLastSentPosition, blockedByHash)) {
                    permits.decrement();
                    dispatchEntry = true;
                }
            }
            if (dispatchEntry) {
                entriesGroupedByConsumer.computeIfAbsent(consumer, key -> new ArrayList<>()).add(entry);
            } else {
                if (blockedByHash != null && blockedByHash.isTrue()) {
                    blockedByHashConsumers.add(consumer);
                }
                // Not dispatchable now: park the position in the replay queue and drop the buffer.
                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
                entry.release();
            }
        }

        if (lookAheadAllowed && entriesGroupedByConsumer.isEmpty()) {
            if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal) {
                // A consumer blocked only by hash that still has permits could progress with look-ahead.
                for (Consumer blocked : blockedByHashConsumers) {
                    if (!entriesGroupedByConsumer.containsKey(blocked)
                            && permitsForConsumer.get(blocked).intValue() > 0) {
                        mutableBoolean.setTrue();
                        break;
                    }
                }
            }
            if (!mutableBoolean.booleanValue()) {
                // Any consumer that saw no entry in this batch but has permits also justifies look-ahead.
                for (Consumer other : getConsumers()) {
                    if (!consumersForEntries.contains(other) && getAvailablePermits(other) > 0) {
                        mutableBoolean.setTrue();
                        break;
                    }
                }
            }
        }
        return entriesGroupedByConsumer;
    }

    /**
     * Decides whether an entry may be dispatched right now.
     *
     * @param entry          candidate entry
     * @param readType       normal or replay read
     * @param i              sticky-key hash of the entry
     * @param position       upper bound for the entry position, or {@code null} for no bound
     * @param mutableBoolean optional out-flag, set to true when the entry is held back because its
     *                       hash is already present in the replay queue
     * @return {@code false} if the entry lies beyond {@code position}, or if this is a normal read and
     *         the hash is already in the replay queue; {@code true} otherwise
     */
    private boolean canDispatchEntry(Entry entry, PersistentDispatcherMultipleConsumers.ReadType readType, int i, Position position, MutableBoolean mutableBoolean) {
        boolean beyondBound = position != null && entry.getPosition().compareTo(position) > 0;
        if (beyondBound) {
            return false;
        }
        if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal
                && redeliveryMessages.containsStickyKeyHash(i)) {
            if (mutableBoolean != null) {
                mutableBoolean.setTrue();
            }
            return false;
        }
        return true;
    }

    /** Creates a fresh {@link ReplayPositionFilter} with its own per-consumer permit budget. */
    @Override
    protected Predicate<Position> createFilterForReplay() {
        Predicate<Position> replayFilter = new ReplayPositionFilter();
        return replayFilter;
    }

    /**
     * Resolves the highest position that may be dispatched to the given consumer while it is still
     * listed as recently joined.
     *
     * <p>Stale entries are pruned from {@code recentlyJoinedConsumers} first. For replay reads the
     * result is additionally capped by the position of the first (oldest inserted) recently joined
     * consumer.
     *
     * @param consumer consumer to resolve the bound for
     * @param readType normal or replay read
     * @return the bound, or {@code null} when tracking is off or the consumer is not recently joined
     */
    private Position resolveMaxLastSentPositionForRecentlyJoinedConsumer(Consumer consumer, PersistentDispatcherMultipleConsumers.ReadType readType) {
        if (recentlyJoinedConsumers == null) {
            return null;
        }
        removeConsumersFromRecentJoinedConsumers();
        Position maxLastSentPosition = recentlyJoinedConsumers.get(consumer);
        if (maxLastSentPosition == null || readType != PersistentDispatcherMultipleConsumers.ReadType.Replay) {
            return maxLastSentPosition;
        }
        // The map holds at least this consumer here, so the iterator has a first element.
        Position firstJoinedPosition = recentlyJoinedConsumers.values().iterator().next();
        boolean firstIsLower = firstJoinedPosition != null && firstJoinedPosition.compareTo(maxLastSentPosition) < 0;
        return firstIsLower ? firstJoinedPosition : maxLastSentPosition;
    }

    /**
     * Schedules, on the topic ordered executor, a check that prunes recently joined consumers and
     * triggers a new read if any were removed.
     */
    @Override
    public void markDeletePositionMoveForward() {
        topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
            synchronized (this) {
                if (!hasRecentlyJoinedConsumers()) {
                    return;
                }
                if (removeConsumersFromRecentJoinedConsumers()) {
                    readMoreEntries();
                }
            }
        });
    }

    /** Returns whether the recently-joined map exists and contains at least one consumer. */
    private boolean hasRecentlyJoinedConsumers() {
        if (MapUtils.isEmpty(recentlyJoinedConsumers)) {
            return false;
        }
        return true;
    }

    /**
     * Removes, from the head of the recently-joined map, every consumer whose recorded position is at
     * or before the cursor's mark-delete position, stopping at the first one that is not.
     *
     * @return {@code true} if at least one consumer was removed
     */
    private boolean removeConsumersFromRecentJoinedConsumers() {
        if (MapUtils.isEmpty(recentlyJoinedConsumers)) {
            return false;
        }
        Iterator<Map.Entry<Consumer, Position>> joinedIterator = recentlyJoinedConsumers.entrySet().iterator();
        Position markDeletedPosition = cursor.getMarkDeletedPosition();
        if (markDeletedPosition == null) {
            return false;
        }
        boolean removedAny = false;
        while (joinedIterator.hasNext()) {
            Map.Entry<Consumer, Position> joined = joinedIterator.next();
            if (joined.getValue().compareTo(markDeletedPosition) > 0) {
                break;
            }
            joinedIterator.remove();
            removedAny = true;
        }
        return removedAny;
    }

    /**
     * Raises {@code lastSentPosition} to the cursor's mark-delete position when that is ahead of it.
     *
     * @return the (possibly updated) last sent position, or {@code null} if none has been set
     */
    @Nullable
    private synchronized Position updateIfNeededAndGetLastSentPosition() {
        if (lastSentPosition != null) {
            Position markDeletedPosition = cursor.getMarkDeletedPosition();
            if (markDeletedPosition != null && markDeletedPosition.compareTo(lastSentPosition) > 0) {
                lastSentPosition = markDeletedPosition;
            }
        }
        return lastSentPosition;
    }

    /**
     * Returns whether replay is allowed for the next read. If a look-ahead skip was requested, the
     * request is consumed and {@code false} is returned once.
     */
    @Override
    protected synchronized boolean canReplayMessages() {
        if (skipNextReplayToTriggerLookAhead) {
            skipNextReplayToTriggerLookAhead = false;
            return false;
        }
        return true;
    }

    /**
     * Estimates how many more entries can be sent to the consumer.
     *
     * @param consumer consumer to inspect
     * @return 0 if the connection is not active; otherwise the consumer's non-negative available
     *         permits, further limited (when a max-unacked limit is set) by the remaining unacked
     *         capacity divided by the average messages per entry, rounded up
     */
    private int getAvailablePermits(Consumer consumer) {
        if (!consumer.cnx().isActive()) {
            return 0;
        }
        int permits = Math.max(consumer.getAvailablePermits(), 0);
        if (permits > 0 && consumer.getMaxUnackedMessages() > 0) {
            int remainingUnacked = Math.max(consumer.getMaxUnackedMessages() - consumer.getUnackedMessages(), 0);
            if (remainingUnacked == 0) {
                return 0;
            }
            int avgMessagesPerEntry = Math.max(consumer.getAvgMessagesPerEntry(), 1);
            // Ceiling division without floating point.
            int estimatedEntries = (remainingUnacked + avgMessagesPerEntry - 1) / avgMessagesPerEntry;
            return Math.min(permits, estimatedEntries);
        }
        return permits;
    }

    /** Returns {@code true} only when neither a normal read nor a replay read is pending. */
    @Override
    protected boolean doesntHavePendingRead() {
        return !havePendingRead && !havePendingReplayRead;
    }

    /**
     * Returns whether a normal read may be issued: the replay queue must be below its limit and at
     * least one non-null, unblocked consumer must have available permits.
     */
    @Override
    protected boolean isNormalReadAllowed() {
        if (!isReplayQueueSizeBelowLimit()) {
            return false;
        }
        for (Consumer candidate : consumerList) {
            if (candidate == null || candidate.isBlocked()) {
                continue;
            }
            if (getAvailablePermits(candidate) > 0) {
                return true;
            }
        }
        return false;
    }

    /** Returns the room left in the replay queue before the look-ahead limit is reached, but at least 1. */
    @Override
    protected int getMaxEntriesReadLimit() {
        int remainingCapacity = getEffectiveLookAheadLimit() - redeliveryMessages.size();
        return Math.max(remainingCapacity, 1);
    }

    /** Logs (at debug level) that the normal read is skipped and reschedules a read with backoff. */
    @Override
    protected void handleNormalReadNotAllowed() {
        boolean debug = log.isDebugEnabled();
        if (debug) {
            log.debug("[{}] [{}] Skipping read for the topic since normal read isn't allowed. Rescheduling a read with a backoff.", topic.getName(), getSubscriptionName());
        }
        reScheduleReadWithBackoff();
    }

    /** Returns the subscription type served by this dispatcher, always {@code Key_Shared}. */
    @Override
    public CommandSubscribe.SubType getType() {
        CommandSubscribe.SubType subType = CommandSubscribe.SubType.Key_Shared;
        return subType;
    }

    /**
     * Delegates the replay of the given positions to the cursor, passing this dispatcher, the
     * {@code Replay} read type and {@code true} as the final flag.
     *
     * @param set positions to replay
     * @return whatever the cursor's {@code asyncReplayEntries} returns
     */
    @Override
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> set) {
        PersistentDispatcherMultipleConsumers.ReadType replay = PersistentDispatcherMultipleConsumers.ReadType.Replay;
        return cursor.asyncReplayEntries(set, this, replay, true);
    }

    /** Returns the key-shared mode this dispatcher was created with. */
    public KeySharedMode getKeySharedMode() {
        return keySharedMode;
    }

    /** Returns whether this dispatcher was created with out-of-order delivery allowed. */
    public boolean isAllowOutOfOrderDelivery() {
        return allowOutOfOrderDelivery;
    }

    /**
     * Checks whether the given settings match this dispatcher's mode and ordering flag.
     *
     * @param keySharedMeta settings to compare against
     * @return {@code true} when both the key-shared mode and the out-of-order flag are equal
     */
    public boolean hasSameKeySharedPolicy(KeySharedMeta keySharedMeta) {
        if (keySharedMeta.getKeySharedMode() != keySharedMode) {
            return false;
        }
        return keySharedMeta.isAllowOutOfOrderDelivery() == allowOutOfOrderDelivery;
    }

    /**
     * Returns the internal recently-joined map itself (not a copy); {@code null} when tracking is not
     * required.
     */
    public LinkedHashMap<Consumer, Position> getRecentlyJoinedConsumers() {
        return recentlyJoinedConsumers;
    }

    /** Returns the string form of the last sent position, or {@code null} if none is set. */
    public synchronized String getLastSentPosition() {
        return lastSentPosition == null ? null : lastSentPosition.toString();
    }

    /** Returns the raw last sent position field; may be {@code null}. */
    @VisibleForTesting
    public Position getLastSentPositionField() {
        return lastSentPosition;
    }

    /** Returns the string form of the individually sent ranges, or {@code null} when the set is absent. */
    public synchronized String getIndividuallySentPositions() {
        return individuallySentPositions == null ? null : individuallySentPositions.toString();
    }

    /** Returns the raw individually-sent range set; {@code null} when tracking is not required. */
    @VisibleForTesting
    public LongPairRangeSet<Position> getIndividuallySentPositionsField() {
        return individuallySentPositions;
    }

    /** Returns the key-hash ranges per consumer as reported by the selector. */
    public Map<Consumer, List<org.apache.pulsar.client.api.Range>> getConsumerKeyHashRanges() {
        Map<Consumer, List<org.apache.pulsar.client.api.Range>> ranges = selector.getConsumerKeyHashRanges();
        return ranges;
    }
}
