/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMultipleConsumers
implements Dispatcher,
AsyncCallbacks.ReadEntriesCallback {
    // Topic this dispatcher belongs to and the managed-ledger cursor it reads entries from.
    protected final PersistentTopic topic;
    protected final ManagedCursor cursor;
    // Value of cursor.getLastIndividualDeletedRange() captured in the constructor; set to null by cursorIsReset().
    protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;
    // Created by disconnectAllConsumers(), cleared by resetCloseFuture(); completed when no consumers remain.
    private CompletableFuture<Void> closeFuture = null;
    // (ledgerId, entryId) pairs queued for replay to consumers.
    LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
    // In-memory tracker or the disabled singleton, chosen from broker configuration in the constructor.
    protected final RedeliveryTracker redeliveryTracker;
    // Created lazily by trackDelayedDelivery(); reset to empty by close().
    private Optional<DelayedDeliveryTracker> delayedDeliveryTracker = Optional.empty();
    private final boolean isDelayedDeliveryEnabled;
    // True while a cursor read of ReadType.Normal / ReadType.Replay is outstanding.
    private volatile boolean havePendingRead = false;
    private volatile boolean havePendingReplayRead = false;
    // Set by addConsumer() when the first consumer attaches while a read is in flight; the read callbacks
    // then rewind the cursor.
    private boolean shouldRewindBeforeReadingOrReplaying = false;
    // "<topic name> / <decoded cursor name>", used as the prefix of log messages.
    protected final String name;
    // Permits granted by consumers through consumerFlow() minus messages dispatched so far.
    protected volatile int totalAvailablePermits = 0;
    // Upper bound for one cursor read; doubled after a successful read and reset to the configured minimum
    // after a failed one.
    private volatile int readBatchSize;
    // Supplies the delay before a failed read is retried.
    // NOTE(review): arguments look like (initial 15 s, max 1 min, mandatory stop 0 ms) — confirm against Backoff.
    private final Backoff readFailureBackoff = new Backoff(15L, TimeUnit.SECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
    // Atomic access to totalUnackedMessages.
    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages");
    private volatile int totalUnackedMessages = 0;
    // Per-subscription unacked-message limit from broker configuration; values <= 0 disable the accounting
    // in addUnAckedMessages().
    private final int maxUnackedMessages;
    // 1 while dispatching is blocked because of unacked messages, 0 otherwise.
    private volatile int blockedDispatcherOnUnackedMsgs = 0;
    // Atomic access to blockedDispatcherOnUnackedMsgs.
    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
    protected final ServiceConfiguration serviceConfig;
    // Subscription-level rate limiter; created on demand by initializeDispatchRateLimiterIfNeeded().
    protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);

    /**
     * Builds a dispatcher that distributes entries read from {@code cursor} among the consumers
     * of one subscription on {@code topic}.
     *
     * @param topic        owning topic; also the source of the broker configuration
     * @param cursor       managed-ledger cursor the dispatcher reads from
     * @param subscription subscription passed on to the parent dispatcher
     */
    public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) {
        super(subscription);
        this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
        this.cursor = cursor;
        this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
        this.name = topic.getName() + " / " + Codec.decode((String) cursor.getName());
        this.topic = topic;

        // Redelivery counts are tracked only when the broker enables it.
        if (this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()) {
            this.redeliveryTracker = new InMemoryRedeliveryTracker();
        } else {
            this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
        }

        // Start with the largest allowed read batch; a failed read lowers it again.
        this.readBatchSize = this.serviceConfig.getDispatcherMaxReadBatchSize();
        this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
                .getMaxUnackedMessagesPerSubscription();
        this.isDelayedDeliveryEnabled = topic.getBrokerService().pulsar().getConfiguration()
                .isDelayedDeliveryEnabled();
        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
    }

    /**
     * Attaches a consumer to this dispatcher.
     *
     * <p>If the dispatcher is already closed the consumer is disconnected and nothing else
     * happens. When the consumer is the first one, the cursor is rewound (immediately, or after
     * the in-flight read completes) and the pending replay set is cleared. The consumer list is
     * kept sorted by ascending priority level.
     *
     * @param consumer consumer to attach
     * @throws BrokerServiceException if the topic or the subscription already reached its
     *                                consumer limit
     */
    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        if (IS_CLOSED_UPDATER.get(this) == 1) {
            // The message previously had no placeholder for the consumer, so it was never printed.
            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.name, consumer);
            consumer.disconnect();
            return;
        }
        if (this.consumerList.isEmpty()) {
            if (this.havePendingRead || this.havePendingReplayRead) {
                // A read is still outstanding; let its callback perform the rewind.
                this.shouldRewindBeforeReadingOrReplaying = true;
            } else {
                this.cursor.rewind();
                this.shouldRewindBeforeReadingOrReplaying = false;
            }
            this.messagesToRedeliver.clear();
        }
        if (this.isConsumersExceededOnTopic()) {
            log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", this.name);
            throw new BrokerServiceException.ConsumerBusyException("Topic reached max consumers limit");
        }
        if (this.isConsumersExceededOnSubscription()) {
            log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", this.name);
            throw new BrokerServiceException.ConsumerBusyException("Subscription reached max consumers limit");
        }
        this.consumerList.add(consumer);
        // Integer.compare cannot overflow, unlike subtracting the two priority levels.
        this.consumerList.sort((c1, c2) -> Integer.compare(c1.getPriorityLevel(), c2.getPriorityLevel()));
        this.consumerSet.add(consumer);
    }

    /**
     * Tells whether the topic already holds as many consumers as allowed.
     *
     * <p>The limit is the namespace policy {@code max_consumers_per_topic} when positive,
     * otherwise the broker-level setting. A non-positive limit means "unlimited".
     *
     * @return {@code true} if a limit is configured and the topic has reached it
     */
    private boolean isConsumersExceededOnTopic() {
        Policies policies;
        try {
            policies = (Policies) this.topic.getBrokerService().pulsar().getConfigurationCache().policiesCache().getDataIfPresent(AdminResource.path("policies", TopicName.get((String) this.topic.getName()).getNamespace()));
            if (policies == null) {
                policies = new Policies();
            }
        } catch (Exception ignored) {
            // Best effort: when the policies cannot be looked up, fall back to default policies
            // so that the broker-level limit applies.
            policies = new Policies();
        }
        final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0
                ? policies.max_consumers_per_topic
                : this.serviceConfig.getMaxConsumersPerTopic();
        return maxConsumersPerTopic > 0 && maxConsumersPerTopic <= this.topic.getNumberOfConsumers();
    }

    /**
     * Tells whether this subscription already holds as many consumers as allowed.
     *
     * <p>The limit is the namespace policy {@code max_consumers_per_subscription} when positive,
     * otherwise the broker-level setting. A non-positive limit means "unlimited".
     *
     * @return {@code true} if a limit is configured and the consumer list has reached it
     */
    private boolean isConsumersExceededOnSubscription() {
        Policies policies;
        try {
            policies = (Policies) this.topic.getBrokerService().pulsar().getConfigurationCache().policiesCache().getDataIfPresent(AdminResource.path("policies", TopicName.get((String) this.topic.getName()).getNamespace()));
            if (policies == null) {
                policies = new Policies();
            }
        } catch (Exception ignored) {
            // Best effort: when the policies cannot be looked up, fall back to default policies
            // so that the broker-level limit applies.
            policies = new Policies();
        }
        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
                ? policies.max_consumers_per_subscription
                : this.serviceConfig.getMaxConsumersPerSubscription();
        return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= this.consumerList.size();
    }

    /**
     * Detaches a consumer from this dispatcher.
     *
     * <p>The consumer's unacked-message count is subtracted first, regardless of whether the
     * consumer is currently attached. If it was the last consumer, the pending read is cancelled,
     * the replay set is cleared, a pending close future is completed and the permit counter is
     * zeroed. Otherwise its pending acks are queued for replay, its permits are subtracted and a
     * new read is triggered.
     *
     * @param consumer consumer to detach
     * @throws BrokerServiceException declared by the dispatcher contract
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        this.addUnAckedMessages(-consumer.getUnackedMessages());

        final boolean wasAttached = this.consumerSet.removeAll(consumer) == 1;
        if (!wasAttached) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Trying to remove a non-connected consumer: {}", this.name, consumer);
            }
            return;
        }

        this.consumerList.remove(consumer);
        log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size());

        if (!this.consumerList.isEmpty()) {
            // Other consumers remain: hand the departing consumer's unacked messages to them.
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer are left, reading more entries", this.name);
            }
            consumer.getPendingAcks().forEach((ledger, entry, batch, unused) -> this.messagesToRedeliver.add(ledger, entry));
            this.totalAvailablePermits -= consumer.getAvailablePermits();
            this.readMoreEntries();
            return;
        }

        // That was the last consumer: stop reading and drop the replay backlog.
        if (this.havePendingRead && this.cursor.cancelPendingReadRequest()) {
            this.havePendingRead = false;
        }
        this.messagesToRedeliver.clear();
        if (this.closeFuture != null) {
            log.info("[{}] All consumers removed. Subscription is disconnected", this.name);
            this.closeFuture.complete(null);
        }
        this.totalAvailablePermits = 0;
    }

    /**
     * Handles a flow-control grant from a consumer by adding the permits to the shared pool and
     * triggering a read. Grants from consumers that are not attached are ignored.
     *
     * @param consumer                  consumer that sent the grant
     * @param additionalNumberOfMessages number of permits granted
     */
    @Override
    public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        if (this.consumerSet.contains(consumer)) {
            this.totalAvailablePermits += additionalNumberOfMessages;
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {}", this.name, consumer, this.totalAvailablePermits);
            }
            this.readMoreEntries();
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Ignoring flow control from disconnected consumer {}", this.name, consumer);
        }
    }

    /**
     * Schedules the next read from the cursor, if one is allowed right now.
     *
     * <p>Nothing is scheduled when there are no permits or no available consumer. Otherwise the
     * read size is the smaller of the available permits and the current batch size, further
     * capped by the topic-level and subscription-level dispatch rate limiters when throttling
     * applies (throttling of non-backlog consumers is enabled, or the cursor is not active). When
     * a limiter has no permit left, the method re-schedules itself after one second.
     *
     * <p>Messages waiting for replay take precedence over a normal read. A normal read is skipped
     * while the dispatcher is blocked on unacked messages or while another normal read is pending.
     */
    public void readMoreEntries() {
        if (this.totalAvailablePermits <= 0 || !this.isAtleastOneConsumerAvailable()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer buffer is full, pause reading", this.name);
            }
            return;
        }

        int messagesToRead = Math.min(this.totalAvailablePermits, this.readBatchSize);

        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent() && this.topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
                DispatchRateLimiter topicRateLimiter = this.topic.getDispatchRateLimiter().get();
                if (!topicRateLimiter.hasMessageDispatchPermit()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", this.name, topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(), 1000);
                    }
                    this.topic.getBrokerService().executor().schedule(() -> this.readMoreEntries(), 1000L, TimeUnit.MILLISECONDS);
                    return;
                }
                long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
                if (availablePermitsOnMsg > 0L) {
                    // Compare as long and narrow afterwards: casting the permit count to int first
                    // could wrap to a negative value when it exceeds Integer.MAX_VALUE.
                    messagesToRead = (int) Math.min((long) messagesToRead, availablePermitsOnMsg);
                }
            }
            if (this.dispatchRateLimiter.isPresent() && this.dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
                if (!this.dispatchRateLimiter.get().hasMessageDispatchPermit()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}", this.name, this.dispatchRateLimiter.get().getDispatchRateOnMsg(), this.dispatchRateLimiter.get().getDispatchRateOnByte(), 1000);
                    }
                    this.topic.getBrokerService().executor().schedule(() -> this.readMoreEntries(), 1000L, TimeUnit.MILLISECONDS);
                    return;
                }
                long availablePermitsOnMsg = this.dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
                if (availablePermitsOnMsg > 0L) {
                    // Same overflow-safe narrowing as for the topic-level limiter.
                    messagesToRead = (int) Math.min((long) messagesToRead, availablePermitsOnMsg);
                }
            }
        }

        if (this.havePendingReplayRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping replay while awaiting previous read to complete", this.name);
            }
            return;
        }

        Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(messagesToRead);
        if (!messagesToReplayNow.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule replay of {} messages for {} consumers", this.name, messagesToReplayNow.size(), this.consumerList.size());
            }
            this.havePendingReplayRead = true;
            // Positions the cursor reports back were already deleted and must not be retried.
            Set<? extends Position> deletedMessages = this.asyncReplayEntries(messagesToReplayNow);
            deletedMessages.forEach(position -> this.messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId()));
            if (messagesToReplayNow.size() - deletedMessages.size() == 0) {
                // Every requested position was already deleted, so no callback will arrive;
                // clear the flag and look for more work immediately.
                this.havePendingReplayRead = false;
                this.readMoreEntries();
            }
        } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", this.name, this.totalUnackedMessages, this.maxUnackedMessages);
        } else if (!this.havePendingRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule read of {} messages for {} consumers", this.name, messagesToRead, this.consumerList.size());
            }
            this.havePendingRead = true;
            this.cursor.asyncReadEntriesOrWait(messagesToRead, this, ReadType.Normal);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Cannot schedule next read until previous one is done", this.name);
        }
    }

    /**
     * Asks the cursor to replay the given positions, with this dispatcher as the callback and
     * {@link ReadType#Replay} as the context.
     *
     * @param positions positions to replay
     * @return the set returned by the cursor; {@link #readMoreEntries()} treats it as the
     *         positions that were already deleted
     */
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
        final Set<? extends Position> alreadyDeleted =
                this.cursor.asyncReplayEntries(positions, this, ReadType.Replay);
        return alreadyDeleted;
    }

    /**
     * @return {@code true} if at least one consumer is attached to this dispatcher
     */
    @Override
    public boolean isConsumerConnected() {
        final boolean noConsumers = this.consumerList.isEmpty();
        return !noConsumers;
    }

    /**
     * @return the live list of attached consumers (not a copy)
     */
    @Override
    public CopyOnWriteArrayList<Consumer> getConsumers() {
        return consumerList;
    }

    /**
     * Tells whether the given consumer may unsubscribe, which is the case only when it is the
     * sole consumer attached to this dispatcher.
     *
     * @param consumer consumer asking to unsubscribe
     * @return {@code true} if exactly one consumer is attached and it is {@code consumer}
     */
    @Override
    public synchronized boolean canUnsubscribe(Consumer consumer) {
        if (this.consumerList.size() != 1) {
            return false;
        }
        return this.consumerSet.contains(consumer);
    }

    /**
     * Marks the dispatcher closed, closes the delayed-delivery tracker and the subscription rate
     * limiter if they exist, and disconnects every consumer.
     *
     * @return future returned by {@link #disconnectAllConsumers()}
     */
    @Override
    public CompletableFuture<Void> close() {
        IS_CLOSED_UPDATER.set(this, 1);

        // Detach the tracker under the lock, but close it outside of it.
        final Optional<DelayedDeliveryTracker> trackerToClose;
        synchronized (this) {
            trackerToClose = this.delayedDeliveryTracker;
            this.delayedDeliveryTracker = Optional.empty();
        }
        trackerToClose.ifPresent(tracker -> tracker.close());

        this.dispatchRateLimiter.ifPresent(limiter -> limiter.close());
        return this.disconnectAllConsumers();
    }

    /**
     * Disconnects every attached consumer and cancels the pending normal read.
     *
     * <p>A fresh close future is installed on every call. It is completed right away when no
     * consumer is attached; otherwise {@link #removeConsumer(Consumer)} completes it once the
     * last consumer is gone.
     *
     * @return the newly installed close future
     */
    @Override
    public synchronized CompletableFuture<Void> disconnectAllConsumers() {
        // Diamond instead of the raw type keeps the assignment free of unchecked conversions.
        this.closeFuture = new CompletableFuture<>();
        if (this.consumerList.isEmpty()) {
            this.closeFuture.complete(null);
        } else {
            this.consumerList.forEach(Consumer::disconnect);
            if (this.havePendingRead && this.cursor.cancelPendingReadRequest()) {
                this.havePendingRead = false;
            }
        }
        return this.closeFuture;
    }

    /**
     * Forgets the close future installed by {@link #disconnectAllConsumers()}.
     */
    @Override
    public synchronized void resetCloseFuture() {
        closeFuture = null;
    }

    /**
     * Returns the dispatcher to the open state: drops the close future, then clears the closed
     * flag.
     */
    @Override
    public void reset() {
        resetCloseFuture();
        IS_CLOSED_UPDATER.set(this, 0);
    }

    /**
     * @return the subscription type served by this dispatcher, always {@code Shared}
     */
    @Override
    public PulsarApi.CommandSubscribe.SubType getType() {
        final PulsarApi.CommandSubscribe.SubType sharedType = PulsarApi.CommandSubscribe.SubType.Shared;
        return sharedType;
    }

    /**
     * Read callback for a successful cursor read.
     *
     * <p>Clears the pending flag that matches the read type, doubles the read batch size up to
     * the configured maximum and halves the failure backoff. If a rewind was requested while a
     * normal read was in flight, the entries are released, the cursor is rewound and a new read
     * is started; otherwise the entries are dispatched to the consumers.
     *
     * @param entries entries returned by the cursor
     * @param ctx     the {@link ReadType} that was passed when the read was issued
     */
    public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
        final ReadType readType = (ReadType) ctx;
        final boolean normalRead = readType == ReadType.Normal;
        if (!normalRead) {
            this.havePendingReplayRead = false;
        } else {
            this.havePendingRead = false;
        }

        // A successful read lets the batch size grow back towards the configured maximum.
        if (this.readBatchSize < this.serviceConfig.getDispatcherMaxReadBatchSize()) {
            final int grownBatchSize = Math.min(this.readBatchSize * 2, this.serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Increasing read batch size from {} to {}", this.name, this.readBatchSize, grownBatchSize);
            }
            this.readBatchSize = grownBatchSize;
        }
        this.readFailureBackoff.reduceToHalf();

        if (normalRead && this.shouldRewindBeforeReadingOrReplaying) {
            // The first consumer attached while this read was in flight: discard the result and
            // start over from the rewound position.
            for (Entry staleEntry : entries) {
                staleEntry.release();
            }
            this.cursor.rewind();
            this.shouldRewindBeforeReadingOrReplaying = false;
            this.readMoreEntries();
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Distributing {} messages to {} consumers", this.name, entries.size(), this.consumerList.size());
        }
        this.sendMessagesToConsumers(readType, entries);
    }

    /**
     * Distributes a batch of entries among the available consumers.
     *
     * <p>Entries are handed out in consecutive slices; each slice is limited by the chosen
     * consumer's permits and by the configured round-robin batch size. Entries that could not be
     * handed out are queued for replay and released. Unless the method returns early, it records
     * the dispatched volume with the rate limiters (when throttling applies) and triggers the
     * next read.
     *
     * @param readType whether the entries come from a normal read or from a replay
     * @param entries  entries to dispatch; {@code null} or empty makes the method a no-op
     */
    protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
        if (entries == null || entries.size() == 0) {
            return;
        }
        if (this.needTrimAckedMessages()) {
            this.cursor.trimDeletedEntries(entries);
        }
        // start: index of the first entry not yet handed to a consumer.
        int start = 0;
        int entriesToDispatch = entries.size();
        long totalMessagesSent = 0L;
        long totalBytesSent = 0L;
        while (entriesToDispatch > 0 && this.totalAvailablePermits > 0 && this.isAtleastOneConsumerAvailable()) {
            Consumer c = this.getNextConsumer();
            if (c == null) {
                // No consumer could be selected: release what is left and rewind the cursor.
                // This path returns without rate-limiter accounting and without a new read.
                log.info("[{}] rewind because no available consumer found from total {}", (Object)this.name, (Object)this.consumerList.size());
                entries.subList(start, entries.size()).forEach(Entry::release);
                this.cursor.rewind();
                return;
            }
            // Slice size: remaining entries, capped by the consumer's permits and the round-robin batch size.
            int messagesForC = Math.min(Math.min(entriesToDispatch, c.getAvailablePermits()), this.serviceConfig.getDispatcherMaxRoundRobinBatchSize());
            if (messagesForC <= 0) continue;
            if (readType == ReadType.Replay) {
                // Replayed entries are about to be delivered, so drop them from the replay set.
                entries.subList(start, start + messagesForC).forEach(entry -> this.messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()));
            }
            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
            List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);
            EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
            // NOTE(review): filterEntriesForConsumer is defined outside this file; it appears to fill
            // batchSizes and sendMessageInfo for the slice — confirm in the parent class.
            this.filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo);
            c.sendMessages(entriesForThisConsumer, batchSizes, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), this.redeliveryTracker);
            long msgSent = sendMessageInfo.getTotalMessages();
            start += messagesForC;
            entriesToDispatch -= messagesForC;
            // Permits are consumed per message reported by sendMessageInfo, not per entry.
            this.totalAvailablePermits = (int)((long)this.totalAvailablePermits - msgSent);
            totalMessagesSent += (long)sendMessageInfo.getTotalMessages();
            totalBytesSent += sendMessageInfo.getTotalBytes();
        }
        // Same throttling condition as in readMoreEntries(): report what was sent to both limiters.
        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent()) {
                this.topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
            if (this.dispatchRateLimiter.isPresent()) {
                this.dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
        }
        if (entriesToDispatch > 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", (Object)this.name, (Object)(entries.size() - start));
            }
            // Remember the undelivered positions for replay and release their buffers now.
            entries.subList(start, entries.size()).forEach(entry -> {
                this.messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
                entry.release();
            });
        }
        this.readMoreEntries();
    }

    /**
     * Read callback for a failed cursor read.
     *
     * <p>Logs the failure (or notifies consumers that the end of the topic was reached), performs
     * a rewind that was requested while the read was in flight, clears the pending flag that
     * matches the read type, drops replay positions at or before the mark-delete position when
     * the replay position was invalid, shrinks the read batch size to the configured minimum and
     * schedules a retry after the backoff delay.
     *
     * @param exception failure reported by the managed ledger
     * @param ctx       the {@link ReadType} that was passed when the read was issued
     */
    public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
        ReadType readType = (ReadType)((Object)ctx);
        long waitTimeMillis = this.readFailureBackoff.next();
        if (exception instanceof ManagedLedgerException.NoMoreEntriesToReadException) {
            // Consumers are told about the end of the topic only once the backlog is drained.
            if (this.cursor.getNumberOfEntriesInBacklog() == 0L) {
                this.consumerList.forEach(Consumer::reachedEndOfTopic);
            }
        } else if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}] Error reading entries at {} : {}, Read Type {} - Retrying to read in {} seconds", new Object[]{this.name, this.cursor.getReadPosition(), exception.getMessage(), readType, (double)waitTimeMillis / 1000.0});
        } else if (log.isDebugEnabled()) {
            // TooManyRequestsException is reported at debug level only.
            log.debug("[{}] Error reading entries at {} : {}, Read Type {} - Retrying to read in {} seconds", new Object[]{this.name, this.cursor.getReadPosition(), exception.getMessage(), readType, (double)waitTimeMillis / 1000.0});
        }
        if (this.shouldRewindBeforeReadingOrReplaying) {
            this.shouldRewindBeforeReadingOrReplaying = false;
            this.cursor.rewind();
        }
        if (readType == ReadType.Normal) {
            this.havePendingRead = false;
        } else {
            this.havePendingReplayRead = false;
            if (exception instanceof ManagedLedgerException.InvalidReplayPositionException) {
                // Forget replay positions that are at or before the mark-delete position.
                PositionImpl markDeletePosition = (PositionImpl)this.cursor.getMarkDeletedPosition();
                this.messagesToRedeliver.removeIf((ledgerId, entryId) -> ComparisonChain.start().compare(ledgerId, markDeletePosition.getLedgerId()).compare(entryId, markDeletePosition.getEntryId()).result() <= 0);
            }
        }
        // After a failure, restart from the smallest configured batch size.
        this.readBatchSize = this.serviceConfig.getDispatcherMinReadBatchSize();
        this.topic.getBrokerService().executor().schedule(() -> {
            PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = this;
            synchronized (persistentDispatcherMultipleConsumers) {
                // Retry only if no normal read was started in the meantime.
                if (!this.havePendingRead) {
                    log.info("[{}] Retrying read operation", (Object)this.name);
                    this.readMoreEntries();
                } else {
                    log.info("[{}] Skipping read retry: havePendingRead {}", new Object[]{this.name, this.havePendingRead, exception});
                }
            }
        }, waitTimeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Tells whether entries just read may include positions inside the individually-deleted range
     * captured when the dispatcher was created, i.e. whether the upper end of that range lies
     * beyond the cursor's read position.
     *
     * @return {@code true} if a recovered range exists and ends after the current read position
     */
    private boolean needTrimAckedMessages() {
        // Read the volatile field once: cursorIsReset() may null it from another thread between
        // a null check and a second read.
        final Range<PositionImpl> recoveredRange = this.lastIndividualDeletedRangeFromCursorRecovery;
        if (recoveredRange == null) {
            return false;
        }
        return recoveredRange.upperEndpoint().compareTo((PositionImpl) this.cursor.getReadPosition()) > 0;
    }

    /**
     * Tells whether any attached consumer can take messages right now.
     *
     * @return {@code false} if there are no consumers or the dispatcher is closed; otherwise
     *         whether {@link #isConsumerAvailable(Consumer)} holds for at least one consumer
     */
    protected boolean isAtleastOneConsumerAvailable() {
        boolean dispatchable = !this.consumerList.isEmpty() && IS_CLOSED_UPDATER.get(this) != 1;
        if (!dispatchable) {
            return false;
        }
        for (Consumer candidate : this.consumerList) {
            if (this.isConsumerAvailable(candidate)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether a consumer can take messages right now.
     *
     * @param consumer consumer to check; may be {@code null}
     * @return {@code true} if the consumer is non-null, not blocked and has permits left
     */
    @Override
    public boolean isConsumerAvailable(Consumer consumer) {
        if (consumer == null) {
            return false;
        }
        if (consumer.isBlocked()) {
            return false;
        }
        return consumer.getAvailablePermits() > 0;
    }

    /**
     * Queues every pending-ack position of the consumer for replay and triggers a read.
     *
     * @param consumer consumer whose unacknowledged messages are to be redelivered
     */
    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
        consumer.getPendingAcks().forEach((ledger, entry, batch, unused) -> this.messagesToRedeliver.add(ledger, entry));
        final boolean debugEnabled = log.isDebugEnabled();
        if (debugEnabled) {
            log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", this.name, consumer, this.messagesToRedeliver);
        }
        this.readMoreEntries();
    }

    /**
     * Queues the given positions for replay, bumps their redelivery count and triggers a read.
     *
     * @param consumer  consumer that requested the redelivery (used for logging only)
     * @param positions positions to redeliver
     */
    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
        for (PositionImpl requested : positions) {
            this.messagesToRedeliver.add(requested.getLedgerId(), requested.getEntryId());
            this.redeliveryTracker.incrementAndGetRedeliveryCount((Position) requested);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", this.name, consumer, positions);
        }
        this.readMoreEntries();
    }

    /**
     * Adjusts the unacked-message counter of this dispatcher and blocks or unblocks dispatching
     * when a threshold is crossed. Does nothing when no per-subscription limit is configured.
     *
     * @param numberOfMessages delta to apply; negative values reduce the counter
     */
    @Override
    public void addUnAckedMessages(int numberOfMessages) {
        // A non-positive limit disables the accounting, including the broker-level update below.
        if (this.maxUnackedMessages <= 0) {
            return;
        }
        int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
        if (unAckedMessages >= this.maxUnackedMessages && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 0, 1)) {
            // Limit reached: the CAS makes sure only one caller performs the 0 -> 1 transition.
            log.info("[{}] Dispatcher is blocked due to unackMessages {} reached to max {}", new Object[]{this.name, TOTAL_UNACKED_MESSAGES_UPDATER.get(this), this.maxUnackedMessages});
        } else if (this.topic.getBrokerService().isBrokerDispatchingBlocked() && this.blockedDispatcherOnUnackedMsgs == 1) {
            // Broker-wide blocking is active: unblock through the broker service once the counter
            // drops below half of the broker's per-dispatcher limit.
            if (this.totalUnackedMessages < this.topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
                this.topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList((Object[])new PersistentDispatcherMultipleConsumers[]{this}));
            }
        } else if (this.blockedDispatcherOnUnackedMsgs == 1 && unAckedMessages < this.maxUnackedMessages / 2 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
            // Blocked by this subscription's own limit: unblock below half of it and resume
            // reading on the broker executor.
            log.info("[{}] Dispatcher is unblocked", (Object)this.name);
            this.topic.getBrokerService().executor().execute(() -> this.readMoreEntries());
        }
        // Forward the same delta to the broker service.
        this.topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
    }

    /**
     * @return {@code true} while dispatching is blocked because of unacked messages
     */
    public boolean isBlockedDispatcherOnUnackedMsgs() {
        final int blockedFlag = this.blockedDispatcherOnUnackedMsgs;
        return blockedFlag == 1;
    }

    /**
     * Marks this dispatcher as blocked on unacked messages.
     */
    public void blockDispatcherOnUnackedMsgs() {
        blockedDispatcherOnUnackedMsgs = 1;
    }

    /**
     * Clears the blocked-on-unacked-messages mark of this dispatcher.
     */
    public void unBlockDispatcherOnUnackedMsgs() {
        blockedDispatcherOnUnackedMsgs = 0;
    }

    /**
     * @return the current value of the unacked-message counter
     */
    public int getTotalUnackedMessages() {
        return totalUnackedMessages;
    }

    /**
     * @return the dispatcher name, built as {@code "<topic name> / <decoded cursor name>"}
     */
    public String getName() {
        return name;
    }

    /**
     * @return the redelivery tracker chosen at construction time
     */
    @Override
    public RedeliveryTracker getRedeliveryTracker() {
        return redeliveryTracker;
    }

    /**
     * @return the subscription-level dispatch rate limiter, empty if none was created
     */
    @Override
    public Optional<DispatchRateLimiter> getRateLimiter() {
        return dispatchRateLimiter;
    }

    /**
     * Creates the subscription-level dispatch rate limiter unless one already exists or none is
     * needed according to {@code DispatchRateLimiter.isDispatchRateNeeded}.
     *
     * @param policies namespace policies to consult, possibly empty
     */
    @Override
    public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
        if (this.dispatchRateLimiter.isPresent()) {
            return;
        }
        final boolean limiterNeeded = DispatchRateLimiter.isDispatchRateNeeded(this.topic.getBrokerService(), policies, this.topic.getName(), DispatchRateLimiter.Type.SUBSCRIPTION);
        if (limiterNeeded) {
            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this.topic, DispatchRateLimiter.Type.SUBSCRIPTION));
        }
    }

    /**
     * Registers a message with the delayed-delivery tracker, creating the tracker on first use.
     *
     * @param ledgerId    ledger id of the message
     * @param entryId     entry id of the message
     * @param msgMetadata metadata carrying the deliver-at time
     * @return {@code false} when delayed delivery is disabled; otherwise the result of the
     *         tracker's {@code addMessage} call
     */
    @Override
    public boolean trackDelayedDelivery(long ledgerId, long entryId, PulsarApi.MessageMetadata msgMetadata) {
        if (this.isDelayedDeliveryEnabled) {
            synchronized (this) {
                if (!this.delayedDeliveryTracker.isPresent()) {
                    // Lazily create the tracker the first time a delayed message shows up.
                    this.delayedDeliveryTracker = Optional.of(this.topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
                }
                final long deliverAt = msgMetadata.getDeliverAtTime();
                return this.delayedDeliveryTracker.get().addMessage(ledgerId, entryId, deliverAt);
            }
        }
        return false;
    }

    /**
     * Picks the positions to replay next: queued redeliveries first, otherwise delayed messages
     * that the tracker reports as available.
     *
     * @param maxMessagesToRead upper bound on the number of positions returned
     * @return positions to replay, or an empty set when there is nothing to replay
     */
    private synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
        if (this.messagesToRedeliver.isEmpty()) {
            if (!this.delayedDeliveryTracker.isPresent()) {
                return Collections.emptySet();
            }
            if (!this.delayedDeliveryTracker.get().hasMessageAvailable()) {
                return Collections.emptySet();
            }
            return this.delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
        }
        return this.messagesToRedeliver.items(maxMessagesToRead, (ledger, entry) -> new PositionImpl(ledger, entry));
    }

    /**
     * @return the number of delayed messages reported by the tracker, or {@code 0} when no
     *         tracker exists
     */
    @Override
    public synchronized long getNumberOfDelayedMessages() {
        if (!this.delayedDeliveryTracker.isPresent()) {
            return 0L;
        }
        return this.delayedDeliveryTracker.get().getNumberOfDelayedMessages();
    }

    /**
     * Drops the individually-deleted range captured at construction time, if it is still set.
     */
    @Override
    public void cursorIsReset() {
        if (this.lastIndividualDeletedRangeFromCursorRecovery == null) {
            return;
        }
        this.lastIndividualDeletedRangeFromCursorRecovery = null;
    }

    /**
     * @return the topic this dispatcher belongs to
     */
    public PersistentTopic getTopic() {
        return topic;
    }

    /**
     * Kind of cursor read, passed as the callback context so the read callbacks know which
     * pending flag to clear.
     */
    static enum ReadType {
        /** Read issued through {@code asyncReadEntriesOrWait}. */
        Normal,
        /** Read issued through {@code asyncReplayEntries}. */
        Replay
    }
}

