/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.nonpersistent;

import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonPersistentTopic
implements Topic {
    /** Topic name, as supplied to the constructor and returned by {@code getName()}. */
    private final String topic;
    /** Producers currently attached to this topic. */
    private final ConcurrentOpenHashSet<Producer> producers;
    /** Subscriptions of this topic, keyed by subscription name. */
    private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
    /** Replicators of this topic, keyed by remote cluster name. */
    private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
    /** Owning broker service; source of configuration, executors and the topic cache. */
    private final BrokerService brokerService;
    /** True while a close/delete is in progress; addProducer and subscribe reject requests when set. */
    private volatile boolean isFenced;
    /** Replicator prefix read from the broker configuration; subscription names starting with it are rejected. */
    public final String replicatorPrefix;
    /** Atomic accessor for {@code usageCount}. */
    protected static final AtomicLongFieldUpdater<NonPersistentTopic> USAGE_COUNT_UPDATER = AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "usageCount");
    /** Incremented by addProducer and subscribe, decremented by removeProducer and by a failed subscribe. */
    private volatile long usageCount = 0L;
    /** Topic-ordered executor obtained from the broker service; publishMessage submits its fan-out tasks to it. */
    private final OrderedSafeExecutor executor;
    /** Read lock is held while addProducer/subscribe check {@code isFenced}; write lock while close/delete change it. */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    /** {@code System.nanoTime()} reading taken at construction, on producer removal and whenever checkGC finds the topic active. */
    private volatile long lastActive;
    /** Set by markBatchMessagePublished; subscribe rejects connections that are not batch-message compatible once it is set. */
    private volatile boolean hasBatchMessagePublished = false;
    /** Atomic accessor for {@code entriesAddedCounter}. */
    static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
    /** Incremented once per publishMessage call and reported by getInternalStats. */
    private volatile long entriesAddedCounter = 0L;
    /** Retry delay in seconds for a failed policy update; same value as the 60-second delay scheduled by checkReplicationAndRetryOnFailure. */
    private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60L;
    /** Per-thread scratch {@code TopicStats} that updateRates resets and reuses on every call. */
    private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>(){

        protected TopicStats initialValue() {
            return new TopicStats();
        }
    };
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);

    public NonPersistentTopic(String topic, BrokerService brokerService) {
        this.topic = topic;
        this.brokerService = brokerService;
        this.producers = new ConcurrentOpenHashSet(16, 1);
        this.subscriptions = new ConcurrentOpenHashMap(16, 1);
        this.replicators = new ConcurrentOpenHashMap(16, 1);
        this.isFenced = false;
        this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
        this.executor = brokerService.getTopicOrderedExecutor();
        USAGE_COUNT_UPDATER.set(this, 0L);
        this.lastActive = System.nanoTime();
    }

    /**
     * Fans a published message out to every subscription and every replicator of this topic.
     *
     * <p>Two tasks are submitted to the topic-ordered executor: one for the subscriptions and one for the
     * replicators. Each task owns one of the two extra references taken on {@code data} and releases it when
     * it finishes. {@code callback.completed(null, 0, 0)} is invoked by whichever task finishes last.
     *
     * <p>The release and the completion countdown run in a {@code finally} block so that a failure while
     * dispatching to one subscription or replicator neither leaks the buffer nor leaves the publish
     * uncompleted; the failure itself still propagates to the surrounding {@code SafeRun} wrapper.
     *
     * @param data message payload; two additional references are retained here and released by the tasks
     * @param callback notified once both fan-out tasks have finished
     */
    @Override
    public void publishMessage(ByteBuf data, Topic.PublishContext callback) {
        // One unit per fan-out task (subscriptions, replicators).
        AtomicInteger pendingDeliveries = new AtomicInteger(2);
        ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);
        // One reference per fan-out task; each task drops its reference in finishDelivery().
        data.retain(2);
        this.executor.submitOrdered(this.topic, SafeRun.safeRun(() -> {
            try {
                this.subscriptions.forEach((name, subscription) -> {
                    ByteBuf duplicateBuffer = data.retainedDuplicate();
                    Entry entry = EntryCacheManager.create(0L, 0L, duplicateBuffer);
                    // The duplicate is dropped right after the entry is built; presumably the entry keeps
                    // its own reference to the payload.
                    duplicateBuffer.release();
                    if (subscription.getDispatcher() != null) {
                        subscription.getDispatcher().sendMessages(Lists.newArrayList(entry));
                    } else {
                        // No dispatcher to hand the entry to, so release it here.
                        entry.release();
                    }
                });
            } finally {
                finishDelivery(data, pendingDeliveries, callback);
            }
        }));
        this.executor.submitOrdered(this.topic, SafeRun.safeRun(() -> {
            try {
                this.replicators.forEach((name, replicator) -> {
                    ByteBuf duplicateBuffer = data.retainedDuplicate();
                    Entry entry = EntryCacheManager.create(0L, 0L, duplicateBuffer);
                    duplicateBuffer.release();
                    replicator.sendMessage(entry);
                });
            } finally {
                finishDelivery(data, pendingDeliveries, callback);
            }
        }));
    }

    /** Releases one fan-out task's reference on the payload and completes the publish when it is the last task. */
    private static void finishDelivery(ByteBuf data, AtomicInteger pendingDeliveries, Topic.PublishContext callback) {
        data.release();
        if (pendingDeliveries.decrementAndGet() == 0) {
            callback.completed(null, 0L, 0L);
        }
    }

    /**
     * Registers a producer on this topic and bumps the usage count.
     *
     * @param producer producer to add; its {@code getTopic()} must be this topic
     * @throws BrokerServiceException a {@code TopicFencedException} when the topic is fenced, or a
     *         {@code NamingException} when the producer set rejects the producer as already present
     */
    @Override
    public void addProducer(Producer producer) throws BrokerServiceException {
        Preconditions.checkArgument(producer.getTopic() == this);

        lock.readLock().lock();
        try {
            if (isFenced) {
                log.warn("[{}] Attempting to add producer to a fenced topic", topic);
                throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
            }

            final boolean debug = log.isDebugEnabled();
            if (debug) {
                log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
            }

            boolean added = producers.add(producer);
            if (!added) {
                throw new BrokerServiceException.NamingException(
                        "Producer with name '" + producer.getProducerName() + "' is already connected to topic");
            }

            USAGE_COUNT_UPDATER.incrementAndGet(this);
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
                        USAGE_COUNT_UPDATER.get(this));
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Intentionally empty: this topic implementation performs no work for this check. */
    @Override
    public void checkMessageDeduplicationInfo() {
    }

    /**
     * Reports whether any attached producer is not remote.
     *
     * @return {@code true} if at least one producer answers {@code false} to {@code isRemote()}
     */
    private boolean hasLocalProducers() {
        final AtomicBoolean localSeen = new AtomicBoolean();
        producers.forEach(candidate -> {
            if (candidate.isRemote()) {
                return;
            }
            localSeen.set(true);
        });
        return localSeen.get();
    }

    /**
     * Reports whether any attached producer is remote.
     *
     * @return {@code true} if at least one producer answers {@code true} to {@code isRemote()}
     */
    private boolean hasRemoteProducers() {
        final AtomicBoolean remoteSeen = new AtomicBoolean();
        producers.forEach(candidate -> {
            if (!candidate.isRemote()) {
                return;
            }
            remoteSeen.set(true);
        });
        return remoteSeen.get();
    }

    /**
     * Detaches a producer; when it was actually registered, lowers the usage count and refreshes
     * the last-active timestamp.
     *
     * @param producer producer to remove; its {@code getTopic()} must be this topic
     */
    @Override
    public void removeProducer(Producer producer) {
        Preconditions.checkArgument(producer.getTopic() == this);

        if (!producers.remove(producer)) {
            // Not registered: nothing to account for.
            return;
        }

        USAGE_COUNT_UPDATER.decrementAndGet(this);
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
                    USAGE_COUNT_UPDATER.get(this));
        }
        lastActive = System.nanoTime();
    }

    /**
     * Attaches a new consumer to the named subscription, creating the subscription if it is absent.
     *
     * <p>The returned future fails when a batch message has been published and the connection is not
     * batch-message compatible, when the subscription name starts with the replicator prefix, when
     * the topic is fenced, when adding the consumer throws, or when the connection is no longer
     * active after the consumer was added.
     *
     * @param cnx connection the consumer belongs to
     * @param subscriptionName subscription to join
     * @param consumerId id of the consumer
     * @param subType subscription type requested
     * @param priorityLevel consumer priority level
     * @param consumerName consumer name
     * @param isDurable not used by this implementation
     * @param startMessageId not used by this implementation
     * @return future completed with the new consumer, or exceptionally as described above
     */
    @Override
    public CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, PulsarApi.CommandSubscribe.SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId) {
        final CompletableFuture<Consumer> result = new CompletableFuture<>();

        if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
            }
            result.completeExceptionally(
                    new BrokerServiceException.UnsupportedVersionException("Consumer doesn't support batch-message"));
            return result;
        }

        if (subscriptionName.startsWith(replicatorPrefix)) {
            log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName);
            result.completeExceptionally(
                    new BrokerServiceException.NamingException("Subscription with reserved subscription name attempted"));
            return result;
        }

        lock.readLock().lock();
        try {
            if (isFenced) {
                log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
                result.completeExceptionally(
                        new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable"));
                return result;
            }
            USAGE_COUNT_UPDATER.incrementAndGet(this);
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName, consumerName,
                        USAGE_COUNT_UPDATER.get(this));
            }
        } finally {
            lock.readLock().unlock();
        }

        NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
                name -> new NonPersistentSubscription(this, subscriptionName));
        try {
            Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
                    0, cnx, cnx.getRole());
            subscription.addConsumer(consumer);
            if (cnx.isActive()) {
                log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
                result.complete(consumer);
            } else {
                // The connection went away while the consumer was being set up.
                consumer.close();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
                            consumer.consumerName(), USAGE_COUNT_UPDATER.get(this));
                }
                result.completeExceptionally(
                        new BrokerServiceException("Connection was closed while the opening the cursor "));
            }
        } catch (BrokerServiceException e) {
            if (e instanceof BrokerServiceException.ConsumerBusyException) {
                log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                        consumerName);
            } else if (e instanceof BrokerServiceException.SubscriptionBusyException) {
                log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
            }
            // Undo the usage increment taken above.
            USAGE_COUNT_UPDATER.decrementAndGet(this);
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Builds a new subscription object for this topic without registering it in the subscription map.
     *
     * @param subscriptionName name for the new subscription
     * @return an already-completed future holding the new subscription
     */
    @Override
    public CompletableFuture<Subscription> createSubscription(String subscriptionName) {
        Subscription created = new NonPersistentSubscription(this, subscriptionName);
        return CompletableFuture.completedFuture(created);
    }

    /**
     * Drops the named subscription from this topic's subscription map.
     *
     * @param subscriptionName name of the subscription to forget
     */
    void removeSubscription(String subscriptionName) {
        subscriptions.remove(subscriptionName);
    }

    /**
     * Deletes this topic, deleting any existing subscriptions along the way.
     *
     * @return future completed when the deletion finishes, or exceptionally when it is refused or fails
     */
    @Override
    public CompletableFuture<Void> delete() {
        final boolean failIfHasSubscriptions = false;
        return delete(failIfHasSubscriptions);
    }

    /**
     * Deletes this topic under the write lock.
     *
     * <p>The request is refused with a {@code TopicFencedException} when the topic is already fenced and
     * with a {@code TopicBusyException} when the usage count is non-zero. Otherwise the topic is fenced
     * and, once all subscription deletions finish, removed from the broker's topic cache. The fence is
     * lifted again if the deletion is refused because of subscriptions or if a subscription deletion fails.
     *
     * @param failIfHasSubscriptions when {@code true}, refuse with {@code TopicBusyException} if any
     *        subscription exists; when {@code false}, delete every subscription first
     * @return future reflecting the outcome of the deletion
     */
    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
        final CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

        lock.writeLock().lock();
        try {
            if (isFenced) {
                log.warn("[{}] Topic is already being closed or deleted", topic);
                deleteFuture.completeExceptionally(
                        new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                return deleteFuture;
            }

            if (USAGE_COUNT_UPDATER.get(this) != 0L) {
                deleteFuture.completeExceptionally(new BrokerServiceException.TopicBusyException(
                        "Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
                return deleteFuture;
            }

            isFenced = true;
            List<CompletableFuture<Void>> pendingDeletes = Lists.newArrayList();

            if (failIfHasSubscriptions) {
                if (!subscriptions.isEmpty()) {
                    isFenced = false;
                    deleteFuture.completeExceptionally(
                            new BrokerServiceException.TopicBusyException("Topic has subscriptions"));
                    return deleteFuture;
                }
            } else {
                subscriptions.forEach((subName, sub) -> pendingDeletes.add(sub.delete()));
            }

            FutureUtil.waitForAll(pendingDeletes).whenComplete((ignored, failure) -> {
                if (failure == null) {
                    brokerService.removeTopicFromCache(topic);
                    log.info("[{}] Topic deleted", topic);
                    deleteFuture.complete(null);
                } else {
                    log.error("[{}] Error deleting topic", topic, failure);
                    isFenced = false;
                    deleteFuture.completeExceptionally(failure);
                }
            });
        } finally {
            lock.writeLock().unlock();
        }
        return deleteFuture;
    }

    /**
     * Fences the topic and disconnects every replicator, producer and subscription.
     *
     * <p>When all disconnects finish the topic is removed from the broker's topic cache. If any of them
     * fails, the fence is lifted and the returned future fails with the same cause. A topic that is
     * already fenced is refused with a {@code TopicFencedException}.
     *
     * @return future reflecting the outcome of the close
     */
    @Override
    public CompletableFuture<Void> close() {
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        lock.writeLock().lock();
        try {
            if (isFenced) {
                log.warn("[{}] Topic is already being closed or deleted", topic);
                closeFuture.completeExceptionally(
                        new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                return closeFuture;
            }
            isFenced = true;
        } finally {
            lock.writeLock().unlock();
        }

        List<CompletableFuture<Void>> pendingDisconnects = Lists.newArrayList();
        replicators.forEach((cluster, replicator) -> pendingDisconnects.add(replicator.disconnect()));
        producers.forEach(producer -> pendingDisconnects.add(producer.disconnect()));
        subscriptions.forEach((subName, sub) -> pendingDisconnects.add(sub.disconnect()));

        FutureUtil.waitForAll(pendingDisconnects).thenRun(() -> {
            log.info("[{}] Topic closed", topic);
            brokerService.removeTopicFromCache(topic);
            closeFuture.complete(null);
        }).exceptionally(failure -> {
            log.error("[{}] Error closing topic", topic, failure);
            isFenced = false;
            closeFuture.completeExceptionally(failure);
            return null;
        });
        return closeFuture;
    }

    /**
     * Disconnects every replicator of this topic.
     *
     * @return future that completes when all replicator disconnects have completed
     */
    public CompletableFuture<Void> stopReplProducers() {
        List<CompletableFuture<Void>> pendingDisconnects = Lists.newArrayList();
        replicators.forEach((region, replicator) -> pendingDisconnects.add(replicator.disconnect()));
        return FutureUtil.waitForAll(pendingDisconnects);
    }

    /**
     * Reconciles this topic's replicators with the namespace's configured replication clusters.
     *
     * <p>Nothing is done for non-global topics. For global topics, a replicator is started for every
     * configured remote cluster that has none, and removal is requested for every existing replicator
     * whose cluster is neither the local cluster nor configured any more.
     *
     * @return future that completes when all requested replicator removals have completed; it fails with a
     *         {@code ServerMetadataException} when the namespace policies cannot be read
     */
    @Override
    public CompletableFuture<Void> checkReplication() {
        DestinationName name = DestinationName.get(this.topic);
        if (!name.isGlobal()) {
            return CompletableFuture.completedFuture(null);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Checking replication status", name);
        }

        final Policies policies;
        try {
            policies = this.brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(AdminResource.path("policies", name.getNamespace()))
                    .orElseThrow(() -> new KeeperException.NoNodeException());
        } catch (Exception e) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            future.completeExceptionally(new BrokerServiceException.ServerMetadataException(e));
            return future;
        }

        // Sorted, typed copy of the configured clusters; stays empty when none are configured.
        final TreeSet<String> configuredClusters = new TreeSet<>();
        if (policies.replication_clusters != null) {
            configuredClusters.addAll(policies.replication_clusters);
        }

        String localCluster = this.brokerService.pulsar().getConfiguration().getClusterName();
        List<CompletableFuture<Void>> futures = Lists.newArrayList();

        // Start replicators for newly configured remote clusters.
        for (String cluster : configuredClusters) {
            if (cluster.equals(localCluster) || this.replicators.containsKey(cluster)) {
                continue;
            }
            this.startReplicator(cluster);
        }

        // Request removal of replicators whose cluster is no longer configured.
        this.replicators.forEach((cluster, replicator) -> {
            if (!cluster.equals(localCluster) && !configuredClusters.contains(cluster)) {
                futures.add(this.removeReplicator(cluster));
            }
        });
        return FutureUtil.waitForAll(futures);
    }

    /**
     * Creates a replicator towards the given remote cluster unless one is already registered for it.
     *
     * @param remoteCluster name of the remote cluster to replicate to
     */
    void startReplicator(String remoteCluster) {
        log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
        final String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
        replicators.computeIfAbsent(remoteCluster,
                cluster -> new NonPersistentReplicator(this, localCluster, remoteCluster, brokerService));
    }

    /**
     * Disconnects the replicator towards the given remote cluster.
     *
     * <p>The returned future is completed on success as well as on failure, so callers that wait on it
     * (for example through {@code FutureUtil.waitForAll}) are always released. If no replicator is
     * registered for the cluster there is nothing to disconnect and the future completes immediately.
     *
     * <p>NOTE(review): the replicator is only disconnected here; it is not taken out of the
     * {@code replicators} map. Confirm whether that is intended.
     *
     * @param remoteCluster name of the remote cluster whose replicator should be disconnected
     * @return future completed normally once the replicator is disconnected, or exceptionally with the
     *         disconnect failure
     */
    CompletableFuture<Void> removeReplicator(String remoteCluster) {
        log.info("[{}] Removing replicator to {}", this.topic, remoteCluster);
        final CompletableFuture<Void> future = new CompletableFuture<>();
        String name = NonPersistentReplicator.getReplicatorName(this.replicatorPrefix, remoteCluster);

        NonPersistentReplicator replicator = this.replicators.get(remoteCluster);
        if (replicator == null) {
            // Nothing registered for this cluster, so there is nothing to disconnect.
            future.complete(null);
            return future;
        }

        replicator.disconnect().thenRun(() -> {
            log.info("[{}] Successfully removed replicator {}", name, remoteCluster);
            future.complete(null);
        }).exceptionally(e -> {
            log.error("[{}] Failed to close replication producer {} {}", this.topic, name, e.getMessage(), e);
            future.completeExceptionally(e);
            return null;
        });
        return future;
    }

    /**
     * Runs {@code checkReplication()} and, if it fails, schedules another attempt on the broker
     * executor after {@code POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS} seconds.
     *
     * @return future mirroring the outcome of this attempt only; the outcome of a scheduled retry is not
     *         reflected in it
     */
    private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
        final CompletableFuture<Void> outcome = new CompletableFuture<>();
        checkReplication().thenAccept(ignored -> {
            log.info("[{}] Policies updated successfully", topic);
            outcome.complete(null);
        }).exceptionally(failure -> {
            log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, failure.getMessage(),
                    POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, failure);
            brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure,
                    POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
            outcome.completeExceptionally(failure);
            return null;
        });
        return outcome;
    }

    /** Intentionally empty: this topic implementation performs no work for this check. */
    @Override
    public void checkMessageExpiry() {
    }

    /**
     * Renders this topic through Guava's {@code MoreObjects.toStringHelper}, with the topic name as the
     * only attribute.
     */
    public String toString() {
        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
        helper.add("topic", topic);
        return helper.toString();
    }

    /**
     * @return the live set of producers attached to this topic (not a copy)
     */
    @Override
    public ConcurrentOpenHashSet<Producer> getProducers() {
        return producers;
    }

    /**
     * @return the live map of subscriptions keyed by subscription name (not a copy)
     */
    public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() {
        return subscriptions;
    }

    /**
     * @return the live map of replicators keyed by remote cluster name (not a copy)
     */
    public ConcurrentOpenHashMap<String, NonPersistentReplicator> getReplicators() {
        return replicators;
    }

    /**
     * Looks up a subscription by name.
     *
     * @param subscription subscription name
     * @return whatever the subscription map holds for that name
     */
    @Override
    public Subscription getSubscription(String subscription) {
        return subscriptions.get(subscription);
    }

    /**
     * Looks up the replicator registered for a remote cluster.
     *
     * @param remoteCluster remote cluster name
     * @return whatever the replicator map holds for that cluster
     */
    public Replicator getPersistentReplicator(String remoteCluster) {
        return replicators.get(remoteCluster);
    }

    /**
     * @return the broker service this topic was created with
     */
    public BrokerService getBrokerService() {
        return brokerService;
    }

    /**
     * @return the topic name this instance was created with
     */
    @Override
    public String getName() {
        return topic;
    }

    /**
     * Refreshes producer, consumer and replicator rates, writes this topic's stats object to
     * {@code destStatsStream}, and adds the topic's totals to the namespace and bundle aggregates.
     *
     * <p>Per-subscription failures are logged and do not abort the remaining subscriptions. The message
     * drop rate is only recalculated and written for subscriptions that have a dispatcher.
     *
     * @param nsStats namespace-level aggregate that is incremented in place
     * @param bundleStats bundle-level aggregate that is incremented in place
     * @param destStatsStream stream receiving the topic stats object
     * @param replStats not used by this implementation
     * @param namespace not used by this implementation
     */
    @Override
    public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream destStatsStream, ClusterReplicationMetrics replStats, String namespace) {
        TopicStats topicStats = threadLocalTopicStats.get();
        topicStats.reset();

        this.replicators.forEach((region, replicator) -> replicator.updateRates());

        nsStats.producerCount += this.producers.size();
        bundleStats.producerCount += this.producers.size();
        destStatsStream.startObject(this.topic);

        this.producers.forEach(producer -> {
            producer.updateRates();
            PublisherStats publisherStats = producer.getStats();
            topicStats.aggMsgRateIn += publisherStats.msgRateIn;
            topicStats.aggMsgThroughputIn += publisherStats.msgThroughputIn;
            if (producer.isRemote()) {
                topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
            }
        });

        // The publishers list is always written empty.
        destStatsStream.startList("publishers");
        destStatsStream.endList();

        // The replication object is always written empty; only the namespace counter is updated.
        destStatsStream.startObject("replication");
        nsStats.replicatorCount += topicStats.remotePublishersStats.size();
        destStatsStream.endObject();

        destStatsStream.startObject("subscriptions");
        nsStats.subsCount += this.subscriptions.size();
        this.subscriptions.forEach((subscriptionName, subscription) -> {
            double subMsgRateOut = 0.0;
            double subMsgThroughputOut = 0.0;
            double subMsgRateRedeliver = 0.0;
            try {
                destStatsStream.startObject(subscriptionName);
                // Raw view on purpose: the backing array is read as Object[] and each element is cast below.
                Object[] consumers = ((CopyOnWriteArrayList) subscription.getConsumers()).array();
                nsStats.consumerCount += consumers.length;
                bundleStats.consumerCount += consumers.length;
                destStatsStream.startList("consumers");
                // Guarded like the "msgDropRate" write below: without a dispatcher this used to throw
                // after the object and list had been opened, leaving the stream unbalanced.
                if (subscription.getDispatcher() != null) {
                    subscription.getDispatcher().getMesssageDropRate().calculateRate();
                }
                for (Object consumerObj : consumers) {
                    Consumer consumer = (Consumer) consumerObj;
                    consumer.updateRates();
                    ConsumerStats consumerStats = consumer.getStats();
                    subMsgRateOut += consumerStats.msgRateOut;
                    subMsgThroughputOut += consumerStats.msgThroughputOut;
                    subMsgRateRedeliver += consumerStats.msgRateRedeliver;
                    writeConsumerStats(destStatsStream, consumerStats,
                            PulsarApi.CommandSubscribe.SubType.Shared.equals(subscription.getType()));
                }
                destStatsStream.endList();
                destStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog());
                destStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                destStatsStream.writePair("msgRateOut", subMsgRateOut);
                destStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                destStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                destStatsStream.writePair("type", subscription.getTypeString());
                if (subscription.getDispatcher() != null) {
                    destStatsStream.writePair("msgDropRate", subscription.getDispatcher().getMesssageDropRate().getRate());
                }
                destStatsStream.endObject();
                topicStats.aggMsgRateOut += subMsgRateOut;
                topicStats.aggMsgThroughputOut += subMsgThroughputOut;
                nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
            } catch (Exception e) {
                log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName, e.getMessage(), e);
            }
        });
        destStatsStream.endObject();

        topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0 : topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn;
        destStatsStream.writePair("producerCount", this.producers.size());
        destStatsStream.writePair("averageMsgSize", topicStats.averageMsgSize);
        destStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
        destStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
        destStatsStream.writePair("msgThroughputIn", topicStats.aggMsgThroughputIn);
        destStatsStream.writePair("msgThroughputOut", topicStats.aggMsgThroughputOut);

        nsStats.msgRateIn += topicStats.aggMsgRateIn;
        nsStats.msgRateOut += topicStats.aggMsgRateOut;
        nsStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
        nsStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
        bundleStats.msgRateIn += topicStats.aggMsgRateIn;
        bundleStats.msgRateOut += topicStats.aggMsgRateOut;
        bundleStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
        bundleStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
        destStatsStream.endObject();
    }

    /** Writes one consumer's stats as a single object; the unacked-message fields are written only for shared subscriptions. */
    private static void writeConsumerStats(StatsOutputStream destStatsStream, ConsumerStats consumerStats, boolean sharedSubscription) {
        destStatsStream.startObject();
        destStatsStream.writePair("address", consumerStats.address);
        destStatsStream.writePair("consumerName", consumerStats.consumerName);
        destStatsStream.writePair("availablePermits", consumerStats.availablePermits);
        destStatsStream.writePair("connectedSince", consumerStats.connectedSince);
        destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
        destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
        destStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver);
        if (sharedSubscription) {
            destStatsStream.writePair("unackedMessages", consumerStats.unackedMessages);
            destStatsStream.writePair("blockedConsumerOnUnackedMsgs", consumerStats.blockedConsumerOnUnackedMsgs);
        }
        if (consumerStats.clientVersion != null) {
            destStatsStream.writePair("clientVersion", consumerStats.clientVersion);
        }
        destStatsStream.endObject();
    }

    /**
     * Builds a snapshot of this topic's publisher, subscription and replication statistics.
     *
     * <p>Stats of remote producers are not listed as publishers; they are folded into the inbound
     * fields of the replicator entry for the same remote cluster, when such a replicator exists.
     *
     * @return a newly populated stats object
     */
    public NonPersistentTopicStats getStats() {
        NonPersistentTopicStats stats = new NonPersistentTopicStats();
        ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

        this.producers.forEach(producer -> {
            NonPersistentPublisherStats publisherStats = (NonPersistentPublisherStats) producer.getStats();
            stats.msgRateIn += publisherStats.msgRateIn;
            stats.msgThroughputIn += publisherStats.msgThroughputIn;
            if (producer.isRemote()) {
                remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
            } else {
                stats.getPublishers().add(publisherStats);
            }
        });

        stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : stats.msgThroughputIn / stats.msgRateIn;

        this.subscriptions.forEach((name, subscription) -> {
            NonPersistentSubscriptionStats subStats = subscription.getStats();
            stats.msgRateOut += subStats.msgRateOut;
            stats.msgThroughputOut += subStats.msgThroughputOut;
            stats.getSubscriptions().put(name, subStats);
        });

        this.replicators.forEach((cluster, replicator) -> {
            NonPersistentReplicatorStats replicatorStats = replicator.getStats();
            // Attach inbound figures from the remote producer of the same cluster, if one is connected.
            PublisherStats pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
            if (pubStats != null) {
                replicatorStats.msgRateIn = pubStats.msgRateIn;
                replicatorStats.msgThroughputIn = pubStats.msgThroughputIn;
                replicatorStats.inboundConnection = pubStats.address;
                replicatorStats.inboundConnectedSince = pubStats.connectedSince;
            }
            stats.msgRateOut += replicatorStats.msgRateOut;
            stats.msgThroughputOut += replicatorStats.msgThroughputOut;
            stats.getReplication().put(replicator.getRemoteCluster(), replicatorStats);
        });
        return stats;
    }

    /**
     * Builds the internal stats view of this topic: the number of entries added so far plus one empty
     * {@code CursorStats} entry per subscription name and per replicator key.
     *
     * @return a newly populated internal stats object
     */
    @Override
    public PersistentTopicInternalStats getInternalStats() {
        PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
        stats.entriesAddedCounter = ENTRIES_ADDED_COUNTER_UPDATER.get(this);
        stats.cursors = Maps.newTreeMap();
        this.subscriptions.forEach((name, subs) -> stats.cursors.put(name, new PersistentTopicInternalStats.CursorStats()));
        this.replicators.forEach((name, subs) -> stats.cursors.put(name, new PersistentTopicInternalStats.CursorStats()));
        return stats;
    }

    /**
     * Reports whether the topic is still in use.
     *
     * @return for a global topic, {@code true} when it has any subscription or any non-remote producer;
     *         otherwise {@code true} when the usage count is non-zero or it has any subscription
     */
    public boolean isActive() {
        if (!DestinationName.get(topic).isGlobal()) {
            return USAGE_COUNT_UPDATER.get(this) != 0L || !subscriptions.isEmpty();
        }
        return !subscriptions.isEmpty() || hasLocalProducers();
    }

    /**
     * Garbage-collection check for this topic.
     *
     * <p>An active topic just has its last-active timestamp refreshed. A global topic that has been
     * inactive for longer than the interval has its replicators disconnected and is then deleted,
     * provided it has no subscriptions. If the deletion is refused because the topic is busy, the
     * replicator producers are started again.
     *
     * @param gcIntervalInSeconds inactivity threshold in seconds
     */
    @Override
    public void checkGC(int gcIntervalInSeconds) {
        if (isActive()) {
            lastActive = System.nanoTime();
            return;
        }

        boolean idleLongEnough = System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(gcIntervalInSeconds);
        if (!idleLongEnough || !DestinationName.get(topic).isGlobal()) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
                    gcIntervalInSeconds);
        }

        stopReplProducers()
                .thenCompose(ignored -> delete(true))
                .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
                .exceptionally(failure -> {
                    if (failure.getCause() instanceof BrokerServiceException.TopicBusyException) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Did not delete busy topic: {}", topic, failure.getCause().getMessage());
                        }
                        // The topic stays alive, so bring the replicator producers back.
                        replicators.forEach((region, replicator) -> replicator.startProducer());
                    } else {
                        log.warn("[{}] Inactive topic deletion failed", topic, failure);
                    }
                    return null;
                });
    }

    /**
     * Reacts to a namespace policy change: asks every producer and every consumer to re-check its
     * permissions, then re-evaluates replication.
     *
     * @param data updated policies; not read directly by this implementation
     * @return the future returned by the replication check for this attempt
     */
    @Override
    public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
        producers.forEach(Producer::checkPermissions);
        subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
        return checkReplicationAndRetryOnFailure();
    }

    /**
     * Not supported by this topic implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public BacklogQuota getBacklogQuota() {
        throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic");
    }

    /**
     * Backlog quota check; this implementation never reports the quota as exceeded.
     *
     * @param producerName ignored
     * @return always {@code false}
     */
    @Override
    public boolean isBacklogQuotaExceeded(String producerName) {
        return false;
    }

    /**
     * Unsubscribe request; this implementation does nothing with the subscription name.
     *
     * @param subName ignored
     * @return an already-completed future
     */
    @Override
    public CompletableFuture<Void> unsubscribe(String subName) {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Records that a batch message has been published on this topic; {@code subscribe} consults this
     * flag to reject connections that are not batch-message compatible.
     */
    public void markBatchMessagePublished() {
        hasBatchMessagePublished = true;
    }

    /**
     * Mutable accumulator for one topic's aggregated rates, reused across {@code updateRates} calls
     * through a thread-local.
     */
    private static class TopicStats {
        public double averageMsgSize;
        public double aggMsgRateIn;
        public double aggMsgThroughputIn;
        public double aggMsgRateOut;
        public double aggMsgThroughputOut;
        /** Stats of remote producers, keyed by remote cluster name. */
        public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap();

        public TopicStats() {
            reset();
        }

        /** Zeroes every aggregate and empties the remote publisher map. */
        public void reset() {
            averageMsgSize = 0.0;
            aggMsgRateIn = 0.0;
            aggMsgThroughputIn = 0.0;
            aggMsgRateOut = 0.0;
            aggMsgThroughputOut = 0.0;
            remotePublishersStats.clear();
        }
    }
}

