/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentStreamingDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentStreamingDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionSnapshotCache;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentSubscription
extends AbstractSubscription
implements Subscription {
    /** Topic that owns this subscription. */
    protected final PersistentTopic topic;
    /** Managed-ledger cursor through which this subscription acknowledges and inspects its backlog. */
    protected final ManagedCursor cursor;
    /**
     * Dispatcher currently serving consumers. Not assigned by the constructor; {@code addConsumer}
     * creates or replaces it according to the consumer's subscription type.
     */
    protected volatile Dispatcher dispatcher;
    /** Name of the owning topic, captured from {@code topic.getName()} at construction. */
    protected final String topicName;
    /** Name of this subscription. */
    protected final String subName;
    /** Pre-computed description (topic + name) returned by {@code toString()}. */
    protected final String fullName;
    // Integer encodings of the fenced state stored in isFenced (0 = not fenced, 1 = fenced).
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    // Atomic accessor for isFenced; resetCursor uses compareAndSet on it to fence the subscription.
    private static final AtomicIntegerFieldUpdater<PersistentSubscription> IS_FENCED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentSubscription.class, "isFenced");
    private volatile int isFenced = 0;
    /** Expiry monitor bound to this subscription's cursor; created in the constructor. */
    private PersistentMessageExpiryMonitor expiryMonitor;
    // NOTE(review): presumably the time of the last expiry run - not written in the visible part of the class.
    private volatile long lastExpireTimestamp = 0L;
    /** Wall-clock time (ms) of the most recent {@code consumerFlow} call. */
    private volatile long lastConsumedFlowTimestamp = 0L;
    /** Wall-clock time (ms) recorded by {@code updateLastMarkDeleteAdvancedTimestamp}. */
    private volatile long lastMarkDeleteAdvancedTimestamp = 0L;
    // NOTE(review): presumably a backlog threshold for the expiry check - not referenced in the visible part of the class.
    private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
    /** Cursor property key whose presence marks a cursor as belonging to a replicated subscription. */
    private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";
    // Base cursor properties handed out by getBaseCursorProperties. The replicated map is created empty
    // here; NOTE(review): presumably populated by a static initializer elsewhere in the class - confirm.
    private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<String, Long>();
    private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
    /** Snapshot cache for replicated subscriptions; {@code null} means the subscription is not replicated. */
    private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
    /** Handle for transactional (pending) acks; a disabled implementation is used when transactions do not apply. */
    private final PendingAckHandle pendingAckHandle;
    /** Subscription properties; the constructor stores either an empty map or an unmodifiable view. */
    private volatile Map<String, String> subscriptionProperties;
    // NOTE(review): presumably tracks an in-flight fence operation - not referenced in the visible part of the class.
    private volatile CompletableFuture<Void> fenceFuture;
    /**
     * Callback handed to {@code cursor.asyncMarkDelete} for cumulative acks. The callback context is
     * the mark-delete position that was current before the ack was issued.
     */
    private final AsyncCallbacks.MarkDeleteCallback markDeleteCallback = new AsyncCallbacks.MarkDeleteCallback() {

        @Override
        public void markDeleteComplete(Object ctx) {
            // Both casts are performed unconditionally, exactly as the position types are expected here.
            PositionImpl previous = (PositionImpl) ctx;
            PositionImpl current = (PositionImpl) cursor.getMarkDeletedPosition();
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Mark deleted messages to position {} from position {}",
                        topicName, subName, current, previous);
            }
            // Let the dispatcher know when the mark-delete position actually advanced.
            notifyTheMarkDeletePositionMoveForwardIfNeeded(previous);
        }

        @Override
        public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
            // Failures are only reported at debug level.
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debug("[{}][{}] Failed to mark delete for position {}: {}",
                    topicName, subName, ctx, exception);
        }
    };
    /**
     * Callback handed to {@code cursor.asyncDelete} for individual acks. The callback context is the
     * mark-delete position that was current before the ack was issued.
     */
    private final AsyncCallbacks.DeleteCallback deleteCallback = new AsyncCallbacks.DeleteCallback() {

        @Override
        public void deleteComplete(Object position) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
            }
            // The context must be a PositionImpl; it is compared against the current mark-delete position.
            PositionImpl previousMarkDelete = (PositionImpl) position;
            notifyTheMarkDeletePositionMoveForwardIfNeeded(previousMarkDelete);
        }

        @Override
        public void deleteFailed(ManagedLedgerException exception, Object ctx) {
            log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception);
        }
    };
    // Declared without an initializer: as a blank static final it has to be assigned by a static
    // initializer block of this class.
    private static final Logger log;

    /**
     * Returns the shared base cursor-property map for a subscription.
     *
     * @param isReplicated whether the subscription is replicated
     * @return the replicated-subscription property map when {@code isReplicated} is true, otherwise the
     *         empty non-replicated map; the same static instance is returned on every call
     */
    static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
        if (isReplicated) {
            return REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
        }
        return NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
    }

    /**
     * Tells whether a cursor carries the replicated-subscription marker property.
     *
     * @param cursor the cursor to inspect
     * @return {@code true} if the cursor's properties contain the replicated-subscription key
     */
    static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
        final boolean hasReplicatedMarker = cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
        return hasReplicatedMarker;
    }

    /**
     * Creates a subscription without any subscription properties.
     *
     * @param topic owning topic
     * @param subscriptionName name of the subscription
     * @param cursor managed-ledger cursor backing the subscription
     * @param replicated whether the subscription should be marked as replicated
     */
    public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, boolean replicated) {
        this(topic, subscriptionName, cursor, replicated, Collections.<String, String>emptyMap());
    }

    /**
     * Creates a subscription bound to the given topic and cursor.
     *
     * @param topic owning topic
     * @param subscriptionName name of the subscription
     * @param cursor managed-ledger cursor backing the subscription
     * @param replicated whether the subscription should be marked as replicated
     * @param subscriptionProperties properties to attach; {@code null} or empty yields an empty map,
     *        otherwise an unmodifiable view of the given map is stored
     */
    public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, boolean replicated, Map<String, String> subscriptionProperties) {
        this.topic = topic;
        this.cursor = cursor;
        this.topicName = topic.getName();
        this.subName = subscriptionName;
        this.fullName = MoreObjects.toStringHelper(this)
                .add("topic", this.topicName)
                .add("name", this.subName)
                .toString();
        this.expiryMonitor = new PersistentMessageExpiryMonitor(this.topicName, subscriptionName, cursor, this);
        setReplicated(replicated);

        if (MapUtils.isEmpty(subscriptionProperties)) {
            this.subscriptionProperties = Collections.emptyMap();
        } else {
            this.subscriptionProperties = Collections.unmodifiableMap(subscriptionProperties);
        }

        // Transactional pending-acks are only wired up when the transaction coordinator is enabled
        // and the topic is not an event system topic.
        if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
                && !SystemTopicNames.isEventSystemTopic(TopicName.get(this.topicName))) {
            this.pendingAckHandle = new PendingAckHandleImpl(this);
        } else {
            this.pendingAckHandle = new PendingAckHandleDisabled();
        }

        IS_FENCED_UPDATER.set(this, FALSE);
    }

    /**
     * Records the current wall-clock time as the moment the mark-delete position last advanced,
     * never moving the stored timestamp backwards.
     */
    public void updateLastMarkDeleteAdvancedTimestamp() {
        final long recorded = lastMarkDeleteAdvancedTimestamp;
        final long now = System.currentTimeMillis();
        lastMarkDeleteAdvancedTimestamp = recorded >= now ? recorded : now;
    }

    /**
     * @return the broker interceptor obtained from the owning topic's broker service
     */
    @Override
    public BrokerInterceptor interceptor() {
        final BrokerInterceptor brokerInterceptor = topic.getBrokerService().getInterceptor();
        return brokerInterceptor;
    }

    /**
     * @return the subscription name
     */
    @Override
    public String getName() {
        return subName;
    }

    /**
     * @return the topic that owns this subscription
     */
    @Override
    public Topic getTopic() {
        return topic;
    }

    /**
     * @return {@code true} when a replicated-subscription snapshot cache is currently installed
     */
    @Override
    public boolean isReplicated() {
        final ReplicatedSubscriptionSnapshotCache snapshotCache = replicatedSubscriptionSnapshotCache;
        return snapshotCache != null;
    }

    /**
     * Switches the replicated state of this subscription.
     *
     * <p>The snapshot cache is dropped when {@code replicated} is false or replicated subscriptions are
     * disabled in the broker configuration, and created (if absent) otherwise. Independently of the
     * configuration, the replicated marker property is put on or removed from the cursor according to
     * {@code replicated}.
     *
     * @param replicated desired replicated state
     * @return the result of the cursor's put/remove property call, or {@code false} when there is no cursor
     */
    public boolean setReplicated(boolean replicated) {
        ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

        // The configuration flag is only consulted when replication was actually requested.
        final boolean cacheWanted = replicated && config.isEnableReplicatedSubscriptions();
        if (!cacheWanted) {
            replicatedSubscriptionSnapshotCache = null;
        } else if (replicatedSubscriptionSnapshotCache == null) {
            replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(
                    subName, config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
        }

        if (cursor == null) {
            return false;
        }
        return replicated
                ? cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L)
                : cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
    }

    /**
     * Attaches a consumer to this subscription once the pending-ack handle future has completed.
     *
     * <p>Under the subscription's monitor: the request fails if the subscription is fenced; when there
     * is no dispatcher or the current one has no connected consumer, a dispatcher matching the
     * consumer's subscription type is created (replacing and asynchronously closing any previous one);
     * when consumers are already connected, a consumer of a different subscription type is rejected.
     * The consumer is finally handed to the dispatcher.
     *
     * @param consumer the consumer to add
     * @return a future completed by the dispatcher's {@code addConsumer}, or failed with
     *         {@code SubscriptionFencedException}, {@code ServerMetadataException} (unsupported type) or
     *         {@code SubscriptionBusyException} (type mismatch)
     */
    @Override
    public CompletableFuture<Void> addConsumer(Consumer consumer) {
        return this.pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> {
            PersistentSubscription persistentSubscription = this;
            synchronized (persistentSubscription) {
                this.cursor.updateLastActive();
                // 1 == fenced (resetCursor flips the flag from 0 to 1 while it runs).
                if (IS_FENCED_UPDATER.get(this) == 1) {
                    log.warn("Attempting to add consumer {} on a fenced subscription", (Object)consumer);
                    return FutureUtil.failedFuture((Throwable)new BrokerServiceException.SubscriptionFencedException("Subscription is fenced"));
                }
                if (this.dispatcher == null || !this.dispatcher.isConsumerConnected()) {
                    // No active consumers: the dispatcher may be created, or swapped for one of another type.
                    Dispatcher previousDispatcher = null;
                    boolean useStreamingDispatcher = this.topic.getBrokerService().getPulsar().getConfiguration().isStreamingDispatch();
                    switch (consumer.subType()) {
                        case Exclusive: {
                            // Reuse the existing dispatcher when it already has the requested type.
                            if (this.dispatcher != null && this.dispatcher.getType() == CommandSubscribe.SubType.Exclusive) break;
                            previousDispatcher = this.dispatcher;
                            this.dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Exclusive, 0, this.topic, this) : new PersistentDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Exclusive, 0, this.topic, this);
                            break;
                        }
                        case Shared: {
                            if (this.dispatcher != null && this.dispatcher.getType() == CommandSubscribe.SubType.Shared) break;
                            previousDispatcher = this.dispatcher;
                            this.dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherMultipleConsumers(this.topic, this.cursor, this) : new PersistentDispatcherMultipleConsumers(this.topic, this.cursor, this);
                            break;
                        }
                        case Failover: {
                            int partitionIndex = TopicName.getPartitionIndex((String)this.topicName);
                            // Any negative partition index is normalised to -1 before being passed on.
                            if (partitionIndex < 0) {
                                partitionIndex = -1;
                            }
                            if (this.dispatcher != null && this.dispatcher.getType() == CommandSubscribe.SubType.Failover) break;
                            previousDispatcher = this.dispatcher;
                            this.dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Failover, partitionIndex, this.topic, this) : new PersistentDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Failover, partitionIndex, this.topic, this);
                            break;
                        }
                        case Key_Shared: {
                            KeySharedMeta ksm = consumer.getKeySharedMeta();
                            // A Key_Shared dispatcher is only reused when its key-shared policy matches the consumer's.
                            if (this.dispatcher != null && this.dispatcher.getType() == CommandSubscribe.SubType.Key_Shared && ((PersistentStickyKeyDispatcherMultipleConsumers)this.dispatcher).hasSameKeySharedPolicy(ksm)) break;
                            previousDispatcher = this.dispatcher;
                            this.dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(this.topic, this.cursor, this, this.topic.getBrokerService().getPulsar().getConfiguration(), ksm);
                            break;
                        }
                        default: {
                            return FutureUtil.failedFuture((Throwable)new BrokerServiceException.ServerMetadataException("Unsupported subscription type"));
                        }
                    }
                    // The replaced dispatcher is closed in the background; a failure is only logged.
                    if (previousDispatcher != null) {
                        ((CompletableFuture)previousDispatcher.close().thenRun(() -> log.info("[{}][{}] Successfully closed previous dispatcher", (Object)this.topicName, (Object)this.subName))).exceptionally(ex -> {
                            log.error("[{}][{}] Failed to close previous dispatcher", new Object[]{this.topicName, this.subName, ex});
                            return null;
                        });
                    }
                } else if (consumer.subType() != this.dispatcher.getType()) {
                    // Consumers are connected: the subscription type cannot change underneath them.
                    return FutureUtil.failedFuture((Throwable)new BrokerServiceException.SubscriptionBusyException("Subscription is of different type"));
                }
                return this.dispatcher.addConsumer(consumer);
            }
        });
    }

    /**
     * Detaches {@code consumer} from this subscription.
     *
     * <p>The consumer is removed from the dispatcher (if any) and its bytes-out / messages-out counters
     * are added to the totals kept for removed consumers. When the dispatcher is left without
     * consumers, the cursor is deactivated and removed from the managed ledger's waiting cursors. For a
     * non-durable cursor the subscription is additionally closed (followed by its dispatcher), and a
     * task on the broker executor unregisters the subscription from the topic and, unless
     * {@code isResetCursor} is set, deletes the cursor. Finally the topic usage count is decremented.
     *
     * @param consumer the consumer being removed
     * @param isResetCursor when {@code true}, the non-durable cursor is left in place instead of deleted
     * @throws BrokerServiceException as declared; NOTE(review): presumably raised by the dispatcher's
     *         {@code removeConsumer} - confirm
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
        this.cursor.updateLastActive();
        if (this.dispatcher != null) {
            this.dispatcher.removeConsumer(consumer);
        }

        // Fold the departing consumer's counters into the subscription-level totals.
        ConsumerStatsImpl stats = consumer.getStats();
        this.bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
        this.msgOutFromRemovedConsumer.add(stats.msgOutCounter);

        if (this.dispatcher != null && this.dispatcher.getConsumers().isEmpty()) {
            this.deactivateCursor();
            this.topic.getManagedLedger().removeWaitingCursor(this.cursor);

            if (!this.cursor.isDurable()) {
                // Non-durable cursor: close the subscription first, then its dispatcher.
                // Failures on either step are logged, not propagated.
                ((CompletableFuture<?>) this.close().thenRun(() -> {
                    synchronized (this) {
                        if (this.dispatcher != null) {
                            ((CompletableFuture<?>) this.dispatcher.close().thenRun(() ->
                                    log.info("[{}][{}] Successfully closed dispatcher for reader",
                                            this.topicName, this.subName)))
                                    .exceptionally(ex -> {
                                        log.error("[{}][{}] Failed to close dispatcher for reader",
                                                this.topicName, this.subName, ex);
                                        return null;
                                    });
                        }
                    }
                })).exceptionally(exception -> {
                    log.error("[{}][{}] Failed to close subscription for reader",
                            this.topicName, this.subName, exception);
                    return null;
                });

                // Unregistering and cursor deletion run on the broker executor, outside this monitor.
                this.topic.getBrokerService().pulsar().getExecutor().execute(() -> {
                    this.topic.removeSubscription(this.subName);
                    if (!isResetCursor) {
                        try {
                            this.topic.getManagedLedger().deleteCursor(this.cursor.getName());
                        } catch (InterruptedException | ManagedLedgerException e) {
                            if (e instanceof InterruptedException) {
                                // Restore the interrupt status so the executor thread can still observe it.
                                Thread.currentThread().interrupt();
                            }
                            log.warn("[{}] [{}] Failed to remove non durable cursor",
                                    this.topic.getName(), this.subName, e);
                        }
                    }
                });
            }
        }

        this.topic.decrementUsageCount();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Removed consumer -- count: {}",
                    this.topic.getName(), this.subName, consumer.consumerName(), this.topic.currentUsageCount());
        }
    }

    /**
     * Marks the underlying cursor as inactive.
     */
    public void deactivateCursor() {
        cursor.setInactive();
    }

    /**
     * Records the time of the flow request and forwards it to the dispatcher.
     *
     * @param consumer the consumer granting permits
     * @param additionalNumberOfMessages number of additional messages the consumer is ready to receive
     */
    @Override
    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        final long now = System.currentTimeMillis();
        lastConsumedFlowTimestamp = now;
        dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
    }

    /**
     * Applies an acknowledgement to the cursor.
     *
     * <p>A cumulative ack must carry exactly one position and is issued as an asynchronous mark-delete;
     * anything else is logged and ignored. Individual acks are issued as an asynchronous delete, after
     * which already-deleted positions are cleared from the pending-ack handle (only when the
     * transaction coordinator is enabled) and removed from the dispatcher's redelivery tracker.
     * Afterwards, if the cursor's mark-delete position differs from the one read on entry, the
     * advance timestamp is refreshed and the replicated-subscription snapshot cache is consulted.
     * Finally, consumers are notified when the ledger is terminated and the backlog is empty.
     *
     * @param positions positions being acknowledged
     * @param ackType cumulative or individual
     * @param properties cursor properties merged into the mark-delete for cumulative acks
     */
    @Override
    public void acknowledgeMessage(List<Position> positions, CommandAck.AckType ackType, Map<String, Long> properties) {
        this.cursor.updateLastActive();
        // Snapshot taken before the ack; passed as callback context and compared against below.
        Position previousMarkDeletePosition = this.cursor.getMarkDeletedPosition();
        if (ackType == CommandAck.AckType.Cumulative) {
            if (positions.size() != 1) {
                log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids.", (Object)this.topicName, (Object)this.subName);
                return;
            }
            Position position2 = positions.get(0);
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Cumulative ack on {}", new Object[]{this.topicName, this.subName, position2});
            }
            this.cursor.asyncMarkDelete(position2, this.mergeCursorProperties(properties), this.markDeleteCallback, (Object)previousMarkDeletePosition);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Individual acks on {}", new Object[]{this.topicName, this.subName, positions});
            }
            this.cursor.asyncDelete(positions, this.deleteCallback, (Object)previousMarkDeletePosition);
            if (this.topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
                // Only positions the cursor already reports as deleted are cleared from the pending-ack handle.
                positions.forEach(position -> {
                    if (((ManagedCursorImpl)this.cursor).isMessageDeleted(position)) {
                        this.pendingAckHandle.clearIndividualPosition((Position)position);
                    }
                });
            }
            if (this.dispatcher != null) {
                this.dispatcher.getRedeliveryTracker().removeBatch(positions);
            }
        }
        // NOTE(review): this check runs right after the async call was issued; whether the position has
        // already moved at this point depends on the cursor implementation - confirm.
        if (!this.cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
            ReplicatedSubscriptionsSnapshot snapshot;
            this.updateLastMarkDeleteAdvancedTimestamp();
            // Read the volatile cache once so the null check and the call see the same instance.
            ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
            if (snapshotCache != null && (snapshot = snapshotCache.advancedMarkDeletePosition((PositionImpl)this.cursor.getMarkDeletedPosition())) != null) {
                this.topic.getReplicatedSubscriptionController().ifPresent(c -> c.localSubscriptionUpdated(this.subName, snapshot));
            }
        }
        // Terminated ledger with nothing left to read: let the dispatcher's consumers know.
        if (this.topic.getManagedLedger().isTerminated() && this.cursor.getNumberOfEntriesInBacklog(false) == 0L && this.dispatcher != null) {
            AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration(this.topic, this.dispatcher.getConsumers());
        }
    }

    /**
     * Delegates a transactional individual acknowledgement to the pending-ack handle.
     *
     * @param txnId transaction the ack belongs to
     * @param positions position / integer pairs to acknowledge, passed through unchanged
     * @return the future returned by the pending-ack handle
     */
    public CompletableFuture<Void> transactionIndividualAcknowledge(TxnID txnId, List<MutablePair<PositionImpl, Integer>> positions) {
        final CompletableFuture<Void> ackFuture = pendingAckHandle.individualAcknowledgeMessage(txnId, positions);
        return ackFuture;
    }

    /**
     * Delegates a transactional cumulative acknowledgement to the pending-ack handle.
     *
     * @param txnId transaction the ack belongs to
     * @param positions positions to acknowledge, passed through unchanged
     * @return the future returned by the pending-ack handle
     */
    public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID txnId, List<PositionImpl> positions) {
        final CompletableFuture<Void> ackFuture = pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions);
        return ackFuture;
    }

    /**
     * Tells the dispatcher that the mark-delete position advanced, when it is strictly ahead of
     * {@code oldPosition}.
     *
     * @param oldPosition mark-delete position observed before the acknowledgement
     */
    private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position oldPosition) {
        final PositionImpl before = (PositionImpl) oldPosition;
        final PositionImpl after = (PositionImpl) cursor.getMarkDeletedPosition();
        if (dispatcher == null) {
            return;
        }
        if (after.compareTo(before) > 0) {
            dispatcher.markDeletePositionMoveForward();
        }
    }

    /**
     * @return the description string built once in the constructor
     */
    public String toString() {
        return fullName;
    }

    /**
     * @return the name of the owning topic
     */
    @Override
    public String getTopicName() {
        return topicName;
    }

    /**
     * @return the subscription type of the current dispatcher, or {@code null} when no dispatcher exists
     */
    @Override
    public CommandSubscribe.SubType getType() {
        if (dispatcher == null) {
            return null;
        }
        return dispatcher.getType();
    }

    /**
     * Returns a printable name for the current subscription type.
     *
     * @return {@code "None"} when there is no type, the type's name for Exclusive, Failover, Shared and
     *         Key_Shared, and {@code "Null"} for any other value
     */
    @Override
    public String getTypeString() {
        final CommandSubscribe.SubType subType = getType();
        if (subType == null) {
            return "None";
        }
        if (subType == CommandSubscribe.SubType.Exclusive) {
            return "Exclusive";
        }
        if (subType == CommandSubscribe.SubType.Failover) {
            return "Failover";
        }
        if (subType == CommandSubscribe.SubType.Shared) {
            return "Shared";
        }
        if (subType == CommandSubscribe.SubType.Key_Shared) {
            return "Key_Shared";
        }
        return "Null";
    }

    /**
     * Scans the backlog through the cursor and classifies every entry with the entry filters.
     *
     * <p>For each scanned entry the first/last positions are tracked, the batch size is read from the
     * message metadata (1 when absent), and the entry is counted as accepted, rejected or rescheduled
     * according to the filter result (a {@code null} result counts as accepted). Scan limits (max
     * entries, timeout, batch size) come from the broker configuration.
     *
     * @param position position forwarded unchanged to {@code cursor.scan}
     * @return a future with the aggregated counters and the scan outcome
     */
    @Override
    public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {
        final long startedAt = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Starting to analyze backlog", topicName, subName);
        }

        // Per-entry counters and their per-message counterparts.
        final AtomicLong scannedEntries = new AtomicLong();
        final AtomicLong acceptedEntries = new AtomicLong();
        final AtomicLong rejectedEntries = new AtomicLong();
        final AtomicLong rescheduledEntries = new AtomicLong();
        final AtomicLong scannedMessages = new AtomicLong();
        final AtomicLong acceptedMessages = new AtomicLong();
        final AtomicLong rejectedMessages = new AtomicLong();
        final AtomicLong rescheduledMessages = new AtomicLong();

        Position markDeleted = cursor.getMarkDeletedPosition();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] currentPosition {}", topicName, subName, markDeleted);
        }

        // Reuse the dispatcher's filter support when a dispatcher exists, otherwise build a standalone one.
        final EntryFilterSupport filters;
        if (dispatcher != null) {
            filters = (EntryFilterSupport) (Object) dispatcher;
        } else {
            filters = new EntryFilterSupport(this);
        }

        ServiceConfiguration brokerConfig = topic.getBrokerService().getPulsar().getConfiguration();
        final long maxEntries = brokerConfig.getSubscriptionBacklogScanMaxEntries();
        final long timeOutMs = brokerConfig.getSubscriptionBacklogScanMaxTimeMs();
        final int batchSize = brokerConfig.getDispatcherMaxReadBatchSize();

        final AtomicReference<Position> firstSeen = new AtomicReference<>();
        final AtomicReference<Position> lastSeen = new AtomicReference<>();

        Predicate<Entry> perEntry = entry -> {
            if (log.isDebugEnabled()) {
                log.debug("found {}", entry);
            }
            Position entryPosition = entry.getPosition();
            firstSeen.compareAndSet(null, entryPosition);
            lastSeen.set(entryPosition);

            ByteBuf payload = entry.getDataBuffer();
            MessageMetadata metadata = Commands.peekMessageMetadata(payload, "", -1L);
            int batchCount = metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() : 1;

            EntryFilter.FilterResult verdict = filters.runFiltersForEntry(entry, metadata, null);
            if (verdict == null) {
                verdict = EntryFilter.FilterResult.ACCEPT;
            }
            if (verdict == EntryFilter.FilterResult.REJECT) {
                rejectedEntries.incrementAndGet();
                rejectedMessages.addAndGet(batchCount);
            } else if (verdict == EntryFilter.FilterResult.RESCHEDULE) {
                rescheduledEntries.incrementAndGet();
                rescheduledMessages.addAndGet(batchCount);
            } else {
                acceptedEntries.incrementAndGet();
                acceptedMessages.addAndGet(batchCount);
            }

            long soFar = scannedEntries.incrementAndGet();
            scannedMessages.addAndGet(batchCount);
            // Progress report every 1000 entries.
            if (soFar % 1000L == 0L) {
                long now = System.currentTimeMillis();
                log.info("[{}][{}] scan running since {} ms - scanned {} entries",
                        topicName, subName, now - startedAt, soFar);
            }
            // Never stops the scan by itself; the cursor's limits decide when to end.
            return true;
        };

        return cursor.scan(position, perEntry, batchSize, maxEntries, timeOutMs).thenApply(outcome -> {
            long finishedAt = System.currentTimeMillis();
            AnalyzeBacklogResult summary = new AnalyzeBacklogResult();
            summary.setFirstPosition(firstSeen.get());
            summary.setLastPosition(lastSeen.get());
            summary.setEntries(scannedEntries.get());
            summary.setMessages(scannedMessages.get());
            summary.setFilterAcceptedEntries(acceptedEntries.get());
            summary.setFilterAcceptedMessages(acceptedMessages.get());
            summary.setFilterRejectedEntries(rejectedEntries.get());
            summary.setFilterRejectedMessages(rejectedMessages.get());
            summary.setFilterRescheduledEntries(rescheduledEntries.get());
            summary.setFilterRescheduledMessages(rescheduledMessages.get());
            summary.setScanOutcome((ScanOutcome) outcome);
            log.info("[{}][{}] scan took {} ms - {}", topicName, subName, finishedAt - startedAt, summary);
            return summary;
        });
    }

    /**
     * Clears the whole backlog of this subscription and then the dispatcher's delayed messages.
     *
     * @return a future completed once the backlog is cleared and, when a dispatcher exists, its delayed
     *         messages are cleared; completed exceptionally if either step fails
     */
    @Override
    public CompletableFuture<Void> clearBacklog() {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Backlog size before clearing: {}",
                    topicName, subName, cursor.getNumberOfEntriesInBacklog(false));
        }

        cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

            @Override
            public void clearBacklogComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Backlog size after clearing: {}",
                            topicName, subName, cursor.getNumberOfEntriesInBacklog(false));
                }
                if (dispatcher == null) {
                    result.complete(null);
                    return;
                }
                // The outcome of clearing delayed messages decides the outcome of the whole operation.
                dispatcher.clearDelayedMessages().whenComplete((ignored, failure) -> {
                    if (failure == null) {
                        result.complete(null);
                    } else {
                        result.completeExceptionally((Throwable) failure);
                    }
                });
            }

            @Override
            public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] Failed to clear backlog", topicName, subName, exception);
                result.completeExceptionally(exception);
            }
        }, null);

        return result;
    }

    /**
     * Skips the given number of entries on the cursor, excluding individually deleted entries from the count.
     *
     * @param numMessagesToSkip number of entries to skip
     * @return a future completed when the skip finished, or completed exceptionally on failure
     */
    @Override
    public CompletableFuture<Void> skipMessages(final int numMessagesToSkip) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Skipping {} messages, current backlog {}",
                    topicName, subName, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
        }

        AsyncCallbacks.SkipEntriesCallback onSkipped = new AsyncCallbacks.SkipEntriesCallback() {

            @Override
            public void skipEntriesComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Skipped {} messages, new backlog {}",
                            topicName, subName, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
                }
                result.complete(null);
            }

            @Override
            public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] Failed to skip {} messages",
                        topicName, subName, numMessagesToSkip, exception);
                result.completeExceptionally(exception);
            }
        };
        cursor.asyncSkipEntries(numMessagesToSkip, ManagedCursor.IndividualDeletedEntries.Exclude, onSkipped, null);

        return result;
    }

    /**
     * Resets the subscription to the position found for a timestamp.
     *
     * <p>When a position is found, the cursor is reset to the position following it. When none is
     * found, the cursor's first position is used; if that is missing too, the returned future fails
     * with {@code SubscriptionInvalidCursorPosition}.
     *
     * @param timestamp timestamp handed to the message finder
     * @return a future completed by the position-based reset, or completed exceptionally when the
     *         lookup fails ({@code SubscriptionBusyException} for a concurrent find, otherwise a
     *         {@code BrokerServiceException} wrapping the cause)
     */
    @Override
    public CompletableFuture<Void> resetCursor(final long timestamp) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        PersistentMessageFinder finder = new PersistentMessageFinder(topicName, cursor);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Resetting subscription to timestamp {}", topicName, subName, timestamp);
        }

        finder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback() {

            @Override
            public void findEntryComplete(Position position, Object ctx) {
                if (position != null) {
                    // Resume right after the entry that was located.
                    PersistentSubscription.this.resetCursor(position.getNext(), result);
                    return;
                }

                Position fallback = cursor.getFirstPosition();
                if (fallback == null) {
                    log.warn("[{}][{}] Unable to find position for timestamp {}. Unable to reset cursor to first position",
                            topicName, subName, timestamp);
                    result.completeExceptionally(new BrokerServiceException.SubscriptionInvalidCursorPosition("Unable to find position for specified timestamp"));
                    return;
                }
                log.info("[{}][{}] Unable to find position for timestamp {}. Resetting cursor to first position {} in ledger",
                        topicName, subName, timestamp, fallback);
                PersistentSubscription.this.resetCursor(fallback, result);
            }

            @Override
            public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
                if (exception instanceof ManagedLedgerException.ConcurrentFindCursorPositionException) {
                    result.completeExceptionally(new BrokerServiceException.SubscriptionBusyException(exception.getMessage()));
                    return;
                }
                result.completeExceptionally(new BrokerServiceException(exception));
            }
        });

        return result;
    }

    /**
     * Resets the subscription cursor to the given position.
     *
     * @param position the position to reset the cursor to
     * @return a new future that is passed to the internal reset routine for completion
     */
    @Override
    public CompletableFuture<Void> resetCursor(Position position) {
        final CompletableFuture<Void> resetFuture = new CompletableFuture<>();
        resetCursor(position, resetFuture);
        return resetFuture;
    }

    /**
     * Fences the subscription, disconnects any connected consumers and then asks the cursor to reset to
     * {@code finalPosition}, reporting the outcome through {@code future}.
     *
     * <p>The fence flag is moved from 0 to 1 up front; if that compare-and-set fails, {@code future} is failed
     * with {@link BrokerServiceException.SubscriptionBusyException} and nothing else happens. Every later
     * outcome (disconnect failure, reset success, reset failure, unexpected exception) sets the flag back to 0
     * before completing {@code future}.
     *
     * <p>NOTE: the decompiler reported "Removed try catching itself - possible behaviour change" for this
     * method, so the synchronized section may differ slightly from the original source.
     *
     * @param finalPosition the position to reset the cursor to
     * @param future the future completed (normally or exceptionally) with the result of the reset
     */
    private void resetCursor(final Position finalPosition, final CompletableFuture<Void> future) {
        CompletableFuture<Object> disconnectFuture;
        // Take the fence (0 -> 1); a failed CAS means the flag was not 0, so the request is rejected as busy.
        if (!IS_FENCED_UPDATER.compareAndSet(this, 0, 1)) {
            future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to fence subscription"));
            return;
        }
        PersistentSubscription persistentSubscription = this;
        synchronized (persistentSubscription) {
            // Disconnect active consumers only when a dispatcher exists and reports a connected consumer;
            // otherwise continue with an already-completed future.
            disconnectFuture = this.dispatcher != null && this.dispatcher.isConsumerConnected() ? this.dispatcher.disconnectActiveConsumers(true) : CompletableFuture.completedFuture(null);
        }
        disconnectFuture.whenComplete((aVoid, throwable) -> {
            // Runs for both success and failure of the disconnect.
            if (this.dispatcher != null) {
                this.dispatcher.resetCloseFuture();
            }
            if (throwable != null) {
                log.error("[{}][{}] Failed to disconnect consumer from subscription", new Object[]{this.topicName, this.subName, throwable});
                // Release the fence before reporting the failure.
                IS_FENCED_UPDATER.set(this, 0);
                future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to disconnect consumers from subscription"));
                return;
            }
            log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset", (Object)this.topicName, (Object)this.subName);
            try {
                PositionImpl resetTo;
                PositionImpl horizon;
                boolean forceReset = false;
                // forceReset is enabled when the topic has a compacted topic with a compaction horizon and that
                // horizon compares greater than or equal to the target position.
                if (this.topic.getCompactedTopic() != null && this.topic.getCompactedTopic().getCompactionHorizon().isPresent() && (horizon = (PositionImpl)this.topic.getCompactedTopic().getCompactionHorizon().get()).compareTo(resetTo = (PositionImpl)finalPosition) >= 0) {
                    forceReset = true;
                }
                this.cursor.asyncResetCursor(finalPosition, forceReset, new AsyncCallbacks.ResetCursorCallback(){

                    public void resetComplete(Object ctx) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}][{}] Successfully reset subscription to position {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, finalPosition});
                        }
                        // Notify the dispatcher (if any) before releasing the fence and completing the future.
                        if (PersistentSubscription.this.dispatcher != null) {
                            PersistentSubscription.this.dispatcher.cursorIsReset();
                        }
                        IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        future.complete(null);
                    }

                    public void resetFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("[{}][{}] Failed to reset subscription to position {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, finalPosition, exception});
                        IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        // Translate managed-ledger failures into the matching broker-level exception.
                        if (exception instanceof ManagedLedgerException.InvalidCursorPositionException) {
                            future.completeExceptionally(new BrokerServiceException.SubscriptionInvalidCursorPosition(exception.getMessage()));
                        } else if (exception instanceof ManagedLedgerException.ConcurrentFindCursorPositionException) {
                            future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException(exception.getMessage()));
                        } else {
                            future.completeExceptionally(new BrokerServiceException(exception));
                        }
                    }
                });
            }
            catch (Exception e) {
                // Anything thrown while preparing or issuing the reset: release the fence and fail the future.
                log.error("[{}][{}] Error while resetting cursor", new Object[]{this.topicName, this.subName, e});
                IS_FENCED_UPDATER.set(this, 0);
                future.completeExceptionally(new BrokerServiceException(e));
            }
        });
    }

    /**
     * Asynchronously reads the N-th entry through the cursor, excluding individually deleted entries.
     *
     * @param messagePosition the ordinal passed to the cursor's {@code asyncGetNthEntry}
     * @return a future completed with the entry, or exceptionally with the managed-ledger failure
     */
    @Override
    public CompletableFuture<Entry> peekNthMessage(int messagePosition) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Getting message at position {}", topicName, subName, messagePosition);
        }
        final CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
        // Bridge the callback-style cursor API onto the returned future.
        final AsyncCallbacks.ReadEntryCallback bridge = new AsyncCallbacks.ReadEntryCallback() {

            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                entryFuture.complete(entry);
            }

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                entryFuture.completeExceptionally(exception);
            }
        };
        cursor.asyncGetNthEntry(messagePosition, ManagedCursor.IndividualDeletedEntries.Exclude, bridge, null);
        return entryFuture;
    }

    /**
     * Returns the backlog entry count reported by the cursor.
     *
     * @param getPreciseBacklog forwarded unchanged to the cursor
     * @return the cursor's number of entries in backlog
     */
    @Override
    public long getNumberOfEntriesInBacklog(boolean getPreciseBacklog) {
        return cursor.getNumberOfEntriesInBacklog(getPreciseBacklog);
    }

    /**
     * Returns the current dispatcher while holding this subscription's monitor.
     *
     * @return the dispatcher field, which other methods in this class treat as possibly {@code null}
     */
    @Override
    public synchronized Dispatcher getDispatcher() {
        return dispatcher;
    }

    /**
     * Returns the value of the cursor's {@code getNumberOfEntriesSinceFirstNotAckedMessage()}.
     *
     * @return the count reported by the cursor
     */
    public long getNumberOfEntriesSinceFirstNotAckedMessage() {
        return cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
    }

    /**
     * Returns the value of the cursor's {@code getTotalNonContiguousDeletedMessagesRange()}.
     *
     * @return the count reported by the cursor
     */
    public int getTotalNonContiguousDeletedMessagesRange() {
        return cursor.getTotalNonContiguousDeletedMessagesRange();
    }

    /**
     * Closes the subscription if no consumer is connected.
     *
     * <p>While holding this subscription's monitor, the call is rejected with
     * {@link BrokerServiceException.SubscriptionBusyException} when the dispatcher reports a connected
     * consumer. Otherwise the pending-ack handle is closed asynchronously, after which the fence flag is set
     * to 1.
     *
     * @return a future for the close, or an already-failed future when consumers are still connected
     */
    @Override
    public CompletableFuture<Void> close() {
        synchronized (this) {
            final boolean hasActiveConsumers = dispatcher != null && dispatcher.isConsumerConnected();
            if (hasActiveConsumers) {
                return FutureUtil.failedFuture(
                        new BrokerServiceException.SubscriptionBusyException("Subscription has active consumers"));
            }
            return pendingAckHandle.closeAsync().thenAccept(ignored -> {
                // Fence only after the pending-ack handle has finished closing.
                IS_FENCED_UPDATER.set(this, 1);
                log.info("[{}][{}] Successfully closed subscription [{}]", topicName, subName, cursor);
            });
        }
    }

    /**
     * Fences the subscription, closes its dispatcher (if any) and then closes the subscription itself.
     *
     * <p>If a fence future already exists it is returned unchanged, so repeated calls share one in-flight
     * disconnect. When any step fails, the fence future is completed exceptionally and
     * {@link #resumeAfterFence()} is invoked.
     *
     * @return the fence future tracking this disconnect
     */
    @Override
    public synchronized CompletableFuture<Void> disconnect() {
        // A disconnect is already recorded: hand back the same future instead of starting another one.
        if (this.fenceFuture != null) {
            return this.fenceFuture;
        }
        this.fenceFuture = new CompletableFuture();
        // Set the fence flag before closing anything.
        IS_FENCED_UPDATER.set(this, 1);
        // Chain: dispatcher.close() (or a completed future when there is no dispatcher) -> close() -> complete fenceFuture.
        ((CompletableFuture)((CompletableFuture)(this.dispatcher != null ? this.dispatcher.close() : CompletableFuture.completedFuture(null)).thenCompose(v -> this.close())).thenRun(() -> {
            log.info("[{}][{}] Successfully disconnected and closed subscription", (Object)this.topicName, (Object)this.subName);
            this.fenceFuture.complete(null);
        })).exceptionally(exception -> {
            log.error("[{}][{}] Error disconnecting consumers from subscription", new Object[]{this.topicName, this.subName, exception});
            // Fail the fence future first, then let resumeAfterFence() react to its completion.
            this.fenceFuture.completeExceptionally((Throwable)exception);
            this.resumeAfterFence();
            return null;
        });
        return this.fenceFuture;
    }

    /**
     * Lifts the fence once the current fence future completes.
     *
     * <p>Does nothing when no fence future is set. Otherwise, when that future completes (normally or
     * exceptionally), the fence flag is moved from 1 to 0, the dispatcher is reset if the flag actually
     * changed and a dispatcher exists, and the fence future field is cleared. Exceptions raised while doing
     * so are logged and not rethrown.
     */
    public synchronized void resumeAfterFence() {
        if (fenceFuture == null) {
            return;
        }
        fenceFuture.whenComplete((result, error) -> {
            synchronized (this) {
                try {
                    final boolean unfenced = IS_FENCED_UPDATER.compareAndSet(this, 1, 0);
                    if (unfenced && dispatcher != null) {
                        dispatcher.reset();
                    }
                    fenceFuture = null;
                } catch (Exception ex) {
                    log.error("[{}] Resume subscription [{}] failure", topicName, subName, ex);
                }
            }
        });
    }

    /**
     * Deletes the subscription without forcing connected consumers off first.
     *
     * @return the future produced by {@code delete(false)}
     */
    @Override
    public CompletableFuture<Void> delete() {
        return delete(false);
    }

    /**
     * Deletes the subscription, disconnecting it first.
     *
     * @return the future produced by {@code delete(true)}
     */
    @Override
    public CompletableFuture<Void> deleteForcefully() {
        return delete(true);
    }

    /**
     * Closes (or disconnects and closes) the subscription, unsubscribes it from the topic and finally closes
     * the dispatcher.
     *
     * @param closeIfConsumersConnected when {@code true}, {@link #disconnect()} is used for the first step;
     *                                  when {@code false}, {@link #close()} is used instead
     * @return a future completed once the dispatcher has been closed after unsubscribing, or completed
     *         exceptionally with the first failure encountered
     */
    private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
        CompletableFuture<Void> deleteFuture = new CompletableFuture<Void>();
        log.info("[{}][{}] Unsubscribing", (Object)this.topicName, (Object)this.subName);
        // Step 1 result: mirrors the outcome of disconnect() or close(), depending on the flag.
        CompletableFuture closeSubscriptionFuture = new CompletableFuture();
        if (closeIfConsumersConnected) {
            ((CompletableFuture)this.disconnect().thenRun(() -> closeSubscriptionFuture.complete(null))).exceptionally(ex -> {
                log.error("[{}][{}] Error disconnecting and closing subscription", new Object[]{this.topicName, this.subName, ex});
                closeSubscriptionFuture.completeExceptionally((Throwable)ex);
                return null;
            });
        } else {
            ((CompletableFuture)this.close().thenRun(() -> closeSubscriptionFuture.complete(null))).exceptionally(exception -> {
                log.error("[{}][{}] Error closing subscription", new Object[]{this.topicName, this.subName, exception});
                closeSubscriptionFuture.completeExceptionally((Throwable)exception);
                return null;
            });
        }
        // Step 2: unsubscribe from the topic. Step 3: close the dispatcher (if any) under this object's monitor.
        ((CompletableFuture)((CompletableFuture)closeSubscriptionFuture.thenCompose(v -> this.topic.unsubscribe(this.subName))).thenAccept(v -> {
            PersistentSubscription persistentSubscription = this;
            synchronized (persistentSubscription) {
                ((CompletableFuture)(this.dispatcher != null ? this.dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
                    log.info("[{}][{}] Successfully deleted subscription", (Object)this.topicName, (Object)this.subName);
                    deleteFuture.complete(null);
                })).exceptionally(ex -> {
                    // Dispatcher close failed: clear the fence flag and reset the dispatcher before reporting.
                    IS_FENCED_UPDATER.set(this, 0);
                    if (this.dispatcher != null) {
                        this.dispatcher.reset();
                    }
                    log.error("[{}][{}] Error deleting subscription", new Object[]{this.topicName, this.subName, ex});
                    deleteFuture.completeExceptionally((Throwable)ex);
                    return null;
                });
            }
        })).exceptionally(exception -> {
            // Step 1 or step 2 failed: clear the fence flag and propagate the failure.
            IS_FENCED_UPDATER.set(this, 0);
            log.error("[{}][{}] Error deleting subscription", new Object[]{this.topicName, this.subName, exception});
            deleteFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return deleteFuture;
    }

    /**
     * Handles an unsubscribe request issued by {@code consumer}.
     *
     * <p>When the dispatcher allows the consumer to unsubscribe, the consumer is closed and the subscription
     * is deleted. Otherwise the returned future fails with
     * {@link BrokerServiceException.ServerMetadataException}. A missing dispatcher is handled the same way as
     * a refused unsubscribe instead of raising a {@link NullPointerException}.
     *
     * @param consumer the consumer requesting the unsubscribe
     * @return the future of {@link #delete()} on success, otherwise an exceptionally completed future
     */
    @Override
    public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Read the field once so the null check and the call target the same dispatcher instance.
        final Dispatcher currentDispatcher = this.dispatcher;
        try {
            if (currentDispatcher != null && currentDispatcher.canUnsubscribe(consumer)) {
                consumer.close();
                return this.delete();
            }
            future.completeExceptionally(new BrokerServiceException.ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe"));
        } catch (BrokerServiceException e) {
            // Pass the exception as the trailing argument so its message and stack trace reach the log.
            log.warn("Error removing consumer {}", consumer, e);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Returns the consumers known to the dispatcher.
     *
     * @return the dispatcher's consumer list, or an empty list when there is no dispatcher
     */
    @Override
    public List<Consumer> getConsumers() {
        // Single read of the field so the null check and the call use the same instance.
        final Dispatcher current = this.dispatcher;
        return current == null ? Collections.emptyList() : current.getConsumers();
    }

    /**
     * Triggers TTL-based expiry through the expiry monitor, unless the check can be skipped.
     *
     * <p>Expiry is skipped (returning {@code false}) when the backlog is empty, or when a consumer is
     * connected, the backlog is below 1000 entries and the topic reports that the oldest message has not
     * expired.
     *
     * @param messageTTLInSeconds the TTL forwarded to the topic check and to the expiry monitor
     * @return {@code false} when skipped, otherwise the expiry monitor's result
     */
    @Override
    public boolean expireMessages(int messageTTLInSeconds) {
        final long backlog = getNumberOfEntriesInBacklog(false);
        if (backlog == 0L) {
            return false;
        }
        if (dispatcher != null
                && dispatcher.isConsumerConnected()
                && backlog < 1000L
                && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds)) {
            return false;
        }
        lastExpireTimestamp = System.currentTimeMillis();
        return expiryMonitor.expireMessages(messageTTLInSeconds);
    }

    /**
     * Records the current time as the last expiry timestamp and asks the expiry monitor to expire messages
     * for the given position.
     *
     * @param position the position forwarded to the expiry monitor
     * @return the expiry monitor's result
     */
    @Override
    public boolean expireMessages(Position position) {
        lastExpireTimestamp = System.currentTimeMillis();
        return expiryMonitor.expireMessages(position);
    }

    /**
     * Returns the message expiry rate reported by the expiry monitor.
     *
     * @return the expiry monitor's message expiry rate
     */
    @Override
    public double getExpiredMessageRate() {
        return expiryMonitor.getMessageExpiryRate();
    }

    /**
     * Returns the expiry monitor used by this subscription.
     *
     * @return the expiry monitor field
     */
    public PersistentMessageExpiryMonitor getExpiryMonitor() {
        return expiryMonitor;
    }

    /**
     * Returns the cursor's estimated size since its mark-delete position.
     *
     * @return the value of the cursor's {@code getEstimatedSizeSinceMarkDeletePosition()}
     */
    public long estimateBacklogSize() {
        return cursor.getEstimatedSizeSinceMarkDeletePosition();
    }

    /**
     * Builds a statistics snapshot for this subscription.
     *
     * <p>Counters kept for removed consumers are combined with the per-consumer stats of the current
     * dispatcher. Dispatcher-specific details (active consumer, delayed-delivery index, unacked totals,
     * key-shared settings) are added only when the dispatcher is of the matching type. Backlog, expiry and
     * cursor figures come from the cursor, the managed ledger and the expiry monitor.
     *
     * @param getPreciseBacklog whether to request a precise backlog count; {@code null} is treated as
     *                          {@code false}
     * @param subscriptionBacklogSize whether to compute the estimated backlog size ({@code -1} is reported
     *                                otherwise)
     * @param getEarliestTimeInBacklog whether to look up the earliest publish time in the backlog; the lookup
     *                                 blocks on a future and reports {@code -1} if it fails or is interrupted
     * @return the populated stats object
     */
    public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog) {
        Consumer activeConsumer;
        SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
        subStats.lastExpireTimestamp = this.lastExpireTimestamp;
        subStats.lastConsumedFlowTimestamp = this.lastConsumedFlowTimestamp;
        subStats.lastMarkDeleteAdvancedTimestamp = this.lastMarkDeleteAdvancedTimestamp;
        // Start from what already-removed consumers contributed; live consumers are added below.
        subStats.bytesOutCounter = this.bytesOutFromRemovedConsumers.longValue();
        subStats.msgOutCounter = this.msgOutFromRemovedConsumer.longValue();
        // Read the field once so every check below sees the same dispatcher instance.
        Dispatcher dispatcher = this.dispatcher;
        if (dispatcher != null) {
            // Guard the cast with instanceof, as the key-shared section further down already does, so a
            // mismatching dispatcher cannot raise ClassCastException.
            Map<Consumer, List<Range>> consumerKeyHashRanges =
                    this.getType() == CommandSubscribe.SubType.Key_Shared
                            && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers
                    ? ((PersistentStickyKeyDispatcherMultipleConsumers)dispatcher).getConsumerKeyHashRanges()
                    : null;
            dispatcher.getConsumers().forEach(consumer -> {
                ConsumerStatsImpl consumerStats = consumer.getStats();
                subStats.consumers.add(consumerStats);
                subStats.msgRateOut += consumerStats.msgRateOut;
                subStats.msgThroughputOut += consumerStats.msgThroughputOut;
                subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                subStats.msgOutCounter += consumerStats.msgOutCounter;
                subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
                subStats.messageAckRate += consumerStats.messageAckRate;
                subStats.chunkedMessageRate = (int)((double)subStats.chunkedMessageRate + consumerStats.chunkedMessageRate);
                subStats.unackedMessages += (long)consumerStats.unackedMessages;
                subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
                subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
                if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer)) {
                    consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer).stream().map(Range::toString).collect(Collectors.toList());
                }
            });
            subStats.filterProcessedMsgCount = dispatcher.getFilterProcessedMsgCount();
            subStats.filterAcceptedMsgCount = dispatcher.getFilterAcceptedMsgCount();
            subStats.filterRejectedMsgCount = dispatcher.getFilterRejectedMsgCount();
            subStats.filterRescheduledMsgCount = dispatcher.getFilterRescheduledMsgCount();
        }
        CommandSubscribe.SubType subType = this.getType();
        subStats.type = this.getTypeString();
        if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer && (activeConsumer = ((PersistentDispatcherSingleActiveConsumer)dispatcher).getActiveConsumer()) != null) {
            subStats.activeConsumerName = activeConsumer.consumerName();
        }
        if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
            subStats.delayedMessageIndexSizeInBytes = ((PersistentDispatcherMultipleConsumers)dispatcher).getDelayedTrackerMemoryUsage();
            subStats.bucketDelayedIndexStats = ((PersistentDispatcherMultipleConsumers)dispatcher).getBucketDelayedIndexStats();
        }
        if (Subscription.isIndividualAckMode(subType) && dispatcher instanceof PersistentDispatcherMultipleConsumers) {
            PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers)dispatcher;
            // The dispatcher-wide total replaces the per-consumer sum accumulated above.
            subStats.unackedMessages = d.getTotalUnackedMessages();
            subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs();
            subStats.msgDelayed = d.getNumberOfDelayedMessages();
        }
        // Boolean.TRUE.equals avoids a NullPointerException from unboxing a null argument.
        subStats.msgBacklog = this.getNumberOfEntriesInBacklog(Boolean.TRUE.equals(getPreciseBacklog));
        subStats.backlogSize = subscriptionBacklogSize ? ((ManagedLedgerImpl)this.topic.getManagedLedger()).getEstimatedBacklogSize((PositionImpl)this.cursor.getMarkDeletedPosition()) : -1L;
        if (getEarliestTimeInBacklog && subStats.msgBacklog > 0L) {
            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)this.cursor.getManagedLedger();
            PositionImpl markDeletedPosition = (PositionImpl)this.cursor.getMarkDeletedPosition();
            long result;
            try {
                result = (Long)managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                result = -1L;
            }
            catch (ExecutionException e) {
                // The lookup failed; -1 marks the publish time as unavailable.
                result = -1L;
            }
            subStats.earliestMsgPublishTimeInBacklog = result;
        }
        subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
        subStats.msgRateExpired = this.expiryMonitor.getMessageExpiryRate();
        subStats.totalMsgExpired = this.expiryMonitor.getTotalMessageExpired();
        subStats.isReplicated = this.isReplicated();
        subStats.subscriptionProperties = this.subscriptionProperties;
        subStats.isDurable = this.cursor.isDurable();
        if (this.getType() == CommandSubscribe.SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) {
            PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher = (PersistentStickyKeyDispatcherMultipleConsumers)dispatcher;
            subStats.allowOutOfOrderDelivery = keySharedDispatcher.isAllowOutOfOrderDelivery();
            subStats.keySharedMode = keySharedDispatcher.getKeySharedMode().toString();
            LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = keySharedDispatcher.getRecentlyJoinedConsumers();
            if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()) {
                recentlyJoinedConsumers.forEach((k, v) -> subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString()));
            }
        }
        subStats.nonContiguousDeletedMessagesRanges = this.cursor.getTotalNonContiguousDeletedMessagesRange();
        subStats.nonContiguousDeletedMessagesRangesSerializedSize = this.cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
        return subStats;
    }

    /**
     * Asks the dispatcher to redeliver the consumer's unacknowledged messages for the given epoch.
     * Does nothing when there is no dispatcher.
     *
     * @param consumer the consumer whose messages should be redelivered
     * @param consumerEpoch the consumer epoch forwarded to the dispatcher
     */
    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
        final Dispatcher current = getDispatcher();
        if (current == null) {
            return;
        }
        current.redeliverUnacknowledgedMessages(consumer, consumerEpoch);
    }

    /**
     * Asks the dispatcher to redeliver the given positions to the consumer.
     * Does nothing when there is no dispatcher.
     *
     * @param consumer the consumer the positions belong to
     * @param positions the positions forwarded to the dispatcher
     */
    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
        final Dispatcher current = getDispatcher();
        if (current == null) {
            return;
        }
        current.redeliverUnacknowledgedMessages(consumer, positions);
    }

    /**
     * Removes from {@code positions}, in place, every position that compares less than or equal to the
     * cursor's mark-delete position. Nothing is removed while the mark-delete position is {@code null}.
     *
     * @param positions the mutable list to trim
     */
    private void trimByMarkDeletePosition(List<PositionImpl> positions) {
        final Predicate<PositionImpl> atOrBeforeMarkDelete = candidate -> {
            if (cursor.getMarkDeletedPosition() == null) {
                return false;
            }
            return candidate.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) <= 0;
        };
        positions.removeIf(atOrBeforeMarkDelete);
    }

    /**
     * Forwards an unacked-message delta to the dispatcher.
     *
     * <p>The call is a no-op when there is no dispatcher, matching the null handling of the other
     * dispatcher-delegating methods in this class instead of throwing {@link NullPointerException}.
     *
     * @param unAckMessages the delta forwarded to the dispatcher
     */
    @Override
    public void addUnAckedMessages(int unAckMessages) {
        // Read the field once so the null check and the call use the same instance.
        final Dispatcher currentDispatcher = this.dispatcher;
        if (currentDispatcher != null) {
            currentDispatcher.addUnAckedMessages(unAckMessages);
        }
    }

    /**
     * Returns the dispatcher's number of delayed messages.
     *
     * @return the dispatcher's delayed-message count, or {@code 0} when there is no dispatcher
     */
    @Override
    public synchronized long getNumberOfEntriesDelayed() {
        return dispatcher == null ? 0L : dispatcher.getNumberOfDelayedMessages();
    }

    /**
     * Forwards to the topic's {@code markBatchMessagePublished()}.
     */
    @Override
    public void markTopicWithBatchMessagePublished() {
        topic.markBatchMessagePublished();
    }

    /**
     * Reacts to the topic being terminated: when the cursor reports an empty backlog and a dispatcher exists,
     * the dispatcher's consumers are passed to
     * {@code AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration}.
     */
    void topicTerminated() {
        if (cursor.getNumberOfEntriesInBacklog(false) != 0L) {
            return;
        }
        if (dispatcher == null) {
            return;
        }
        AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration(topic, dispatcher.getConsumers());
    }

    /**
     * Returns the subscription properties currently held by this subscription.
     *
     * @return the subscription properties field
     */
    @Override
    public Map<String, String> getSubscriptionProperties() {
        return subscriptionProperties;
    }

    /**
     * Looks up a position through the pending-ack handle.
     *
     * @param position the position forwarded to the pending-ack handle
     * @return the pending-ack handle's result
     */
    public PositionImpl getPositionInPendingAck(PositionImpl position) {
        return pendingAckHandle.getPositionInPendingAck(position);
    }

    /**
     * Stores new subscription properties on the cursor and, once that succeeds, on this subscription.
     *
     * <p>A {@code null} or empty argument is normalised to an immutable empty map. Otherwise an unmodifiable
     * copy is taken, so later changes to the caller's map do not leak into the stored properties.
     *
     * @param subscriptionProperties the new properties; may be {@code null}
     * @return a future completed after the cursor accepted the properties and the field was updated
     */
    @Override
    public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
        final Map<String, String> newSubscriptionProperties;
        if (subscriptionProperties == null || subscriptionProperties.isEmpty()) {
            newSubscriptionProperties = Collections.emptyMap();
        } else {
            // Copy before wrapping: unmodifiableMap alone is only a view over the caller's mutable map.
            newSubscriptionProperties = Collections.unmodifiableMap(new LinkedHashMap<>(subscriptionProperties));
        }
        return this.cursor.setCursorProperties(newSubscriptionProperties).thenRun(() -> {
            // Update the in-memory value only after the cursor update has completed.
            this.subscriptionProperties = newSubscriptionProperties;
        });
    }

    /**
     * Combines user-supplied cursor properties with the base cursor properties for this subscription.
     *
     * <p>When {@code userProperties} is empty the base properties are returned directly. Otherwise a new
     * sorted map is built in which base properties overwrite user properties that share a key.
     *
     * @param userProperties the user-supplied properties; must not be {@code null}
     * @return the base properties, or a merged map
     */
    protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
        final Map<String, Long> baseProperties = getBaseCursorProperties(isReplicated());
        if (!userProperties.isEmpty()) {
            // Base entries are applied last so they win on key collisions.
            final Map<String, Long> merged = new TreeMap<>(userProperties);
            merged.putAll(baseProperties);
            return merged;
        }
        return baseProperties;
    }

    /**
     * Adds a copy of the given snapshot to the replicated-subscription snapshot cache.
     * Does nothing when no cache is set.
     *
     * @param snapshot the snapshot to copy into the cache
     */
    @Override
    public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
        // Read the field once so the null check and the call use the same cache instance.
        final ReplicatedSubscriptionSnapshotCache cache = replicatedSubscriptionSnapshotCache;
        if (cache == null) {
            return;
        }
        cache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
    }

    /**
     * Ends a transaction on this subscription's pending-ack handle.
     *
     * <p>A commit action is forwarded to {@code commitTxn}; an abort action is forwarded to {@code abortTxn},
     * passing the active consumer when the dispatcher is a {@link PersistentDispatcherSingleActiveConsumer}
     * and {@code null} otherwise. Any other action yields a future failed with
     * {@link BrokerServiceException.NotAllowedException}.
     *
     * @param txnidMostBits most significant bits of the transaction id
     * @param txnidLeastBits least significant bits of the transaction id
     * @param txnAction numeric value compared against {@code TxnAction.COMMIT} and {@code TxnAction.ABORT}
     * @param lowWaterMark forwarded to the pending-ack handle
     * @return the pending-ack handle's future, or a failed future for an unsupported action
     */
    @Override
    public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction, long lowWaterMark) {
        final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
        if (txnAction == TxnAction.COMMIT.getValue()) {
            return pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), lowWaterMark);
        }
        if (txnAction != TxnAction.ABORT.getValue()) {
            return FutureUtil.failedFuture(
                    new BrokerServiceException.NotAllowedException("Unsupported txnAction " + txnAction));
        }
        final Consumer redeliverConsumer = getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer
                ? ((PersistentDispatcherSingleActiveConsumer) getDispatcher()).getActiveConsumer()
                : null;
        return pendingAckHandle.abortTxn(txnID, redeliverConsumer, lowWaterMark);
    }

    /**
     * Returns the managed cursor backing this subscription.
     *
     * @return the cursor field
     */
    @VisibleForTesting
    public ManagedCursor getCursor() {
        return cursor;
    }

    /**
     * Returns the pending-ack handle used by this subscription.
     *
     * @return the pending-ack handle field
     */
    @VisibleForTesting
    public PendingAckHandle getPendingAckHandle() {
        return pendingAckHandle;
    }

    /**
     * Forwards to the pending-ack handle's {@code syncBatchPositionAckSetForTransaction}.
     *
     * @param position the position forwarded to the pending-ack handle
     */
    public void syncBatchPositionBitSetForPendingAck(PositionImpl position) {
        pendingAckHandle.syncBatchPositionAckSetForTransaction(position);
    }

    /**
     * Forwards to the pending-ack handle's {@code checkIsCanDeleteConsumerPendingAck}.
     *
     * @param position the position forwarded to the pending-ack handle
     * @return the pending-ack handle's answer
     */
    public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) {
        return pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position);
    }

    /**
     * Returns the pending-ack handle's stats.
     *
     * @param lowWaterMarks forwarded unchanged to the pending-ack handle's {@code getStats}
     * @return the stats produced by the pending-ack handle
     */
    public TransactionPendingAckStats getTransactionPendingAckStats(boolean lowWaterMarks) {
        return pendingAckHandle.getStats(lowWaterMarks);
    }

    /**
     * Forwards to the dispatcher's {@code checkAndUnblockIfStuck()}.
     *
     * @return the dispatcher's result, or {@code false} when there is no dispatcher
     */
    public boolean checkAndUnblockIfStuck() {
        return dispatcher != null && dispatcher.checkAndUnblockIfStuck();
    }

    /**
     * Returns the pending-ack handle's stats for a single transaction.
     *
     * @param txnID the transaction id forwarded to the pending-ack handle
     * @return the stats produced by the pending-ack handle
     */
    public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
        return pendingAckHandle.getTransactionInPendingAckStats(txnID);
    }

    /**
     * Returns the managed ledger of the pending-ack store.
     *
     * @return the store's managed-ledger future when the handle is a {@link PendingAckHandleImpl}; otherwise
     *         a future failed with {@link BrokerServiceException.NotAllowedException}
     */
    public CompletableFuture<ManagedLedger> getPendingAckManageLedger() {
        if (!(pendingAckHandle instanceof PendingAckHandleImpl)) {
            return FutureUtil.failedFuture(
                    new BrokerServiceException.NotAllowedException("Pending ack handle don't use managedLedger!"));
        }
        return ((PendingAckHandleImpl) pendingAckHandle).getStoreManageLedger();
    }

    /**
     * Forwards to the pending-ack handle's {@code checkIfPendingAckStoreInit()}.
     *
     * @return the pending-ack handle's answer
     */
    public boolean checkIfPendingAckStoreInit() {
        return pendingAckHandle.checkIfPendingAckStoreInit();
    }

    /**
     * Forwards to the pending-ack handle's {@code checkPositionInPendingAckState}.
     *
     * @param position the position forwarded to the pending-ack handle
     * @param batchIndex the batch index forwarded to the pending-ack handle
     * @return the state reported by the pending-ack handle
     */
    public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
        return pendingAckHandle.checkPositionInPendingAckState(position, batchIndex);
    }

    static {
        // Class-wide logger for PersistentSubscription.
        log = LoggerFactory.getLogger(PersistentSubscription.class);
        // Register the replicated-subscription marker property with the value 1.
        REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
    }
}

