package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.class */
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, AsyncCallbacks.ReadEntriesCallback {
    protected final PersistentTopic topic;
    protected final ManagedCursor cursor;
    protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;
    private CompletableFuture<Void> closeFuture;
    LongPairSet messagesToRedeliver;
    protected final RedeliveryTracker redeliveryTracker;
    private Optional<DelayedDeliveryTracker> delayedDeliveryTracker;
    private volatile boolean havePendingRead;
    private volatile boolean havePendingReplayRead;
    private boolean shouldRewindBeforeReadingOrReplaying;
    protected final String name;
    protected volatile int totalAvailablePermits;
    private volatile int readBatchSize;
    private final Backoff readFailureBackoff;
    private volatile int totalUnackedMessages;
    private volatile int blockedDispatcherOnUnackedMsgs;
    protected final ServiceConfiguration serviceConfig;
    protected Optional<DispatchRateLimiter> dispatchRateLimiter;
    protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages");
    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers$ReadType.class */
    public enum ReadType {
        Normal,
        Replay
    }

    public PersistentDispatcherMultipleConsumers(PersistentTopic persistentTopic, ManagedCursor managedCursor, Subscription subscription) {
        super(subscription);
        this.closeFuture = null;
        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
        this.delayedDeliveryTracker = Optional.empty();
        this.havePendingRead = false;
        this.havePendingReplayRead = false;
        this.shouldRewindBeforeReadingOrReplaying = false;
        this.totalAvailablePermits = 0;
        this.readFailureBackoff = new Backoff(15L, TimeUnit.SECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
        this.totalUnackedMessages = 0;
        this.blockedDispatcherOnUnackedMsgs = 0;
        this.dispatchRateLimiter = Optional.empty();
        this.serviceConfig = persistentTopic.getBrokerService().pulsar().getConfiguration();
        this.cursor = managedCursor;
        this.lastIndividualDeletedRangeFromCursorRecovery = managedCursor.getLastIndividualDeletedRange();
        this.name = persistentTopic.getName() + " / " + Codec.decode(managedCursor.getName());
        this.topic = persistentTopic;
        this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled() ? new InMemoryRedeliveryTracker() : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
        this.readBatchSize = this.serviceConfig.getDispatcherMaxReadBatchSize();
        initializeDispatchRateLimiterIfNeeded(Optional.empty());
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        if (IS_CLOSED_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.name, consumer);
            consumer.disconnect();
            return;
        }
        if (this.consumerList.isEmpty()) {
            if (this.havePendingRead || this.havePendingReplayRead) {
                this.shouldRewindBeforeReadingOrReplaying = true;
            } else {
                this.cursor.rewind();
                this.shouldRewindBeforeReadingOrReplaying = false;
            }
            this.messagesToRedeliver.clear();
        }
        if (isConsumersExceededOnSubscription()) {
            log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", this.name);
            throw new BrokerServiceException.ConsumerBusyException("Subscription reached max consumers limit");
        }
        this.consumerList.add(consumer);
        this.consumerList.sort((consumer2, consumer3) -> {
            return consumer2.getPriorityLevel() - consumer3.getPriorityLevel();
        });
        this.consumerSet.add(consumer);
    }

    private boolean isConsumersExceededOnSubscription() {
        Policies policies = null;
        Integer num = null;
        try {
            num = (Integer) Optional.ofNullable(this.topic.getBrokerService().getTopicPolicies(TopicName.get(this.topic.getName()))).map((v0) -> {
                return v0.getMaxConsumersPerSubscription();
            }).orElse(null);
            if (num == null) {
                policies = (Policies) this.topic.getBrokerService().pulsar().getConfigurationCache().policiesCache().getDataIfPresent(AdminResource.path(ZkAdminPaths.POLICIES, TopicName.get(this.topic.getName()).getNamespace()));
                if (policies == null) {
                    policies = new Policies();
                }
            }
        } catch (Exception e) {
            policies = new Policies();
        }
        if (num == null) {
            num = Integer.valueOf(policies.max_consumers_per_subscription > 0 ? policies.max_consumers_per_subscription : this.serviceConfig.getMaxConsumersPerSubscription());
        }
        return num.intValue() > 0 && num.intValue() <= this.consumerList.size();
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        addUnAckedMessages(-consumer.getUnackedMessages());
        if (this.consumerSet.removeAll(consumer) != 1) {
            log.info("[{}] Trying to remove a non-connected consumer: {}", this.name, consumer);
            return;
        }
        this.consumerList.remove(consumer);
        log.info("Removed consumer {} with pending {} acks", consumer, Long.valueOf(consumer.getPendingAcks().size()));
        if (!this.consumerList.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer are left, reading more entries", this.name);
            }
            consumer.getPendingAcks().forEach((j, j2, j3, j4) -> {
                this.messagesToRedeliver.add(j, j2);
                this.redeliveryTracker.addIfAbsent(PositionImpl.get(j, j2));
            });
            this.totalAvailablePermits -= consumer.getAvailablePermits();
            readMoreEntries();
            return;
        }
        if (this.havePendingRead && this.cursor.cancelPendingReadRequest()) {
            this.havePendingRead = false;
        }
        this.messagesToRedeliver.clear();
        this.redeliveryTracker.clear();
        if (this.closeFuture != null) {
            log.info("[{}] All consumers removed. Subscription is disconnected", this.name);
            this.closeFuture.complete(null);
        }
        this.totalAvailablePermits = 0;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void consumerFlow(Consumer consumer, int i) {
        if (!this.consumerSet.contains(consumer)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Ignoring flow control from disconnected consumer {}", this.name, consumer);
            }
        } else {
            this.totalAvailablePermits += i;
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {}", new Object[]{this.name, consumer, Integer.valueOf(this.totalAvailablePermits)});
            }
            readMoreEntries();
        }
    }

    public void readMoreEntries() {
        int i = this.totalAvailablePermits;
        if (i <= 0 || !isAtleastOneConsumerAvailable()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer buffer is full, pause reading", this.name);
                return;
            }
            return;
        }
        int min = Math.min(i, this.readBatchSize);
        Consumer randomConsumer = getRandomConsumer();
        if (randomConsumer != null && randomConsumer.isPreciseDispatcherFlowControl()) {
            min = Math.min((int) Math.ceil((i * 1.0d) / randomConsumer.getAvgMessagesPerEntry()), this.readBatchSize);
        }
        if (!isConsumerWritable()) {
            min = 1;
        }
        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent() && this.topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
                DispatchRateLimiter dispatchRateLimiter = this.topic.getDispatchRateLimiter().get();
                if (!dispatchRateLimiter.hasMessageDispatchPermit()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", new Object[]{this.name, Long.valueOf(dispatchRateLimiter.getDispatchRateOnMsg()), Long.valueOf(dispatchRateLimiter.getDispatchRateOnByte()), 1000});
                    }
                    this.topic.getBrokerService().executor().schedule(() -> {
                        readMoreEntries();
                    }, 1000L, TimeUnit.MILLISECONDS);
                    return;
                } else {
                    long availableDispatchRateLimitOnMsg = dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg();
                    if (availableDispatchRateLimitOnMsg > 0) {
                        min = Math.min(min, (int) availableDispatchRateLimitOnMsg);
                    }
                }
            }
            if (this.dispatchRateLimiter.isPresent() && this.dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
                if (!this.dispatchRateLimiter.get().hasMessageDispatchPermit()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}", new Object[]{this.name, Long.valueOf(this.dispatchRateLimiter.get().getDispatchRateOnMsg()), Long.valueOf(this.dispatchRateLimiter.get().getDispatchRateOnByte()), 1000});
                    }
                    this.topic.getBrokerService().executor().schedule(() -> {
                        readMoreEntries();
                    }, 1000L, TimeUnit.MILLISECONDS);
                    return;
                } else {
                    long availableDispatchRateLimitOnMsg2 = this.dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
                    if (availableDispatchRateLimitOnMsg2 > 0) {
                        min = Math.min(min, (int) availableDispatchRateLimitOnMsg2);
                    }
                }
            }
        }
        if (this.havePendingReplayRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping replay while awaiting previous read to complete", this.name);
                return;
            }
            return;
        }
        int max = Math.max(min, 1);
        Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(max);
        if (!messagesToReplayNow.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule replay of {} messages for {} consumers", new Object[]{this.name, Integer.valueOf(messagesToReplayNow.size()), Integer.valueOf(this.consumerList.size())});
            }
            this.havePendingReplayRead = true;
            Set<? extends Position> asyncReplayEntriesInOrder = this.topic.isDelayedDeliveryEnabled() ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
            asyncReplayEntriesInOrder.forEach(position -> {
                this.messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId());
            });
            if (messagesToReplayNow.size() - asyncReplayEntriesInOrder.size() == 0) {
                this.havePendingReplayRead = false;
                readMoreEntries();
                return;
            }
            return;
        }
        if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", new Object[]{this.name, Integer.valueOf(this.totalUnackedMessages), Integer.valueOf(this.topic.getMaxUnackedMessagesOnSubscription())});
            return;
        }
        if (this.havePendingRead) {
            log.debug("[{}] Cannot schedule next read until previous one is done", this.name);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Schedule read of {} messages for {} consumers", new Object[]{this.name, Integer.valueOf(max), Integer.valueOf(this.consumerList.size())});
        }
        this.havePendingRead = true;
        this.cursor.asyncReadEntriesOrWait(max, this.serviceConfig.getDispatcherMaxReadSizeBytes(), this, ReadType.Normal);
    }

    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> set) {
        return this.cursor.asyncReplayEntries(set, this, ReadType.Replay);
    }

    protected Set<? extends Position> asyncReplayEntriesInOrder(Set<? extends Position> set) {
        return this.cursor.asyncReplayEntries(set, this, ReadType.Replay, true);
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public boolean isConsumerConnected() {
        return !this.consumerList.isEmpty();
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public CopyOnWriteArrayList<Consumer> getConsumers() {
        return this.consumerList;
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized boolean canUnsubscribe(Consumer consumer) {
        return this.consumerList.size() == 1 && this.consumerSet.contains(consumer);
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public CompletableFuture<Void> close() {
        Optional<DelayedDeliveryTracker> optional;
        IS_CLOSED_UPDATER.set(this, 1);
        synchronized (this) {
            optional = this.delayedDeliveryTracker;
            this.delayedDeliveryTracker = Optional.empty();
        }
        optional.ifPresent((v0) -> {
            v0.close();
        });
        this.dispatchRateLimiter.ifPresent((v0) -> {
            v0.close();
        });
        return disconnectAllConsumers();
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean z) {
        this.closeFuture = new CompletableFuture<>();
        if (this.consumerList.isEmpty()) {
            this.closeFuture.complete(null);
        } else {
            this.consumerList.forEach(consumer -> {
                consumer.disconnect(z);
            });
            if (this.havePendingRead && this.cursor.cancelPendingReadRequest()) {
                this.havePendingRead = false;
            }
        }
        return this.closeFuture;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public CompletableFuture<Void> disconnectActiveConsumers(boolean z) {
        return disconnectAllConsumers(z);
    }

    @Override // org.apache.pulsar.broker.service.AbstractBaseDispatcher, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void resetCloseFuture() {
        this.closeFuture = null;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void reset() {
        resetCloseFuture();
        IS_CLOSED_UPDATER.set(this, 0);
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public PulsarApi.CommandSubscribe.SubType getType() {
        return PulsarApi.CommandSubscribe.SubType.Shared;
    }

    public synchronized void readEntriesComplete(List<Entry> list, Object obj) {
        ReadType readType = (ReadType) obj;
        if (readType == ReadType.Normal) {
            this.havePendingRead = false;
        } else {
            this.havePendingReplayRead = false;
        }
        if (this.readBatchSize < this.serviceConfig.getDispatcherMaxReadBatchSize()) {
            int min = Math.min(this.readBatchSize * 2, this.serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Increasing read batch size from {} to {}", new Object[]{this.name, Integer.valueOf(this.readBatchSize), Integer.valueOf(min)});
            }
            this.readBatchSize = min;
        }
        this.readFailureBackoff.reduceToHalf();
        if (!this.shouldRewindBeforeReadingOrReplaying || readType != ReadType.Normal) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Distributing {} messages to {} consumers", new Object[]{this.name, Integer.valueOf(list.size()), Integer.valueOf(this.consumerList.size())});
            }
            sendMessagesToConsumers(readType, list);
        } else {
            list.forEach((v0) -> {
                v0.release();
            });
            this.cursor.rewind();
            this.shouldRewindBeforeReadingOrReplaying = false;
            readMoreEntries();
        }
    }

    protected void sendMessagesToConsumers(ReadType readType, List<Entry> list) {
        if (needTrimAckedMessages()) {
            this.cursor.trimDeletedEntries(list);
        }
        int size = list.size();
        if (size == 0) {
            readMoreEntries();
            return;
        }
        int i = 0;
        long j = 0;
        long j2 = 0;
        while (size > 0 && this.totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
            Consumer nextConsumer = getNextConsumer();
            if (nextConsumer == null) {
                log.info("[{}] rewind because no available consumer found from total {}", this.name, Integer.valueOf(this.consumerList.size()));
                list.subList(i, list.size()).forEach((v0) -> {
                    v0.release();
                });
                this.cursor.rewind();
                return;
            }
            int availablePermits = nextConsumer.isWritable() ? nextConsumer.getAvailablePermits() : 1;
            if (log.isDebugEnabled() && !nextConsumer.isWritable()) {
                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", new Object[]{this.topic.getName(), this.name, nextConsumer});
            }
            int min = Math.min(Math.min(size, availablePermits), this.serviceConfig.getDispatcherMaxRoundRobinBatchSize());
            if (min > 0) {
                if (readType == ReadType.Replay) {
                    list.subList(i, i + min).forEach(entry -> {
                        this.messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId());
                    });
                }
                SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
                List<Entry> subList = list.subList(i, i + min);
                EntryBatchSizes entryBatchSizes = EntryBatchSizes.get(subList.size());
                EntryBatchIndexesAcks entryBatchIndexesAcks = EntryBatchIndexesAcks.get(subList.size());
                filterEntriesForConsumer(subList, entryBatchSizes, threadLocal, entryBatchIndexesAcks, this.cursor, readType == ReadType.Replay);
                nextConsumer.sendMessages(subList, entryBatchSizes, entryBatchIndexesAcks, threadLocal.getTotalMessages(), threadLocal.getTotalBytes(), threadLocal.getTotalChunkedMessages(), this.redeliveryTracker);
                i += min;
                size -= min;
                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(threadLocal.getTotalMessages() - entryBatchIndexesAcks.getTotalAckedIndexCount()));
                j += threadLocal.getTotalMessages();
                j2 += threadLocal.getTotalBytes();
            }
        }
        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent()) {
                this.topic.getDispatchRateLimiter().get().tryDispatchPermit(j, j2);
            }
            if (this.dispatchRateLimiter.isPresent()) {
                this.dispatchRateLimiter.get().tryDispatchPermit(j, j2);
            }
        }
        if (size > 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", this.name, Integer.valueOf(list.size() - i));
            }
            list.subList(i, list.size()).forEach(entry2 -> {
                this.messagesToRedeliver.add(entry2.getLedgerId(), entry2.getEntryId());
                entry2.release();
            });
        }
        readMoreEntries();
    }

    public synchronized void readEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
        ReadType readType = (ReadType) obj;
        long next = this.readFailureBackoff.next();
        if (managedLedgerException instanceof ManagedLedgerException.NoMoreEntriesToReadException) {
            if (this.cursor.getNumberOfEntriesInBacklog(false) == 0) {
                this.consumerList.forEach((v0) -> {
                    v0.reachedEndOfTopic();
                });
            }
        } else if (managedLedgerException.getCause() instanceof TransactionNotSealedException) {
            next = 1;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error reading transaction entries : {}, Read Type {} - Retrying to read in {} seconds", new Object[]{this.name, managedLedgerException.getMessage(), readType, Double.valueOf(1 / 1000.0d)});
            }
        } else if (!(managedLedgerException instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}] Error reading entries at {} : {}, Read Type {} - Retrying to read in {} seconds", new Object[]{this.name, this.cursor.getReadPosition(), managedLedgerException.getMessage(), readType, Double.valueOf(next / 1000.0d)});
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Error reading entries at {} : {}, Read Type {} - Retrying to read in {} seconds", new Object[]{this.name, this.cursor.getReadPosition(), managedLedgerException.getMessage(), readType, Double.valueOf(next / 1000.0d)});
        }
        if (this.shouldRewindBeforeReadingOrReplaying) {
            this.shouldRewindBeforeReadingOrReplaying = false;
            this.cursor.rewind();
        }
        if (readType == ReadType.Normal) {
            this.havePendingRead = false;
        } else {
            this.havePendingReplayRead = false;
            if (managedLedgerException instanceof ManagedLedgerException.InvalidReplayPositionException) {
                PositionImpl markDeletedPosition = this.cursor.getMarkDeletedPosition();
                this.messagesToRedeliver.removeIf((j, j2) -> {
                    return ComparisonChain.start().compare(j, markDeletedPosition.getLedgerId()).compare(j2, markDeletedPosition.getEntryId()).result() <= 0;
                });
            }
        }
        this.readBatchSize = this.serviceConfig.getDispatcherMinReadBatchSize();
        this.topic.getBrokerService().executor().schedule(() -> {
            synchronized (this) {
                if (this.havePendingRead) {
                    log.info("[{}] Skipping read retry: havePendingRead {}", new Object[]{this.name, Boolean.valueOf(this.havePendingRead), managedLedgerException});
                } else {
                    log.info("[{}] Retrying read operation", this.name);
                    readMoreEntries();
                }
            }
        }, next, TimeUnit.MILLISECONDS);
    }

    private boolean needTrimAckedMessages() {
        return this.lastIndividualDeletedRangeFromCursorRecovery != null && this.lastIndividualDeletedRangeFromCursorRecovery.upperEndpoint().compareTo(this.cursor.getReadPosition()) > 0;
    }

    protected boolean isAtleastOneConsumerAvailable() {
        if (this.consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == 1) {
            return false;
        }
        Iterator<Consumer> it = this.consumerList.iterator();
        while (it.hasNext()) {
            if (isConsumerAvailable(it.next())) {
                return true;
            }
        }
        return false;
    }

    private boolean isConsumerWritable() {
        Iterator<Consumer> it = this.consumerList.iterator();
        while (it.hasNext()) {
            if (it.next().isWritable()) {
                return true;
            }
        }
        if (!log.isDebugEnabled()) {
            return false;
        }
        log.debug("[{}-{}] consumer is not writable", this.topic.getName(), this.name);
        return false;
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers
    public boolean isConsumerAvailable(Consumer consumer) {
        return (consumer == null || consumer.isBlocked() || consumer.getAvailablePermits() <= 0) ? false : true;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
        consumer.getPendingAcks().forEach((j, j2, j3, j4) -> {
            this.messagesToRedeliver.add(j, j2);
        });
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", new Object[]{this.name, consumer, this.messagesToRedeliver});
        }
        readMoreEntries();
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> list) {
        list.forEach(positionImpl -> {
            this.messagesToRedeliver.add(positionImpl.getLedgerId(), positionImpl.getEntryId());
            this.redeliveryTracker.addIfAbsent(positionImpl);
        });
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", new Object[]{this.name, consumer, list});
        }
        readMoreEntries();
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void addUnAckedMessages(int i) {
        int maxUnackedMessagesOnSubscription = this.topic.getMaxUnackedMessagesOnSubscription();
        if (maxUnackedMessagesOnSubscription == -1) {
            maxUnackedMessagesOnSubscription = this.topic.getBrokerService().pulsar().getConfiguration().getMaxUnackedMessagesPerSubscription();
        }
        if (maxUnackedMessagesOnSubscription <= 0) {
            return;
        }
        int addAndGet = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, i);
        if (addAndGet >= maxUnackedMessagesOnSubscription && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 0, 1)) {
            log.info("[{}] Dispatcher is blocked due to unackMessages {} reached to max {}", new Object[]{this.name, Integer.valueOf(TOTAL_UNACKED_MESSAGES_UPDATER.get(this)), Integer.valueOf(maxUnackedMessagesOnSubscription)});
        } else if (this.topic.getBrokerService().isBrokerDispatchingBlocked() && this.blockedDispatcherOnUnackedMsgs == 1) {
            if (this.totalUnackedMessages < this.topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
                this.topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList(new PersistentDispatcherMultipleConsumers[]{this}));
            }
        } else if (this.blockedDispatcherOnUnackedMsgs == 1 && addAndGet < maxUnackedMessagesOnSubscription / 2 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
            log.info("[{}] Dispatcher is unblocked", this.name);
            this.topic.getBrokerService().executor().execute(() -> {
                readMoreEntries();
            });
        }
        this.topic.getBrokerService().addUnAckedMessages(this, i);
    }

    public boolean isBlockedDispatcherOnUnackedMsgs() {
        return this.blockedDispatcherOnUnackedMsgs == 1;
    }

    public void blockDispatcherOnUnackedMsgs() {
        this.blockedDispatcherOnUnackedMsgs = 1;
    }

    public void unBlockDispatcherOnUnackedMsgs() {
        this.blockedDispatcherOnUnackedMsgs = 0;
    }

    public int getTotalUnackedMessages() {
        return this.totalUnackedMessages;
    }

    public String getName() {
        return this.name;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public RedeliveryTracker getRedeliveryTracker() {
        return this.redeliveryTracker;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public Optional<DispatchRateLimiter> getRateLimiter() {
        return this.dispatchRateLimiter;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void updateRateLimiter(DispatchRate dispatchRate) {
        if (!this.dispatchRateLimiter.isPresent() && dispatchRate != null) {
            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this.topic, DispatchRateLimiter.Type.SUBSCRIPTION));
        }
        this.dispatchRateLimiter.ifPresent(dispatchRateLimiter -> {
            if (dispatchRate != null) {
                this.dispatchRateLimiter.get().updateDispatchRate(dispatchRate);
            } else {
                this.dispatchRateLimiter.get().updateDispatchRate();
            }
        });
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> optional) {
        if (this.dispatchRateLimiter.isPresent() || !DispatchRateLimiter.isDispatchRateNeeded(this.topic.getBrokerService(), optional, this.topic.getName(), DispatchRateLimiter.Type.SUBSCRIPTION)) {
            return;
        }
        this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this.topic, DispatchRateLimiter.Type.SUBSCRIPTION));
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public boolean trackDelayedDelivery(long j, long j2, PulsarApi.MessageMetadata messageMetadata) {
        boolean addMessage;
        if (!this.topic.isDelayedDeliveryEnabled()) {
            return false;
        }
        synchronized (this) {
            if (!this.delayedDeliveryTracker.isPresent()) {
                this.delayedDeliveryTracker = Optional.of(this.topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
            }
            this.delayedDeliveryTracker.get().resetTickTime(this.topic.getDelayedDeliveryTickTimeMillis());
            addMessage = this.delayedDeliveryTracker.get().addMessage(j, j2, messageMetadata.getDeliverAtTime());
        }
        return addMessage;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized Set<PositionImpl> getMessagesToReplayNow(int i) {
        if (!this.messagesToRedeliver.isEmpty()) {
            return this.messagesToRedeliver.items(i, (j, j2) -> {
                return new PositionImpl(j, j2);
            });
        }
        if (!this.delayedDeliveryTracker.isPresent() || !this.delayedDeliveryTracker.get().hasMessageAvailable()) {
            return Collections.emptySet();
        }
        this.delayedDeliveryTracker.get().resetTickTime(this.topic.getDelayedDeliveryTickTimeMillis());
        return this.delayedDeliveryTracker.get().getScheduledMessages(i);
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized long getNumberOfDelayedMessages() {
        return ((Long) this.delayedDeliveryTracker.map((v0) -> {
            return v0.getNumberOfDelayedMessages();
        }).orElse(0L)).longValue();
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void clearDelayedMessages() {
        this.delayedDeliveryTracker.ifPresent((v0) -> {
            v0.clear();
        });
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void cursorIsReset() {
        if (this.lastIndividualDeletedRangeFromCursorRecovery != null) {
            this.lastIndividualDeletedRangeFromCursorRecovery = null;
        }
    }

    @Override // org.apache.pulsar.broker.service.AbstractBaseDispatcher
    public void addMessageToReplay(long j, long j2) {
        this.messagesToRedeliver.add(j, j2);
    }

    public PersistentTopic getTopic() {
        return this.topic;
    }
}
