/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentTopic
implements Topic,
AsyncCallbacks.AddEntryCallback {
    // Topic name as handed to the constructor; used as the "[{}]" prefix of every log line.
    private final String topic;
    // Managed ledger that stores this topic's entries and owns its cursors.
    private final ManagedLedger ledger;
    // Producers currently attached through addProducer().
    private final ConcurrentOpenHashSet<Producer> producers;
    // Subscriptions keyed by (decoded) subscription name.
    private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
    // Replicators; the key is matched against the namespace's replication_clusters in startReplProducers().
    private final ConcurrentOpenHashMap<String, Replicator> replicators;
    private final BrokerService brokerService;
    // Set while the topic is being closed or deleted; addProducer() and subscribe() reject requests while true.
    private volatile boolean isFenced;
    // Atomic accessor for usageCount. The count is bumped by addProducer()/subscribe() and must be 0 for delete to proceed.
    protected static final AtomicLongFieldUpdater<PersistentTopic> USAGE_COUNT_UPDATER = AtomicLongFieldUpdater.newUpdater(PersistentTopic.class, "usageCount");
    private volatile long usageCount = 0L;
    // Read lock: addProducer()/subscribe(). Write lock: delete()/close() while they inspect or flip isFenced.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Cursor / subscription names starting with this prefix are reserved for replicators.
    public final String replicatorPrefix;
    // Reserved cursor name: skipped when rebuilding subscriptions and rejected as a subscription name.
    static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
    private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
    private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60L;
    // System.nanoTime() of the last observed activity (construction, producer removal, unsubscribe).
    private volatile long lastActive;
    // When true, subscribe() refuses connections that are not batch-message compatible.
    private volatile boolean hasBatchMessagePublished = false;
    private final DispatchRateLimiter dispatchRateLimiter;
    public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
    private final MessageDeduplication messageDeduplication;
    // Sentinel (numerically -0xFEBECFFE) used as the initial result of currentCompaction.
    private static final long COMPACTION_NEVER_RUN = -4273917950L;
    // Starts out already completed with the COMPACTION_NEVER_RUN value.
    CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(-4273917950L);
    final CompactedTopic compactedTopic;
    // Starts out already completed with MessageId.earliest.
    CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture((MessageIdImpl)MessageId.earliest);
    // Loaded from the namespace policies in the constructor; falls back to false if they cannot be read.
    private volatile boolean isEncryptionRequired = false;
    // Per-thread scratch TopicStatsHelper instance.
    private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>(){

        protected TopicStatsHelper initialValue() {
            return new TopicStatsHelper();
        }
    };
    private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);

    /**
     * Creates the topic on top of an existing managed ledger and rebuilds its in-memory state
     * from the ledger's cursors.
     *
     * <p>Cursors whose name starts with the replicator prefix are registered as replication
     * clusters. The deduplication cursor is skipped. Every other cursor becomes a subscription
     * whose cursor is deactivated straight away.
     *
     * @param topic         topic name
     * @param ledger        managed ledger backing the topic
     * @param brokerService owning broker service, used for configuration and policy lookups
     * @throws BrokerServiceException.NamingException if a replicator cannot be started for one
     *         of the replication cursors
     */
    public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws BrokerServiceException.NamingException {
        this.topic = topic;
        this.ledger = ledger;
        this.brokerService = brokerService;
        this.producers = new ConcurrentOpenHashSet<>(16, 1);
        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
        this.isFenced = false;
        this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
        USAGE_COUNT_UPDATER.set(this, 0L);
        this.dispatchRateLimiter = new DispatchRateLimiter(this);
        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());

        for (ManagedCursor cursor : ledger.getCursors()) {
            final String cursorName = cursor.getName();
            if (cursorName.startsWith(replicatorPrefix)) {
                // Replication cursor: bring up the replicator towards the remote cluster.
                final String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
                final String remoteCluster = PersistentReplicator.getRemoteCluster(cursorName);
                if (!addReplicationCluster(remoteCluster, this, cursor, localCluster)) {
                    throw new BrokerServiceException.NamingException(getName() + " Failed to start replicator " + remoteCluster);
                }
            } else if (!cursorName.equals(DEDUPLICATION_CURSOR_NAME)) {
                // Regular subscription cursor: register it and deactivate the cursor.
                final String subscriptionName = Codec.decode(cursorName);
                final PersistentSubscription restored = createPersistentSubscription(subscriptionName, cursor);
                subscriptions.put(subscriptionName, restored);
                restored.deactivateCursor();
            }
        }

        this.lastActive = System.nanoTime();
        this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);

        try {
            final Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(AdminResource.path("policies", TopicName.get(topic).getNamespace()))
                    .orElseThrow(() -> new KeeperException.NoNodeException());
            this.isEncryptionRequired = policies.encryption_required;
        } catch (Exception e) {
            // Best effort: a policy lookup failure must not prevent the topic from loading.
            log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
            this.isEncryptionRequired = false;
        }
    }

    /**
     * Instantiates the subscription object for a cursor.
     *
     * @param subscriptionName decoded subscription name
     * @param cursor           cursor backing the subscription
     * @return a {@link CompactorSubscription} for the reserved {@code "__compaction"} name,
     *         otherwise a plain {@link PersistentSubscription}
     */
    private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor) {
        Preconditions.checkNotNull(compactedTopic);
        final boolean forCompactor = subscriptionName.equals("__compaction");
        return forCompactor
                ? new CompactorSubscription(this, compactedTopic, subscriptionName, cursor)
                : new PersistentSubscription(this, subscriptionName, cursor);
    }

    /**
     * Publishes a message to the managed ledger unless deduplication rejects it.
     *
     * <p>A rejected message is acknowledged immediately with no error and ledger/entry ids of
     * {@code -1}. Otherwise the entry is appended asynchronously and the outcome is reported
     * through {@link #addComplete} / {@link #addFailed}.
     *
     * @param headersAndPayload serialized message
     * @param publishContext    callback context completed once the outcome is known
     */
    @Override
    public void publishMessage(ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
        if (!messageDeduplication.shouldPublishNextMessage(publishContext, headersAndPayload)) {
            publishContext.completed(null, -1L, -1L);
            return;
        }
        ledger.asyncAddEntry(headersAndPayload, (AsyncCallbacks.AddEntryCallback) this, publishContext);
    }

    /**
     * Managed-ledger callback invoked once an entry has been persisted.
     *
     * @param pos position assigned to the entry; expected to be a {@link PositionImpl}
     * @param ctx the {@link Topic.PublishContext} passed to {@code asyncAddEntry}
     */
    public void addComplete(Position pos, Object ctx) {
        final Topic.PublishContext context = (Topic.PublishContext) ctx;
        final PositionImpl persistedAt = (PositionImpl) pos;
        // Let deduplication record the persisted message before acknowledging the publisher.
        messageDeduplication.recordMessagePersisted(context, persistedAt);
        context.completed(null, persistedAt.getLedgerId(), persistedAt.getEntryId());
    }

    /**
     * Managed-ledger callback invoked when persisting an entry failed.
     *
     * <p>The publish context is completed with a broker exception matching the failure:
     * already-closed ledger, terminated ledger, or a generic persistence error. A fenced
     * ledger additionally triggers {@link #close()} on this topic.
     *
     * @param exception failure reported by the managed ledger
     * @param ctx       the {@link Topic.PublishContext} passed to {@code asyncAddEntry}
     */
    public void addFailed(ManagedLedgerException exception, Object ctx) {
        final Topic.PublishContext context = (Topic.PublishContext) ctx;

        if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
            // Logged at debug level only, unlike the other failures.
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
            }
            context.completed(new BrokerServiceException.TopicClosedException(exception), -1L, -1L);
            return;
        }

        log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());

        final BrokerServiceException failure =
                exception instanceof ManagedLedgerException.ManagedLedgerTerminatedException
                        ? new BrokerServiceException.TopicTerminatedException(exception)
                        : new BrokerServiceException.PersistenceException(exception);
        context.completed(failure, -1L, -1L);

        if (exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
            close();
        }
    }

    /**
     * Registers a producer on this topic.
     *
     * <p>Runs under the read lock. On success the usage count is incremented, deduplication is
     * told about the producer and the replication producers are started.
     *
     * @param producer producer to add; its topic must be this instance
     * @throws BrokerServiceException.TopicFencedException     if the topic is fenced
     * @throws BrokerServiceException.TopicTerminatedException if the ledger is terminated
     * @throws BrokerServiceException.ProducerBusyException    if the producer limit is reached
     * @throws BrokerServiceException.NamingException          if the producer is already in the set
     */
    @Override
    public void addProducer(Producer producer) throws BrokerServiceException {
        Preconditions.checkArgument(producer.getTopic() == this);

        final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
        readLock.lock();
        try {
            if (isFenced) {
                log.warn("[{}] Attempting to add producer to a fenced topic", topic);
                throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
            }
            if (ledger.isTerminated()) {
                log.warn("[{}] Attempting to add producer to a terminated topic", topic);
                throw new BrokerServiceException.TopicTerminatedException("Topic was already terminated");
            }
            if (isProducersExceeded()) {
                log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
                throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
            }

            if (log.isDebugEnabled()) {
                log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
            }

            final boolean added = producers.add(producer);
            if (!added) {
                throw new BrokerServiceException.NamingException("Producer with name '" + producer.getProducerName() + "' is already connected to topic");
            }

            USAGE_COUNT_UPDATER.incrementAndGet(this);
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this));
            }

            messageDeduplication.producerAdded(producer.getProducerName());
            startReplProducers();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Checks whether the topic already holds the maximum number of producers.
     *
     * <p>The namespace policy value is used when positive; otherwise the broker configuration
     * value applies. If the policies cannot be read, default policies are assumed. A limit that
     * is not positive means "unlimited".
     *
     * @return {@code true} if a positive limit exists and the current producer count has reached it
     */
    private boolean isProducersExceeded() {
        Policies policies;
        try {
            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(AdminResource.path("policies", TopicName.get(topic).getNamespace()))
                    .orElseGet(Policies::new);
        } catch (Exception ignored) {
            // Policy lookup is best effort; fall back to defaults.
            policies = new Policies();
        }

        final int namespaceLimit = policies.max_producers_per_topic;
        final int maxProducers = namespaceLimit > 0
                ? namespaceLimit
                : brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
        if (maxProducers <= 0) {
            return false;
        }
        return (long) maxProducers <= producers.size();
    }

    /**
     * @return {@code true} if at least one attached producer reports {@code isRemote() == false}
     */
    private boolean hasLocalProducers() {
        final AtomicBoolean localSeen = new AtomicBoolean(false);
        producers.forEach(candidate -> {
            if (candidate.isRemote()) {
                return;
            }
            localSeen.set(true);
        });
        return localSeen.get();
    }

    /**
     * @return {@code true} if at least one attached producer reports {@code isRemote() == true}
     */
    private boolean hasRemoteProducers() {
        final AtomicBoolean remoteSeen = new AtomicBoolean(false);
        producers.forEach(candidate -> {
            if (!candidate.isRemote()) {
                return;
            }
            remoteSeen.set(true);
        });
        return remoteSeen.get();
    }

    /**
     * Starts the producers of the replicators whose key is listed in the namespace's
     * {@code replication_clusters} policy.
     *
     * <p>If the policy lookup (or anything else inside the try block) throws, every replicator
     * is started regardless of configuration.
     */
    public void startReplProducers() {
        try {
            final Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(AdminResource.path("policies", TopicName.get(topic).getNamespace()))
                    .orElseThrow(() -> new KeeperException.NoNodeException());
            if (policies.replication_clusters == null) {
                return;
            }
            final TreeSet<String> configuredClusters = Sets.newTreeSet(policies.replication_clusters);
            replicators.forEach((region, replicator) -> {
                if (!configuredClusters.contains(region)) {
                    return;
                }
                replicator.startProducer();
            });
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error getting policies while starting repl-producers {}", topic, e.getMessage());
            }
            // Fallback: without usable policies, start every known replicator.
            replicators.forEach((region, replicator) -> replicator.startProducer());
        }
    }

    /**
     * Disconnects every replicator.
     *
     * @return future that completes when all disconnect futures have completed
     */
    public CompletableFuture<Void> stopReplProducers() {
        final List<CompletableFuture<Void>> pendingDisconnects = new ArrayList<>();
        replicators.forEach((region, replicator) -> pendingDisconnects.add(replicator.disconnect()));
        return FutureUtil.waitForAll(pendingDisconnects);
    }

    /**
     * Calls {@code disconnect(true)} on every replicator.
     *
     * @return future that completes when all disconnect futures have completed
     */
    private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
        final List<CompletableFuture<Void>> pendingDisconnects = new ArrayList<>();
        replicators.forEach((region, replicator) -> pendingDisconnects.add(replicator.disconnect(true)));
        return FutureUtil.waitForAll(pendingDisconnects);
    }

    /**
     * Detaches a producer from this topic.
     *
     * <p>If the producer was registered, the usage count is decremented, the last-active
     * timestamp is refreshed and deduplication is notified. Removing an unknown producer is a
     * no-op.
     *
     * @param producer producer to remove; its topic must be this instance
     */
    @Override
    public void removeProducer(Producer producer) {
        Preconditions.checkArgument(producer.getTopic() == this);

        if (!producers.remove(producer)) {
            return;
        }

        USAGE_COUNT_UPDATER.decrementAndGet(this);
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this));
        }
        lastActive = System.nanoTime();
        messageDeduplication.producerRemoved(producer.getProducerName());
    }

    /**
     * Creates a consumer on the given subscription, creating the subscription if needed.
     *
     * <p>The request is rejected up front (the returned future fails, nothing is thrown) when:
     * <ul>
     *   <li>{@code readCompacted} is requested on a subscription type other than Failover or
     *       Exclusive;</li>
     *   <li>the subscription name is blank;</li>
     *   <li>batch messages were published and the connection is not batch-compatible;</li>
     *   <li>the subscription name is reserved (replicator prefix or deduplication cursor);</li>
     *   <li>the topic is fenced.</li>
     * </ul>
     *
     * <p>Once accepted, the usage count is incremented under the read lock. Both failure
     * handlers in this method decrement it again.
     *
     * @param cnx              client connection the consumer belongs to
     * @param subscriptionName subscription to attach to
     * @param consumerId       client-assigned consumer id
     * @param subType          subscription type
     * @param priorityLevel    consumer priority level
     * @param consumerName     consumer name
     * @param isDurable        whether to use a durable (cursor-backed) subscription
     * @param startMessageId   start position for non-durable subscriptions
     * @param metadata         consumer metadata
     * @param readCompacted    whether the consumer reads from the compacted view
     * @param initialPosition  initial position for a newly created durable subscription
     * @return future completed with the new consumer, or failed with a
     *         {@link BrokerServiceException}
     */
    @Override
    public CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, PulsarApi.CommandSubscribe.SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map<String, String> metadata, boolean readCompacted, PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
        CompletableFuture<Consumer> future = new CompletableFuture<>();
        if (readCompacted && subType != PulsarApi.CommandSubscribe.SubType.Failover && subType != PulsarApi.CommandSubscribe.SubType.Exclusive) {
            future.completeExceptionally(new BrokerServiceException.NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));
            return future;
        }
        if (StringUtils.isBlank(subscriptionName)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Empty subscription name", topic);
            }
            future.completeExceptionally(new BrokerServiceException.NamingException("Empty subscription name"));
            return future;
        }
        if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
            }
            future.completeExceptionally(new BrokerServiceException.UnsupportedVersionException("Consumer doesn't support batch-message"));
            return future;
        }
        if (subscriptionName.startsWith(replicatorPrefix) || subscriptionName.equals(DEDUPLICATION_CURSOR_NAME)) {
            log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName);
            future.completeExceptionally(new BrokerServiceException.NamingException("Subscription with reserved subscription name attempted"));
            return future;
        }

        lock.readLock().lock();
        try {
            if (isFenced) {
                log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
                future.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable"));
                return future;
            }
            USAGE_COUNT_UPDATER.incrementAndGet(this);
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName, consumerName, USAGE_COUNT_UPDATER.get(this));
            }
        } finally {
            lock.readLock().unlock();
        }

        // The non-durable factory returns a wildcard-typed future, so the local is declared accordingly.
        CompletableFuture<? extends Subscription> subscriptionFuture = isDurable
                ? getDurableSubscription(subscriptionName, initialPosition)
                : getNonDurableSubscription(subscriptionName, startMessageId);
        // The per-consumer unacked limit is only applied to durable subscriptions.
        int maxUnackedMessages = isDurable ? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() : 0;

        subscriptionFuture.thenAccept(subscription -> {
            try {
                Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition);
                subscription.addConsumer(consumer);
                if (!cnx.isActive()) {
                    // The connection went away while the subscription was being opened.
                    consumer.close();
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, consumer.consumerName(), USAGE_COUNT_UPDATER.get(this));
                    }
                    future.completeExceptionally(new BrokerServiceException("Connection was closed while the opening the cursor "));
                } else {
                    log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
                    future.complete(consumer);
                }
            } catch (BrokerServiceException e) {
                if (e instanceof BrokerServiceException.ConsumerBusyException) {
                    log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, consumerName);
                } else if (e instanceof BrokerServiceException.SubscriptionBusyException) {
                    log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
                }
                USAGE_COUNT_UPDATER.decrementAndGet(this);
                future.completeExceptionally(e);
            }
        }).exceptionally(ex -> {
            // Fix: the format string had no placeholder for the error message, so it was silently dropped.
            log.warn("[{}] Failed to create subscription for {}: {}", topic, subscriptionName, ex.getMessage());
            USAGE_COUNT_UPDATER.decrementAndGet(this);
            future.completeExceptionally(new BrokerServiceException.PersistenceException(ex));
            return null;
        });
        return future;
    }

    /**
     * Opens (or creates) the cursor for a durable subscription and returns the matching
     * subscription object, registering it in {@code subscriptions} if it is not there yet.
     *
     * <p>This method does not touch the topic usage count. {@code subscribe()} increments the
     * count before calling here and decrements it in its own failure handler, while
     * {@code createSubscription()} never increments it. Decrementing here as well made the
     * count go negative on a cursor-open failure, which in turn makes {@code delete} report
     * the topic as busy.
     *
     * @param subscriptionName decoded subscription name; encoded to obtain the cursor name
     * @param initialPosition  position used when the cursor has to be created
     * @return future completed with the subscription, or failed with a
     *         {@link BrokerServiceException.PersistenceException} wrapping the ledger error
     */
    private CompletableFuture<Subscription> getDurableSubscription(final String subscriptionName, PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
        final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, new AsyncCallbacks.OpenCursorCallback() {

            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Opened cursor", topic, subscriptionName);
                }
                subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName,
                        name -> createPersistentSubscription(subscriptionName, cursor)));
            }

            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                log.warn("[{}] Failed to create subscription for {}: {}", topic, subscriptionName, exception.getMessage());
                // No usage-count decrement here: the caller that incremented it undoes it.
                subscriptionFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
                if (exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
                    // A fenced managed ledger triggers closing this topic.
                    PersistentTopic.this.close();
                }
            }
        }, null);
        return subscriptionFuture;
    }

    /**
     * Returns the non-durable subscription with the given name, creating it on a new
     * non-durable cursor if it is not registered yet.
     *
     * <p>The cursor starts at {@code startMessageId}, or {@code MessageId.latest} when that is
     * {@code null}. For a batch message id with a non-negative batch index the entry id is
     * moved back by one. If the cursor cannot be created the future fails with the ledger
     * exception and the just-registered subscription is removed again.
     *
     * @param subscriptionName subscription name
     * @param startMessageId   requested start position, may be {@code null}
     * @return future holding the subscription or the cursor-creation failure
     */
    private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName, MessageId startMessageId) {
        final CompletableFuture<Subscription> result = new CompletableFuture<>();
        log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);

        final Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
            final MessageIdImpl msgId = startMessageId != null
                    ? (MessageIdImpl) startMessageId
                    : (MessageIdImpl) MessageId.latest;
            final long ledgerId = msgId.getLedgerId();
            final boolean pointsIntoBatch = msgId instanceof BatchMessageIdImpl
                    && ((BatchMessageIdImpl) msgId).getBatchIndex() >= 0;
            final long entryId = pointsIntoBatch ? msgId.getEntryId() - 1L : msgId.getEntryId();

            ManagedCursor cursor = null;
            try {
                cursor = ledger.newNonDurableCursor(new PositionImpl(ledgerId, entryId));
            } catch (ManagedLedgerException e) {
                // Record the failure; the entry added to the map is removed below.
                result.completeExceptionally(e);
            }
            return new PersistentSubscription(this, subscriptionName, cursor);
        });

        if (result.isDone()) {
            // Only the failure path above completes the future early.
            subscriptions.remove(subscriptionName);
        } else {
            result.complete(subscription);
        }
        return result;
    }

    /**
     * Creates (or returns the existing) durable subscription without attaching a consumer.
     *
     * @param subscriptionName subscription name
     * @param initialPosition  position used when the cursor has to be created
     * @return future completed with the subscription
     */
    @Override
    public CompletableFuture<Subscription> createSubscription(String subscriptionName, PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
        final CompletableFuture<Subscription> created = getDurableSubscription(subscriptionName, initialPosition);
        return created;
    }

    /**
     * Deletes the cursor of a subscription and, on success, removes the subscription from this
     * topic and refreshes the last-active timestamp.
     *
     * @param subscriptionName decoded subscription name; encoded to obtain the cursor name
     * @return future completed once the cursor is deleted, or failed with a
     *         {@link BrokerServiceException.PersistenceException}
     */
    @Override
    public CompletableFuture<Void> unsubscribe(final String subscriptionName) {
        final CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
        final String cursorName = Codec.encode(subscriptionName);

        final AsyncCallbacks.DeleteCursorCallback onCursorDeleted = new AsyncCallbacks.DeleteCursorCallback() {

            public void deleteCursorComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Cursor deleted successfully", topic, subscriptionName);
                }
                subscriptions.remove(subscriptionName);
                unsubscribeFuture.complete(null);
                lastActive = System.nanoTime();
            }

            public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Error deleting cursor for subscription", topic, subscriptionName, exception);
                }
                unsubscribeFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
            }
        };

        ledger.asyncDeleteCursor(cursorName, onCursorDeleted, null);
        return unsubscribeFuture;
    }

    /**
     * Drops the in-memory entry for a subscription; the cursor is not touched here.
     *
     * @param subscriptionName subscription to forget
     */
    void removeSubscription(String subscriptionName) {
        subscriptions.remove(subscriptionName);
    }

    /**
     * Deletes the topic, deleting its subscriptions as part of the operation.
     *
     * @return future completed once the topic is deleted
     */
    @Override
    public CompletableFuture<Void> delete() {
        final boolean failIfHasSubscriptions = false;
        return delete(failIfHasSubscriptions);
    }

    /**
     * Deletes the topic without disconnecting clients first.
     *
     * @param failIfHasSubscriptions fail instead of deleting existing subscriptions
     * @return future completed once the topic is deleted
     */
    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
        final boolean closeIfClientsConnected = false;
        return delete(failIfHasSubscriptions, closeIfClientsConnected);
    }

    /**
     * Deletes the topic after first disconnecting replicators, producers and subscriptions.
     *
     * @return future completed once the topic is deleted
     */
    @Override
    public CompletableFuture<Void> deleteForcefully() {
        final boolean failIfHasSubscriptions = false;
        final boolean closeIfClientsConnected = true;
        return delete(failIfHasSubscriptions, closeIfClientsConnected);
    }

    /**
     * Deletes this topic and its managed ledger.
     *
     * <p>Steps, all started while holding the write lock:
     * <ol>
     *   <li>Fail immediately if the topic is already fenced.</li>
     *   <li>If {@code closeIfClientsConnected}, disconnect all replicators, producers and
     *       subscriptions and wait for that to finish.</li>
     *   <li>If the usage count is 0, fence the topic, delete (or, with
     *       {@code failIfHasSubscriptions}, refuse to delete) the subscriptions, then delete the
     *       managed ledger and remove the topic from the broker cache.</li>
     * </ol>
     * Every failure path after fencing resets {@code isFenced} to {@code false}.
     *
     * @param failIfHasSubscriptions  fail with TopicBusyException if any subscription exists,
     *                                instead of deleting the subscriptions
     * @param closeIfClientsConnected disconnect connected clients before checking the usage count
     * @return future completed when the ledger is deleted, or failed with a
     *         {@link BrokerServiceException}
     */
    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) {
        final CompletableFuture<Void> deleteFuture = new CompletableFuture<Void>();
        this.lock.writeLock().lock();
        try {
            if (this.isFenced) {
                log.warn("[{}] Topic is already being closed or deleted", (Object)this.topic);
                deleteFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                CompletableFuture<Void> completableFuture = deleteFuture;
                return completableFuture;
            }
            // Completed once clients are disconnected (or right away when no disconnect was requested).
            CompletableFuture closeClientFuture = new CompletableFuture();
            if (closeIfClientsConnected) {
                ArrayList futures = Lists.newArrayList();
                this.replicators.forEach((cluster, replicator) -> {
                    boolean bl = futures.add(replicator.disconnect());
                });
                this.producers.forEach(producer -> {
                    boolean bl = futures.add(producer.disconnect());
                });
                this.subscriptions.forEach((s, sub) -> {
                    boolean bl = futures.add(sub.disconnect());
                });
                ((CompletableFuture)FutureUtil.waitForAll((List)futures).thenRun(() -> closeClientFuture.complete(null))).exceptionally(ex -> {
                    log.error("[{}] Error closing clients", (Object)this.topic, ex);
                    this.isFenced = false;
                    closeClientFuture.completeExceptionally((Throwable)ex);
                    return null;
                });
            } else {
                closeClientFuture.complete(null);
            }
            ((CompletableFuture)closeClientFuture.thenAccept(delete -> {
                // Deletion only proceeds when no producer/consumer is accounted for.
                if (USAGE_COUNT_UPDATER.get(this) == 0L) {
                    this.isFenced = true;
                    ArrayList futures = Lists.newArrayList();
                    if (failIfHasSubscriptions) {
                        if (!this.subscriptions.isEmpty()) {
                            // Undo the fencing: the topic stays usable.
                            this.isFenced = false;
                            deleteFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has subscriptions"));
                            return;
                        }
                    } else {
                        this.subscriptions.forEach((s, sub) -> {
                            boolean bl = futures.add(sub.delete());
                        });
                    }
                    // With failIfHasSubscriptions the list is empty here, so this goes straight to the ledger deletion.
                    FutureUtil.waitForAll((List)futures).whenComplete((v, ex) -> {
                        if (ex != null) {
                            log.error("[{}] Error deleting topic", (Object)this.topic, ex);
                            this.isFenced = false;
                            deleteFuture.completeExceptionally((Throwable)ex);
                        } else {
                            this.ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback(){

                                public void deleteLedgerComplete(Object ctx) {
                                    PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                                    log.info("[{}] Topic deleted", (Object)PersistentTopic.this.topic);
                                    deleteFuture.complete(null);
                                }

                                public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
                                    PersistentTopic.this.isFenced = false;
                                    log.error("[{}] Error deleting topic", (Object)PersistentTopic.this.topic, (Object)exception);
                                    deleteFuture.completeExceptionally(new BrokerServiceException.PersistenceException((Throwable)exception));
                                }
                            }, null);
                        }
                    });
                } else {
                    deleteFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
                }
            })).exceptionally(ex -> {
                // Reached when closeClientFuture failed or the stage above threw.
                deleteFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Failed to close clients before deleting topic."));
                return null;
            });
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return deleteFuture;
    }

    /**
     * Fences this topic, disconnects all replicators, producers and subscriptions, and then closes
     * the managed ledger.
     *
     * <p>If the topic is already fenced the returned future fails with a
     * {@code TopicFencedException}. If any disconnect fails, the fence is lifted again and the
     * returned future fails with that error. A failure to close the managed ledger is only logged:
     * the topic is still removed from the broker cache and the future completes normally.
     *
     * @return a future tracking completion of the close sequence
     */
    @Override
    public CompletableFuture<Void> close() {
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        // Only one close/delete may be in flight: claim the fence under the write lock.
        lock.writeLock().lock();
        try {
            if (isFenced) {
                log.warn("[{}] Topic is already being closed or deleted", topic);
                closeFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                return closeFuture;
            }
            isFenced = true;
        } finally {
            lock.writeLock().unlock();
        }

        final List<CompletableFuture<Void>> disconnects = Lists.newArrayList();
        replicators.forEach((cluster, replicator) -> disconnects.add(replicator.disconnect()));
        producers.forEach(producer -> disconnects.add(producer.disconnect()));
        subscriptions.forEach((subName, subscription) -> disconnects.add(subscription.disconnect()));

        FutureUtil.waitForAll(disconnects).thenRun(() -> {
            // Every client is gone; now the managed ledger itself can be closed.
            ledger.asyncClose(new AsyncCallbacks.CloseCallback() {
                @Override
                public void closeComplete(Object ctx) {
                    PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                    log.info("[{}] Topic closed", PersistentTopic.this.topic);
                    closeFuture.complete(null);
                }

                @Override
                public void closeFailed(ManagedLedgerException exception, Object ctx) {
                    // Best effort: the topic is dropped from the cache even if the ledger close failed.
                    log.error("[{}] Failed to close managed ledger, proceeding anyway.", PersistentTopic.this.topic, exception);
                    PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                    closeFuture.complete(null);
                }
            }, null);
            dispatchRateLimiter.close();
        }).exceptionally(error -> {
            log.error("[{}] Error closing topic", topic, error);
            // Lift the fence so that a later close/delete attempt is possible.
            isFenced = false;
            closeFuture.completeExceptionally(error);
            return null;
        });
        return closeFuture;
    }

    /**
     * Runs {@link #checkReplication()} and, if it fails, schedules this same method to run again
     * on the broker executor after 60 seconds.
     *
     * @return a future mirroring the outcome of this single attempt; a scheduled retry does not
     *         affect the future returned here
     */
    private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
        final CompletableFuture<Void> outcome = new CompletableFuture<>();
        checkReplication().thenAccept(ignored -> {
            log.info("[{}] Policies updated successfully", topic);
            outcome.complete(null);
        }).exceptionally(failure -> {
            log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, failure.getMessage(), 60L, failure);
            brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure, 60L, TimeUnit.SECONDS);
            outcome.completeExceptionally(failure);
            return null;
        });
        return outcome;
    }

    /**
     * Delegates to the topic's message-deduplication component to check its status.
     *
     * @return the future returned by {@code messageDeduplication.checkStatus()}
     */
    public CompletableFuture<Void> checkDeduplicationStatus() {
        final CompletableFuture<Void> statusCheck = messageDeduplication.checkStatus();
        return statusCheck;
    }

    /**
     * Fetches the managed-ledger configuration for this topic from the broker service and applies
     * it to the ledger.
     *
     * @return a future that completes once the configuration has been applied, or fails with the
     *         error raised while obtaining or applying it
     */
    private CompletableFuture<Void> checkPersistencePolicies() {
        final CompletableFuture<Void> applied = new CompletableFuture<>();
        brokerService.getManagedLedgerConfig(TopicName.get(topic)).thenAccept(ledgerConfig -> {
            ledger.setConfig(ledgerConfig);
            applied.complete(null);
        }).exceptionally(error -> {
            log.warn("[{}] Failed to update persistence-policies {}", topic, error.getMessage());
            applied.completeExceptionally(error);
            return null;
        });
        return applied;
    }

    /**
     * Reconciles the running replicators of a global topic with the replication clusters configured
     * in the namespace policies.
     *
     * <ul>
     *   <li>Non-global topics are left untouched and an already-completed future is returned.</li>
     *   <li>If the local cluster is not in the configured list, the topic is forcefully deleted.</li>
     *   <li>A replicator is started for every configured remote cluster that has none.</li>
     *   <li>Every existing replicator gets the current message TTL, and replicators whose cluster is
     *       no longer configured are removed.</li>
     * </ul>
     *
     * @return a future completing when all start/remove operations finish; it fails with a
     *         {@code ServerMetadataException} if the namespace policies cannot be read
     */
    @Override
    public CompletableFuture<Void> checkReplication() {
        TopicName name = TopicName.get(topic);
        if (!name.isGlobal()) {
            return CompletableFuture.completedFuture(null);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Checking replication status", name);
        }

        Policies policies;
        try {
            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(AdminResource.path("policies", name.getNamespace()))
                    .orElseThrow(() -> new KeeperException.NoNodeException());
        } catch (Exception e) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            future.completeExceptionally(new BrokerServiceException.ServerMetadataException(e));
            return future;
        }

        final int newMessageTTLinSeconds = policies.message_ttl_in_seconds;
        // An absent cluster list is treated as "no clusters configured".
        final TreeSet<String> configuredClusters = policies.replication_clusters != null
                ? Sets.newTreeSet(policies.replication_clusters)
                : new TreeSet<String>();
        final String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

        // The topic is known to be global at this point (non-global topics returned above).
        if (!configuredClusters.contains(localCluster)) {
            // Both placeholders need an argument: the topic name and the configured cluster list.
            log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}", topic, configuredClusters);
            return deleteForcefully();
        }

        final List<CompletableFuture<Void>> futures = Lists.newArrayList();

        // Start replicators for newly configured remote clusters.
        for (String cluster : configuredClusters) {
            if (cluster.equals(localCluster) || replicators.containsKey(cluster)) {
                continue;
            }
            futures.add(startReplicator(cluster));
        }

        // Refresh the TTL on all replicators and drop the ones that are no longer configured.
        replicators.forEach((cluster, replicator) -> {
            ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
            if (!cluster.equals(localCluster) && !configuredClusters.contains(cluster)) {
                futures.add(removeReplicator(cluster));
            }
        });
        return FutureUtil.waitForAll(futures);
    }

    /**
     * Applies the namespace message TTL to every subscription and replicator of this topic.
     *
     * <p>Nothing happens when the configured TTL is {@code 0}. Any exception raised while reading
     * the policies or expiring messages is swallowed and only reported at debug level.
     */
    @Override
    public void checkMessageExpiry() {
        final TopicName name = TopicName.get(topic);
        try {
            final Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(AdminResource.path("policies", name.getNamespace()))
                    .orElseThrow(() -> new KeeperException.NoNodeException());
            if (policies.message_ttl_in_seconds == 0) {
                return;
            }
            subscriptions.forEach((subName, sub) -> sub.expireMessages(policies.message_ttl_in_seconds));
            replicators.forEach((region, replicator) ->
                    ((PersistentReplicator) replicator).expireMessages(policies.message_ttl_in_seconds));
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error getting policies", topic);
            }
        }
    }

    /**
     * Asks the message-deduplication component to purge its inactive producers.
     */
    @Override
    public void checkMessageDeduplicationInfo() {
        messageDeduplication.purgeInactiveProducers();
    }

    /**
     * Triggers a compaction when the namespace has a non-zero compaction threshold, no compaction
     * is currently in progress, and the estimated backlog exceeds that threshold.
     *
     * <p>The backlog estimate comes from the {@code "__compaction"} subscription when it exists,
     * otherwise from the managed ledger. Failures are only logged at debug level.
     */
    public void checkCompaction() {
        final TopicName name = TopicName.get(topic);
        try {
            final Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(AdminResource.path("policies", name.getNamespace()))
                    .orElseThrow(() -> new KeeperException.NoNodeException());
            if (policies.compaction_threshold == 0L || !currentCompaction.isDone()) {
                return;
            }

            final PersistentSubscription compactionSub = subscriptions.get("__compaction");
            final long backlogEstimate;
            if (compactionSub == null) {
                backlogEstimate = ledger.getEstimatedBacklogSize();
            } else {
                backlogEstimate = compactionSub.estimateBacklogSize();
            }
            if (backlogEstimate <= policies.compaction_threshold) {
                return;
            }

            try {
                triggerCompaction();
            } catch (BrokerServiceException.AlreadyRunningException alreadyRunning) {
                log.debug("[{}] Compaction already running, so don't trigger again, even though backlog({}) is over threshold({})", name, backlogEstimate, policies.compaction_threshold);
            }
        } catch (Exception e) {
            log.debug("[{}] Error getting policies", topic);
        }
    }

    /**
     * Opens the replicator cursor for {@code remoteCluster} and registers a replicator on it.
     *
     * @param remoteCluster name of the cluster to replicate to
     * @return a future that completes once the replicator is registered; it fails with a
     *         {@code NamingException} if registration is refused, or with a
     *         {@code PersistenceException} if the cursor cannot be opened
     */
    CompletableFuture<Void> startReplicator(final String remoteCluster) {
        log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
        final CompletableFuture<Void> started = new CompletableFuture<>();
        final String cursorName = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);

        ledger.asyncOpenCursor(cursorName, new AsyncCallbacks.OpenCursorCallback() {
            @Override
            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                final String localCluster = PersistentTopic.this.brokerService.pulsar().getConfiguration().getClusterName();
                if (!PersistentTopic.this.addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster)) {
                    started.completeExceptionally(new BrokerServiceException.NamingException(PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
                    return;
                }
                started.complete(null);
            }

            @Override
            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                started.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
            }
        }, null);
        return started;
    }

    /**
     * Registers a {@code PersistentReplicator} for {@code remoteCluster} unless one is already
     * present in the replicator map.
     *
     * @param remoteCluster   cluster the replicator targets
     * @param persistentTopic not read by this implementation
     * @param cursor          cursor handed to the new replicator
     * @param localCluster    name of the local cluster
     * @return {@code false} if constructing the replicator threw a {@code NamingException} (the
     *         map entry for the cluster is removed in that case), {@code true} otherwise
     */
    protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor, String localCluster) {
        final AtomicBoolean started = new AtomicBoolean(true);
        replicators.computeIfAbsent(remoteCluster, key -> {
            try {
                return new PersistentReplicator(this, cursor, localCluster, remoteCluster, brokerService);
            } catch (BrokerServiceException.NamingException e) {
                started.set(false);
                log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
                return null;
            }
        });

        if (started.get()) {
            return true;
        }
        // Clean up whatever the failed computation left behind for this cluster.
        replicators.remove(remoteCluster);
        return false;
    }

    /**
     * Disconnects the replicator for {@code remoteCluster}, deletes its cursor and finally removes
     * it from the replicator map.
     *
     * @param remoteCluster cluster whose replicator should be removed; a replicator for it must be
     *                      registered, since the map lookup result is dereferenced directly
     * @return a future completing after the cursor is deleted; it fails with the disconnect error,
     *         or with a {@code PersistenceException} if the cursor deletion fails
     */
    CompletableFuture<Void> removeReplicator(final String remoteCluster) {
        log.info("[{}] Removing replicator to {}", topic, remoteCluster);
        final CompletableFuture<Void> removed = new CompletableFuture<>();
        final String cursorName = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);

        final AsyncCallbacks.DeleteCursorCallback onCursorDeleted = new AsyncCallbacks.DeleteCursorCallback() {
            @Override
            public void deleteCursorComplete(Object ctx) {
                PersistentTopic.this.replicators.remove(remoteCluster);
                removed.complete(null);
            }

            @Override
            public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}] Failed to delete cursor {} {}", PersistentTopic.this.topic, cursorName, exception.getMessage(), exception);
                removed.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
            }
        };

        replicators.get(remoteCluster).disconnect().thenRun(() -> {
            ledger.asyncDeleteCursor(cursorName, onCursorDeleted, null);
        }).exceptionally(error -> {
            log.error("[{}] Failed to close replication producer {} {}", topic, cursorName, error.getMessage(), error);
            removed.completeExceptionally(error);
            return null;
        });
        return removed;
    }

    /**
     * @return whatever the message-deduplication component reports from {@code isEnabled()}
     */
    public boolean isDeduplicationEnabled() {
        final boolean enabled = messageDeduplication.isEnabled();
        return enabled;
    }

    /**
     * @return a string form of this object that includes the topic name
     */
    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("topic", topic)
                .toString();
    }

    /**
     * @return the set of producers held by this topic (the internal instance, not a copy)
     */
    @Override
    public ConcurrentOpenHashSet<Producer> getProducers() {
        return producers;
    }

    /**
     * @return the sum of the consumer-list sizes over all subscriptions of this topic
     */
    public int getNumberOfConsumers() {
        int total = 0;
        for (PersistentSubscription sub : subscriptions.values()) {
            final int consumersOfSub = ((CopyOnWriteArrayList<?>) sub.getConsumers()).size();
            total += consumersOfSub;
        }
        return total;
    }

    /**
     * @return the subscription map of this topic keyed by subscription name (the internal
     *         instance, not a copy)
     */
    public ConcurrentOpenHashMap<String, PersistentSubscription> getSubscriptions() {
        return subscriptions;
    }

    /**
     * Looks up a subscription by name.
     *
     * @param subscriptionName key to look up in the subscription map
     * @return the result of the map lookup for that name
     */
    @Override
    public PersistentSubscription getSubscription(String subscriptionName) {
        return subscriptions.get(subscriptionName);
    }

    /**
     * @return the broker service this topic was created with
     */
    public BrokerService getBrokerService() {
        return brokerService;
    }

    /**
     * @return the replicator map of this topic keyed by remote cluster name (the internal
     *         instance, not a copy)
     */
    public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
        return replicators;
    }

    /**
     * Looks up the replicator registered for a remote cluster.
     *
     * @param remoteCluster key to look up in the replicator map
     * @return the result of the map lookup for that cluster
     */
    public Replicator getPersistentReplicator(String remoteCluster) {
        return replicators.get(remoteCluster);
    }

    /**
     * @return the name of this topic
     */
    @Override
    public String getName() {
        return topic;
    }

    /**
     * @return the managed ledger backing this topic
     */
    public ManagedLedger getManagedLedger() {
        return ledger;
    }

    /**
     * Refreshes the rate counters of every producer, replicator and consumer of this topic, writes
     * the topic's statistics to {@code topicStatsStream}, and adds the topic's totals to the
     * namespace- and bundle-level accumulators.
     *
     * <p>The stream receives one object named after the topic containing a {@code "publishers"}
     * list, a {@code "replication"} object (one entry per replicator), a {@code "subscriptions"}
     * object (one entry per subscription, each with its {@code "consumers"} list), and the
     * topic-wide aggregates.
     *
     * @param nsStats           namespace accumulator; its counters are incremented in place
     * @param bundleStats       bundle accumulator; its counters are incremented in place
     * @param topicStatsStream  destination stream for the per-topic statistics
     * @param replStats         per-cluster replication metrics, updated only when
     *                          {@code replStats.isMetricsEnabled()} is true
     * @param namespace         namespace name used to build the replication metrics key
     * @param hydratePublishers whether individual publisher stats are written to the stream
     */
    @Override
    public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
        // The helper is a thread-local scratch object, so it is reset before every use.
        TopicStatsHelper topicStatsHelper = (TopicStatsHelper)threadLocalTopicStats.get();
        topicStatsHelper.reset();
        this.replicators.forEach((region, replicator) -> replicator.updateRates());
        nsStats.producerCount = (int)((long)nsStats.producerCount + this.producers.size());
        bundleStats.producerCount = (int)((long)bundleStats.producerCount + this.producers.size());
        topicStatsStream.startObject(this.topic);

        // ---- publishers ----
        topicStatsStream.startList("publishers");
        this.producers.forEach(producer -> {
            producer.updateRates();
            PublisherStats publisherStats = producer.getStats();
            topicStatsHelper.aggMsgRateIn += publisherStats.msgRateIn;
            topicStatsHelper.aggMsgThroughputIn += publisherStats.msgThroughputIn;
            if (producer.isRemote()) {
                // Remembered so the matching replicator entry can report its inbound side below.
                topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
            }
            if (hydratePublishers) {
                StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
            }
        });
        topicStatsStream.endList();

        // ---- replication ----
        topicStatsStream.startObject("replication");
        nsStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
        this.replicators.forEach((cluster, replicator) -> {
            try {
                ((PersistentReplicator)replicator).updateCursorState();
            }
            catch (Exception e) {
                log.warn("[{}] Failed to update cursro state ", (Object)this.topic, (Object)e);
            }
            ReplicatorStats rStat = replicator.getStats();
            PublisherStats pubStats = (PublisherStats)topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
            if (pubStats != null) {
                rStat.msgRateIn = pubStats.msgRateIn;
                rStat.msgThroughputIn = pubStats.msgThroughputIn;
                rStat.inboundConnection = pubStats.getAddress();
                rStat.inboundConnectedSince = pubStats.getConnectedSince();
            }
            topicStatsHelper.aggMsgRateOut += rStat.msgRateOut;
            topicStatsHelper.aggMsgThroughputOut += rStat.msgThroughputOut;
            topicStatsStream.startObject(cluster);
            topicStatsStream.writePair("connected", rStat.connected);
            topicStatsStream.writePair("msgRateExpired", rStat.msgRateExpired);
            topicStatsStream.writePair("msgRateIn", rStat.msgRateIn);
            topicStatsStream.writePair("msgRateOut", rStat.msgRateOut);
            topicStatsStream.writePair("msgThroughputIn", rStat.msgThroughputIn);
            topicStatsStream.writePair("msgThroughputOut", rStat.msgThroughputOut);
            topicStatsStream.writePair("replicationBacklog", rStat.replicationBacklog);
            topicStatsStream.writePair("replicationDelayInSeconds", rStat.replicationDelayInSeconds);
            topicStatsStream.writePair("inboundConnection", rStat.inboundConnection);
            topicStatsStream.writePair("inboundConnectedSince", rStat.inboundConnectedSince);
            topicStatsStream.writePair("outboundConnection", rStat.outboundConnection);
            topicStatsStream.writePair("outboundConnectedSince", rStat.outboundConnectedSince);
            topicStatsStream.endObject();
            nsStats.msgReplBacklog += (double)rStat.replicationBacklog;
            if (replStats.isMetricsEnabled()) {
                String namespaceClusterKey = replStats.getKeyName(namespace, cluster);
                ReplicationMetrics replicationMetrics = replStats.get(namespaceClusterKey);
                // A freshly obtained metrics object has to be stored back after it is filled in.
                boolean update = false;
                if (replicationMetrics == null) {
                    replicationMetrics = ReplicationMetrics.get();
                    update = true;
                }
                replicationMetrics.connected = replicationMetrics.connected + (rStat.connected ? 1 : 0);
                replicationMetrics.msgRateOut += rStat.msgRateOut;
                replicationMetrics.msgThroughputOut += rStat.msgThroughputOut;
                replicationMetrics.msgReplBacklog += (double)rStat.replicationBacklog;
                if (update) {
                    replStats.put(namespaceClusterKey, replicationMetrics);
                }
            }
        });
        topicStatsStream.endObject();

        // ---- subscriptions ----
        topicStatsStream.startObject("subscriptions");
        nsStats.subsCount = (int)((long)nsStats.subsCount + this.subscriptions.size());
        this.subscriptions.forEach((subscriptionName, subscription) -> {
            double subMsgRateOut = 0.0;
            double subMsgThroughputOut = 0.0;
            double subMsgRateRedeliver = 0.0;
            try {
                topicStatsStream.startObject(subscriptionName);
                Object[] consumers = ((org.apache.pulsar.utils.CopyOnWriteArrayList)subscription.getConsumers()).array();
                nsStats.consumerCount += consumers.length;
                bundleStats.consumerCount += consumers.length;
                topicStatsStream.startList("consumers");
                for (Object consumerObj : consumers) {
                    Consumer consumer = (Consumer)consumerObj;
                    consumer.updateRates();
                    ConsumerStats consumerStats = consumer.getStats();
                    subMsgRateOut += consumerStats.msgRateOut;
                    subMsgThroughputOut += consumerStats.msgThroughputOut;
                    subMsgRateRedeliver += consumerStats.msgRateRedeliver;
                    StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats);
                }
                topicStatsStream.endList();
                topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog());
                topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                topicStatsStream.writePair("msgRateOut", subMsgRateOut);
                topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage", subscription.getNumberOfEntriesSinceFirstNotAckedMessage());
                topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange", subscription.getTotalNonContiguousDeletedMessagesRange());
                topicStatsStream.writePair("type", subscription.getTypeString());
                if (PulsarApi.CommandSubscribe.SubType.Shared.equals((Object)subscription.getType()) && subscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
                    PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers)subscription.getDispatcher();
                    topicStatsStream.writePair("blockedSubscriptionOnUnackedMsgs", dispatcher.isBlockedDispatcherOnUnackedMsgs());
                    topicStatsStream.writePair("unackedMessages", dispatcher.getTotalUnackedMessages());
                }
                topicStatsStream.endObject();
                topicStatsHelper.aggMsgRateOut += subMsgRateOut;
                topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
                nsStats.msgBacklog += (double)subscription.getNumberOfEntriesInBacklog();
            }
            catch (Exception e) {
                log.error("Got exception when creating consumer stats for subscription {}: {}", new Object[]{subscriptionName, e.getMessage(), e});
            }
        });
        topicStatsStream.endObject();

        // ---- topic-wide aggregates ----
        topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 ? 0.0 : topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn;
        topicStatsStream.writePair("producerCount", this.producers.size());
        topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
        topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
        topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
        topicStatsStream.writePair("msgThroughputIn", topicStatsHelper.aggMsgThroughputIn);
        topicStatsStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
        topicStatsStream.writePair("storageSize", this.ledger.getEstimatedBacklogSize());
        topicStatsStream.writePair("pendingAddEntriesCount", ((ManagedLedgerImpl)this.ledger).getPendingAddEntriesCount());
        nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
        nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
        nsStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
        nsStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
        nsStats.storageSize += (double)this.ledger.getEstimatedBacklogSize();
        bundleStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
        bundleStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
        bundleStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
        bundleStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
        bundleStats.cacheSize += ((ManagedLedgerImpl)this.ledger).getCacheSize();
        topicStatsStream.endObject();
    }

    /**
     * Builds a snapshot of this topic's statistics from its producers, subscriptions and
     * replicators.
     *
     * <p>Stats of remote producers are not listed under {@code publishers}; instead they are used
     * to fill in the inbound fields of the replicator entry for the same remote cluster.
     *
     * @return a newly created {@code TopicStats} holding the aggregated values
     */
    @Override
    public TopicStats getStats() {
        TopicStats stats = new TopicStats();
        ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

        producers.forEach(producer -> {
            PublisherStats publisherStats = producer.getStats();
            stats.msgRateIn += publisherStats.msgRateIn;
            stats.msgThroughputIn += publisherStats.msgThroughputIn;
            if (producer.isRemote()) {
                remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
            } else {
                stats.publishers.add(publisherStats);
            }
        });
        // Guard against division by zero when nothing is being published.
        stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : stats.msgThroughputIn / stats.msgRateIn;

        subscriptions.forEach((name, subscription) -> {
            SubscriptionStats subStats = subscription.getStats();
            stats.msgRateOut += subStats.msgRateOut;
            stats.msgThroughputOut += subStats.msgThroughputOut;
            stats.subscriptions.put(name, subStats);
        });

        replicators.forEach((cluster, replicator) -> {
            ReplicatorStats replicatorStats = replicator.getStats();
            PublisherStats pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
            if (pubStats != null) {
                replicatorStats.msgRateIn = pubStats.msgRateIn;
                replicatorStats.msgThroughputIn = pubStats.msgThroughputIn;
                replicatorStats.inboundConnection = pubStats.getAddress();
                replicatorStats.inboundConnectedSince = pubStats.getConnectedSince();
            }
            stats.msgRateOut += replicatorStats.msgRateOut;
            stats.msgThroughputOut += replicatorStats.msgThroughputOut;
            stats.replication.put(replicator.getRemoteCluster(), replicatorStats);
        });

        stats.storageSize = ledger.getEstimatedBacklogSize();
        stats.deduplicationStatus = messageDeduplication.getStatus().toString();
        return stats;
    }

    /**
     * Builds a snapshot of the managed ledger's internal state: its counters, one entry per
     * ledger, and one entry per cursor.
     *
     * <p>{@code lastLedgerCreationFailureTimestamp} is only set when the ledger reports a non-zero
     * failure timestamp.
     *
     * @return a newly created {@code PersistentTopicInternalStats}
     */
    @Override
    public PersistentTopicInternalStats getInternalStats() {
        PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
        ManagedLedgerImpl ml = (ManagedLedgerImpl) ledger;

        stats.entriesAddedCounter = ml.getEntriesAddedCounter();
        stats.numberOfEntries = ml.getNumberOfEntries();
        stats.totalSize = ml.getTotalSize();
        stats.currentLedgerEntries = ml.getCurrentLedgerEntries();
        stats.currentLedgerSize = ml.getCurrentLedgerSize();
        stats.lastLedgerCreatedTimestamp = DateFormatter.format(ml.getLastLedgerCreatedTimestamp());
        if (ml.getLastLedgerCreationFailureTimestamp() != 0L) {
            stats.lastLedgerCreationFailureTimestamp = DateFormatter.format(ml.getLastLedgerCreationFailureTimestamp());
        }
        stats.waitingCursorsCount = ml.getWaitingCursorsCount();
        stats.pendingAddEntriesCount = ml.getPendingAddEntriesCount();
        stats.lastConfirmedEntry = ml.getLastConfirmedEntry().toString();
        stats.state = ml.getState().toString();

        stats.ledgers = Lists.newArrayList();
        ml.getLedgersInfo().forEach((id, li) -> {
            PersistentTopicInternalStats.LedgerInfo info = new PersistentTopicInternalStats.LedgerInfo();
            info.ledgerId = li.getLedgerId();
            info.entries = li.getEntries();
            info.size = li.getSize();
            // Only consult the offload context when the ledger info actually carries one.
            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
            stats.ledgers.add(info);
        });

        stats.cursors = Maps.newTreeMap();
        ml.getCursors().forEach(c -> {
            ManagedCursorImpl cursor = (ManagedCursorImpl) c;
            PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
            cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
            cs.readPosition = cursor.getReadPosition().toString();
            cs.waitingReadOp = cursor.hasPendingReadRequest();
            cs.pendingReadOps = cursor.getPendingReadOpsCount();
            cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
            cs.cursorLedger = cursor.getCursorLedger();
            cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
            cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
            cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
            cs.state = cursor.getState();
            cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
            cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
            cs.properties = cursor.getProperties();
            stats.cursors.put(cursor.getName(), cs);
        });
        return stats;
    }

    /**
     * @return the managed ledger's estimated backlog size
     */
    public long getBacklogSize() {
        final long estimatedBacklog = ledger.getEstimatedBacklogSize();
        return estimatedBacklog;
    }

    /**
     * Tells whether this topic is currently in use.
     *
     * <p>A global topic is active when it has any subscription or any local producer. Any other
     * topic is active when its usage count is non-zero or it has any subscription.
     *
     * @return {@code true} if the topic is considered active
     */
    public boolean isActive() {
        if (!TopicName.get(topic).isGlobal()) {
            if (USAGE_COUNT_UPDATER.get(this) != 0L) {
                return true;
            }
            return !subscriptions.isEmpty();
        }
        if (!subscriptions.isEmpty()) {
            return true;
        }
        return hasLocalProducers();
    }

    /**
     * Deletes this topic if it has been inactive for at least {@code gcIntervalInSeconds} and its
     * retention policy does not require keeping it.
     *
     * <p>An active topic only has its last-active timestamp refreshed. For a global topic the
     * replication producers are closed first; deletion is skipped if that fails or if remote
     * producers are still connected. Failures are logged, never thrown.
     *
     * @param gcIntervalInSeconds minimum inactivity period, in seconds, before deletion is tried
     */
    @Override
    public void checkGC(int gcIntervalInSeconds) {
        if (isActive()) {
            lastActive = System.nanoTime();
            return;
        }
        if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(gcIntervalInSeconds)) {
            // Not inactive for long enough yet.
            return;
        }
        if (shouldTopicBeRetained()) {
            return;
        }

        final CompletableFuture<Void> replCloseFuture = new CompletableFuture<>();
        if (!TopicName.get(topic).isGlobal()) {
            replCloseFuture.complete(null);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic, gcIntervalInSeconds);
            }
            closeReplProducersIfNoBacklog().thenRun(() -> {
                if (!hasRemoteProducers()) {
                    log.info("[{}] Global topic inactive for {} seconds, closed repl producers", topic, gcIntervalInSeconds);
                    replCloseFuture.complete(null);
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Global topic has connected remote producers. Not a candidate for GC", topic);
                }
                replCloseFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has connected remote producers"));
            }).exceptionally(error -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Global topic has replication backlog. Not a candidate for GC", topic);
                }
                replCloseFuture.completeExceptionally(error.getCause());
                return null;
            });
        }

        replCloseFuture
                .thenCompose(ignored -> delete(true))
                .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
                .exceptionally(error -> {
                    if (!(error.getCause() instanceof BrokerServiceException.TopicBusyException)) {
                        log.warn("[{}] Inactive topic deletion failed", topic, error);
                    } else if (log.isDebugEnabled()) {
                        log.debug("[{}] Did not delete busy topic: {}", topic, error.getCause().getMessage());
                    }
                    return null;
                });
    }

    /**
     * Deletes subscriptions that have no connected consumer and whose cursor has been idle for
     * longer than the configured subscription expiration time.
     *
     * <p>Does nothing when the configured expiration time is zero or negative.
     */
    @Override
    public void checkInactiveSubscriptions() {
        final long expirationMillis = TimeUnit.MINUTES.toMillis(
                brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes());
        if (expirationMillis <= 0L) {
            return;
        }
        subscriptions.forEach((subName, sub) -> {
            final boolean consumerConnected = sub.dispatcher != null && sub.dispatcher.isConsumerConnected();
            if (consumerConnected) {
                return;
            }
            final long idleMillis = System.currentTimeMillis() - sub.cursor.getLastActive();
            if (idleMillis > expirationMillis) {
                sub.delete().thenAccept(done ->
                        log.info("[{}][{}] The subscription was deleted due to expiration", topic, subName));
            }
        });
    }

    /**
     * Decides whether this topic must be kept because of the namespace retention policy.
     *
     * @return {@code false} when no policies or no retention policy are found, or when the
     *         retention time is non-negative and has already elapsed since {@code lastActive};
     *         {@code true} when the retention time is negative, has not yet elapsed, or the
     *         policies could not be read
     */
    private boolean shouldTopicBeRetained() {
        final TopicName name = TopicName.get(this.topic);
        try {
            final Optional<Policies> policies = this.brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(AdminResource.path("policies", name.getNamespace()));
            // Absent policies (or an absent retention policy) fall through to "do not retain".
            return policies.map(p -> p.retention_policies).map(rp -> {
                final long retentionTime = TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes());
                // A negative retention time always retains; otherwise retain only while the
                // time since the last activity is still below the retention time.
                return retentionTime < 0L || System.nanoTime() - this.lastActive < retentionTime;
            }).orElse(false);
        }
        catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error getting policies", this.topic, e);
            }
            // Err on the side of keeping the topic when the policies cannot be read.
            return true;
        }
    }

    /**
     * Applies updated namespace policies to this topic.
     *
     * <p>Updates the encryption requirement, re-checks permissions and encryption on every
     * producer, re-checks permissions on every consumer, forwards the policies to the
     * per-subscription and topic-level dispatch rate limiters, triggers a message expiry check
     * and then re-evaluates replication, deduplication and persistence policies.
     *
     * @param data the new policies
     * @return a future that completes once the replication, deduplication and persistence
     *         policy checks have all completed
     */
    @Override
    public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] isEncryptionRequired changes: {} -> {}", this.topic, this.isEncryptionRequired, data.encryption_required);
        }
        this.isEncryptionRequired = data.encryption_required;
        this.producers.forEach(producer -> {
            producer.checkPermissions();
            producer.checkEncryption();
        });
        this.subscriptions.forEach((subName, sub) -> {
            sub.getConsumers().forEach(Consumer::checkPermissions);
            // A subscription may not have a dispatcher (checkInactiveSubscriptions guards the
            // same case), so guard before asking it for its rate limiter.
            if (sub.getDispatcher() != null && sub.getDispatcher().getRateLimiter() != null) {
                sub.getDispatcher().getRateLimiter().onPoliciesUpdate(data);
            }
        });
        this.checkMessageExpiry();
        CompletableFuture<Void> replicationFuture = this.checkReplicationAndRetryOnFailure();
        CompletableFuture<Void> dedupFuture = this.checkDeduplicationStatus();
        CompletableFuture<Void> persistentPoliciesFuture = this.checkPersistencePolicies();
        this.dispatchRateLimiter.onPoliciesUpdate(data);
        return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
    }

    /**
     * Looks up the backlog quota that applies to this topic's namespace.
     *
     * @return whatever the broker's backlog quota manager returns for the namespace and its
     *         policies path
     */
    @Override
    public BacklogQuota getBacklogQuota() {
        final String namespace = TopicName.get(this.getName()).getNamespace();
        return this.brokerService.getBacklogQuotaManager()
                .getBacklogQuota(namespace, AdminResource.path("policies", namespace));
    }

    /**
     * Tells whether a new producer must be rejected because the backlog quota is exceeded.
     *
     * @param producerName name of the producer being created; only used for logging
     * @return {@code true} when a quota exists, its policy is {@code producer_request_hold} or
     *         {@code producer_exception}, and the broker reports the backlog as exceeded;
     *         {@code false} otherwise
     */
    @Override
    public boolean isBacklogQuotaExceeded(String producerName) {
        final BacklogQuota quota = this.getBacklogQuota();
        if (quota == null) {
            return false;
        }
        final BacklogQuota.RetentionPolicy policy = quota.getPolicy();
        final boolean policyBlocksProducers = policy == BacklogQuota.RetentionPolicy.producer_request_hold
                || policy == BacklogQuota.RetentionPolicy.producer_exception;
        // Only consult the broker when the policy is one that blocks producers.
        if (!policyBlocksProducers || !this.brokerService.isBacklogExceeded(this)) {
            return false;
        }
        log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName);
        return true;
    }

    /**
     * @return the encryption-required flag currently stored on this topic
     */
    @Override
    public boolean isEncryptionRequired() {
        return isEncryptionRequired;
    }

    /**
     * @return {@code true} when this topic has at least one replicator
     */
    @Override
    public boolean isReplicated() {
        final boolean noReplicators = replicators.isEmpty();
        return !noReplicators;
    }

    /**
     * Asynchronously terminates the underlying managed ledger.
     *
     * <p>On success every producer is disconnected, every subscription is told the topic was
     * terminated, and the returned future completes with a message id built from the last
     * committed position. On failure the future completes exceptionally with the managed
     * ledger exception.
     *
     * @return a future holding the id of the last message in the terminated topic
     */
    public CompletableFuture<MessageId> terminate() {
        final CompletableFuture<MessageId> terminationFuture = new CompletableFuture<>();
        this.ledger.asyncTerminate(new AsyncCallbacks.TerminateCallback() {

            @Override
            public void terminateComplete(Position lastCommittedPosition, Object ctx) {
                PersistentTopic.this.producers.forEach(Producer::disconnect);
                PersistentTopic.this.subscriptions.forEach((subName, subscription) -> subscription.topicTerminated());
                final PositionImpl last = (PositionImpl) lastCommittedPosition;
                // NOTE(review): the trailing -1 is presumably the partition index - confirm.
                final MessageIdImpl lastMessageId = new MessageIdImpl(last.getLedgerId(), last.getEntryId(), -1);
                log.info("[{}] Topic terminated at {}", PersistentTopic.this.getName(), lastMessageId);
                terminationFuture.complete(lastMessageId);
            }

            @Override
            public void terminateFailed(ManagedLedgerException exception, Object ctx) {
                terminationFuture.completeExceptionally(exception);
            }
        }, null);
        return terminationFuture;
    }

    /**
     * Checks whether the oldest entry visible to the given cursor is past its time-to-live.
     *
     * <p>The first entry of the cursor (individually deleted entries included) is read and
     * deserialized; it counts as expired when the TTL is non-zero and the current time is later
     * than the publish time plus 1.5 times the TTL. Any error while reading or deserializing
     * the entry is logged and reported as "not expired".
     *
     * @param cursor the cursor whose oldest entry is inspected
     * @param messageTTLInSeconds the message TTL in seconds; {@code 0} means never expired
     * @return {@code true} only when the oldest message could be read and is expired
     */
    public boolean isOldestMessageExpired(ManagedCursor cursor, long messageTTLInSeconds) {
        MessageImpl msg = null;
        Entry entry = null;
        boolean isOldestMessageExpired = false;
        try {
            entry = cursor.getNthEntry(1, ManagedCursor.IndividualDeletedEntries.Include);
            if (entry != null) {
                msg = MessageImpl.deserialize(entry.getDataBuffer());
                isOldestMessageExpired = messageTTLInSeconds != 0L
                        && System.currentTimeMillis() > msg.getPublishTime()
                                + TimeUnit.SECONDS.toMillis((long) ((double) messageTTLInSeconds * 1.5));
            }
        }
        catch (Exception e) {
            // Cleanup is left to the finally block; releasing here as well would release the
            // entry and recycle the message twice.
            log.warn("[{}] Error while getting the oldest message", this.topic, e);
        }
        finally {
            if (entry != null) {
                entry.release();
            }
            if (msg != null) {
                msg.recycle();
            }
        }
        return isOldestMessageExpired;
    }

    /**
     * Clears the backlog of every subscription cursor and every replicator cursor of this topic.
     *
     * @return a future that completes when all per-cursor clear operations have completed
     */
    public CompletableFuture<Void> clearBacklog() {
        log.info("[{}] Clearing backlog on all cursors in the topic.", this.topic);
        // Subscription names first, then replicator names, each resolved by clearBacklog(String).
        final List<String> cursors = this.getSubscriptions().keys();
        cursors.addAll(this.getReplicators().keys());
        final List<CompletableFuture<Void>> futures = new ArrayList<>(cursors.size());
        for (String cursor : cursors) {
            futures.add(this.clearBacklog(cursor));
        }
        return FutureUtil.waitForAll(futures);
    }

    /**
     * Clears the backlog of a single cursor, looked up first among the subscriptions and then
     * among the persistent replicators.
     *
     * @param cursorName name of the subscription or replicator cursor
     * @return the future of the clear operation, or a future failed with a
     *         {@link BrokerServiceException} when no such cursor exists
     */
    public CompletableFuture<Void> clearBacklog(String cursorName) {
        log.info("[{}] Clearing backlog for cursor {} in the topic.", this.topic, cursorName);
        final PersistentSubscription subscription = this.getSubscription(cursorName);
        if (subscription != null) {
            return subscription.clearBacklog();
        }
        final PersistentReplicator replicator = (PersistentReplicator) this.getPersistentReplicator(cursorName);
        return replicator != null
                ? replicator.clearBacklog()
                : FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
    }

    /**
     * Records that at least one batch message has been published on this topic.
     */
    public void markBatchMessagePublished() {
        hasBatchMessagePublished = true;
    }

    /**
     * @return the topic-level dispatch rate limiter held by this topic
     */
    public DispatchRateLimiter getDispatchRateLimiter() {
        return dispatchRateLimiter;
    }

    /**
     * Delegates to the message deduplication component.
     *
     * @param producerName name of the producer to look up
     * @return the last published sequence id that deduplication reports for the producer
     */
    public long getLastPublishedSequenceId(String producerName) {
        final long lastSequenceId = messageDeduplication.getLastPublishedSequenceId(producerName);
        return lastSequenceId;
    }

    /**
     * @return the managed ledger's last confirmed entry position
     */
    @Override
    public Position getLastMessageId() {
        final Position lastConfirmed = ledger.getLastConfirmedEntry();
        return lastConfirmed;
    }

    /**
     * Starts a compaction of this topic unless one is still running.
     *
     * @throws PulsarServerException declared by the original signature; not thrown directly by
     *         the statements in this method
     * @throws BrokerServiceException.AlreadyRunningException when the current compaction future
     *         has not completed yet
     */
    public synchronized void triggerCompaction() throws PulsarServerException, BrokerServiceException.AlreadyRunningException {
        final boolean compactionRunning = !this.currentCompaction.isDone();
        if (compactionRunning) {
            throw new BrokerServiceException.AlreadyRunningException("Compaction already in progress");
        }
        // Replace the finished future with the one tracking the new run.
        this.currentCompaction = this.brokerService.pulsar().getCompactor().compact(this.topic);
    }

    /**
     * Reports the state of the current compaction future.
     *
     * @return {@code RUNNING} while the future is not done; {@code NOT_RUN} when it completed
     *         with the value {@code -4273917950L}; {@code SUCCESS} for any other value; or an
     *         error status carrying the exception message when the future was cancelled or
     *         completed exceptionally
     */
    public synchronized LongRunningProcessStatus compactionStatus() {
        // The method is already synchronized on this topic, so the field is read directly.
        final CompletableFuture<Long> compaction = this.currentCompaction;
        if (!compaction.isDone()) {
            return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
        }
        try {
            final long compactedLedgerId = compaction.join();
            // NOTE(review): -4273917950L (== -0xFEBECFFEL) is presumably the "never run"
            // sentinel the future is initialised with - confirm against the field initialiser.
            final LongRunningProcessStatus.Status outcome = compactedLedgerId == -4273917950L
                    ? LongRunningProcessStatus.Status.NOT_RUN
                    : LongRunningProcessStatus.Status.SUCCESS;
            return LongRunningProcessStatus.forStatus(outcome);
        }
        catch (CancellationException | CompletionException e) {
            return LongRunningProcessStatus.forError(e.getMessage());
        }
    }

    /**
     * Starts offloading the managed ledger prefix up to the given message id unless an offload
     * is still running.
     *
     * <p>A fresh future replaces the current offload future; it is completed with the message
     * id of the position reported by the managed ledger, or exceptionally on failure.
     *
     * @param messageId the message id whose ledger id and entry id bound the prefix to offload
     * @throws BrokerServiceException.AlreadyRunningException when the current offload future
     *         has not completed yet
     */
    public synchronized void triggerOffload(MessageIdImpl messageId) throws BrokerServiceException.AlreadyRunningException {
        if (!this.currentOffload.isDone()) {
            throw new BrokerServiceException.AlreadyRunningException("Offload already in progress");
        }
        final CompletableFuture<MessageIdImpl> offloadPromise = new CompletableFuture<>();
        this.currentOffload = offloadPromise;
        final Position offloadUpTo = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
        this.getManagedLedger().asyncOffloadPrefix(offloadUpTo, new AsyncCallbacks.OffloadCallback() {

            @Override
            public void offloadComplete(Position pos, Object ctx) {
                final PositionImpl offloaded = (PositionImpl) pos;
                offloadPromise.complete(new MessageIdImpl(offloaded.getLedgerId(), offloaded.getEntryId(), -1));
            }

            @Override
            public void offloadFailed(ManagedLedgerException exception, Object ctx) {
                offloadPromise.completeExceptionally(exception);
            }
        }, null);
    }

    /**
     * Reports the state of the current offload future.
     *
     * @return {@code RUNNING} while the future is not done; {@code NOT_RUN} when it completed
     *         with the very instance {@code MessageId.earliest}; a success status carrying the
     *         resulting message id otherwise; or an error status carrying the exception message
     *         when the future was cancelled or completed exceptionally
     */
    public synchronized OffloadProcessStatus offloadStatus() {
        if (!this.currentOffload.isDone()) {
            return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
        }
        try {
            final MessageIdImpl offloadedUpTo = (MessageIdImpl) this.currentOffload.join();
            // Identity comparison is intentional: only the exact sentinel instance counts.
            return offloadedUpTo == MessageId.earliest
                    ? OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN)
                    : OffloadProcessStatus.forSuccess(offloadedUpTo);
        }
        catch (CancellationException | CompletionException e) {
            return OffloadProcessStatus.forError(e.getMessage());
        }
    }

    /**
     * Asks the schema registry whether a schema exists for this topic.
     *
     * <p>The schema id is the schema name of this topic's partitioned topic name.
     *
     * @return a future holding {@code true} when the registry returns a non-null schema
     */
    @Override
    public CompletableFuture<Boolean> hasSchema() {
        final String partitionedTopic = TopicName.get(this.getName()).getPartitionedTopicName();
        final String schemaId = TopicName.get(partitionedTopic).getSchemaName();
        return this.brokerService.pulsar().getSchemaRegistryService()
                .getSchema(schemaId)
                .thenApply(found -> found != null);
    }

    /**
     * Stores the given schema for this topic if the registry does not already have it.
     *
     * <p>The schema id is the schema name of this topic's partitioned topic name.
     *
     * @param schema the schema to store; may be {@code null}
     * @return a future already completed with {@code SchemaVersion.Empty} when {@code schema}
     *         is {@code null}, otherwise the registry's {@code putSchemaIfAbsent} future
     */
    @Override
    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
        if (schema != null) {
            final String partitionedTopic = TopicName.get(this.getName()).getPartitionedTopicName();
            final String schemaId = TopicName.get(partitionedTopic).getSchemaName();
            return this.brokerService.pulsar().getSchemaRegistryService().putSchemaIfAbsent(schemaId, schema);
        }
        return CompletableFuture.completedFuture(SchemaVersion.Empty);
    }

    /**
     * Asks the schema registry whether the given schema is compatible with the latest version
     * stored for this topic.
     *
     * <p>The schema id is the schema name of this topic's partitioned topic name.
     *
     * @param schema the schema to check
     * @return the registry's compatibility-check future
     */
    @Override
    public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
        final String partitionedTopic = TopicName.get(this.getName()).getPartitionedTopicName();
        final String schemaId = TopicName.get(partitionedTopic).getSchemaName();
        return this.brokerService.pulsar().getSchemaRegistryService()
                .isCompatibleWithLatestVersion(schemaId, schema);
    }

    /**
     * Either registers the given schema or checks it for compatibility.
     *
     * <p>The schema is registered only when the topic has no schema, is not active and its
     * managed ledger has a total size of zero; in every other case the schema is checked
     * against the latest stored version.
     *
     * @param schema the schema to register or check
     * @return a future holding {@code true} after registration, or the compatibility result
     */
    @Override
    public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return this.hasSchema().thenCompose(schemaExists -> {
            final boolean idleAndUnset = !schemaExists.booleanValue()
                    && !this.isActive()
                    && this.ledger.getTotalSize() == 0L;
            if (idleAndUnset) {
                return this.addSchema(schema).thenApply(version -> true);
            }
            return this.isSchemaCompatible(schema);
        });
    }

    /**
     * Mutable holder for aggregated topic statistics: an average message size, inbound and
     * outbound message rates and throughputs, and a map of remote publisher stats.
     */
    private static class TopicStatsHelper {
        public double averageMsgSize;
        public double aggMsgRateIn;
        public double aggMsgThroughputIn;
        public double aggMsgRateOut;
        public double aggMsgThroughputOut;
        public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

        /** Creates a helper with every value zeroed and no remote publisher stats. */
        public TopicStatsHelper() {
            reset();
        }

        /** Sets every numeric field back to zero and empties the remote publisher stats map. */
        public void reset() {
            averageMsgSize = 0.0;
            aggMsgRateIn = 0.0;
            aggMsgThroughputIn = 0.0;
            aggMsgRateOut = 0.0;
            aggMsgThroughputOut = 0.0;
            remotePublishersStats.clear();
        }
    }
}

