package org.apache.pulsar.broker.service.nonpersistent;

import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.class */
public class NonPersistentTopic extends AbstractTopic implements Topic {
    private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
    private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
    private volatile long entriesAddedCounter;
    private final LongAdder bytesOutFromRemovedSubscriptions;
    private final LongAdder msgOutFromRemovedSubscriptions;
    private static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
    private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() { // from class: org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: initialValue, reason: merged with bridge method [inline-methods] */
        public TopicStats m138initialValue() {
            return new TopicStats();
        }
    };
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic$TopicStats.class */
    public static class TopicStats {
        public double averageMsgSize;
        public double aggMsgRateIn;
        public double aggMsgThroughputIn;
        public double aggMsgRateOut;
        public double aggMsgThroughputOut;
        public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

        public TopicStats() {
            reset();
        }

        public void reset() {
            this.averageMsgSize = 0.0d;
            this.aggMsgRateIn = 0.0d;
            this.aggMsgThroughputIn = 0.0d;
            this.aggMsgRateOut = 0.0d;
            this.aggMsgThroughputOut = 0.0d;
            this.remotePublishersStats.clear();
        }
    }

    public NonPersistentTopic(String str, BrokerService brokerService) {
        super(str, brokerService);
        this.entriesAddedCounter = 0L;
        this.bytesOutFromRemovedSubscriptions = new LongAdder();
        this.msgOutFromRemovedSubscriptions = new LongAdder();
        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
        this.isFenced = false;
        try {
            Policies policies = (Policies) brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path(ZkAdminPaths.POLICIES, TopicName.get(str).getNamespace())).orElseThrow(() -> {
                return new KeeperException.NoNodeException();
            });
            this.isEncryptionRequired = policies.encryption_required;
            this.isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
            if (policies.inactive_topic_policies != null) {
                this.inactiveTopicPolicies = policies.inactive_topic_policies;
            }
            setSchemaCompatibilityStrategy(policies);
            this.schemaValidationEnforced = policies.schema_validation_enforced;
        } catch (Exception e) {
            log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", str, e.getMessage());
            this.isEncryptionRequired = false;
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void publishMessage(ByteBuf byteBuf, Topic.PublishContext publishContext) {
        if (isExceedMaximumMessageSize(byteBuf.readableBytes())) {
            publishContext.completed(new BrokerServiceException.NotAllowedException("Exceed maximum message size"), -1L, -1L);
            return;
        }
        publishContext.completed(null, 0L, 0L);
        ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);
        this.subscriptions.forEach((str, nonPersistentSubscription) -> {
            ByteBuf retainedDuplicate = byteBuf.retainedDuplicate();
            Entry create = EntryCacheManager.create(0L, 0L, retainedDuplicate);
            retainedDuplicate.release();
            if (nonPersistentSubscription.getDispatcher() != null) {
                nonPersistentSubscription.getDispatcher().sendMessages(Collections.singletonList(create));
            } else {
                create.release();
            }
        });
        if (this.replicators.isEmpty()) {
            return;
        }
        this.replicators.forEach((str2, nonPersistentReplicator) -> {
            ByteBuf retainedDuplicate = byteBuf.retainedDuplicate();
            Entry create = EntryCacheManager.create(0L, 0L, retainedDuplicate);
            retainedDuplicate.release();
            nonPersistentReplicator.sendMessage(create);
        });
    }

    @Override // org.apache.pulsar.broker.service.AbstractTopic
    protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> optional) {
        return CompletableFuture.completedFuture(Long.valueOf(optional.orElse(-1L).longValue() + 1));
    }

    @Override // org.apache.pulsar.broker.service.AbstractTopic
    protected CompletableFuture<Long> setTopicEpoch(long j) {
        return CompletableFuture.completedFuture(Long.valueOf(j));
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkMessageDeduplicationInfo() {
    }

    @Override // org.apache.pulsar.broker.service.AbstractTopic, org.apache.pulsar.broker.service.Topic
    public void removeProducer(Producer producer) {
        Preconditions.checkArgument(producer.getTopic() == this);
        if (this.producers.remove(producer.getProducerName(), producer)) {
            handleProducerRemoved(producer);
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Consumer> subscribe(TransportCnx transportCnx, String str, long j, CommandSubscribe.SubType subType, int i, String str2, boolean z, MessageId messageId, Map<String, String> map, boolean z2, CommandSubscribe.InitialPosition initialPosition, long j2, boolean z3, KeySharedMeta keySharedMeta) {
        CompletableFuture<Consumer> completableFuture = new CompletableFuture<>();
        try {
            this.brokerService.checkTopicNsOwnership(getName());
            if (this.hasBatchMessagePublished && !transportCnx.isBatchMessageCompatibleVersion()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Consumer doesn't support batch-message {}", this.topic, str);
                }
                completableFuture.completeExceptionally(new BrokerServiceException.UnsupportedVersionException("Consumer doesn't support batch-message"));
                return completableFuture;
            }
            if (str.startsWith(this.replicatorPrefix)) {
                log.warn("[{}] Failed to create subscription for {}", this.topic, str);
                completableFuture.completeExceptionally(new BrokerServiceException.NamingException("Subscription with reserved subscription name attempted"));
                return completableFuture;
            }
            if (z2) {
                completableFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("readCompacted only valid on persistent topics"));
                return completableFuture;
            }
            this.lock.readLock().lock();
            try {
                if (this.isFenced) {
                    log.warn("[{}] Attempting to subscribe to a fenced topic", this.topic);
                    completableFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable"));
                    this.lock.readLock().unlock();
                    return completableFuture;
                }
                handleConsumerAdded(str, str2);
                this.lock.readLock().unlock();
                NonPersistentSubscription nonPersistentSubscription = (NonPersistentSubscription) this.subscriptions.computeIfAbsent(str, str3 -> {
                    return new NonPersistentSubscription(this, str, z);
                });
                Consumer consumer = new Consumer(nonPersistentSubscription, subType, this.topic, j, i, str2, 0, transportCnx, transportCnx.getAuthRole(), map, z2, initialPosition, keySharedMeta, MessageId.latest);
                addConsumerToSubscription(nonPersistentSubscription, consumer).thenRun(() -> {
                    if (transportCnx.isActive()) {
                        log.info("[{}][{}] Created new subscription for {}", new Object[]{this.topic, str, Long.valueOf(j)});
                        completableFuture.complete(consumer);
                        return;
                    }
                    try {
                        consumer.close();
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", new Object[]{this.topic, str, consumer.consumerName(), Long.valueOf(currentUsageCount())});
                        }
                        completableFuture.completeExceptionally(new BrokerServiceException("Connection was closed while the opening the cursor "));
                    } catch (BrokerServiceException e) {
                        if (e instanceof BrokerServiceException.ConsumerBusyException) {
                            log.warn("[{}][{}] Consumer {} {} already connected", new Object[]{this.topic, str, Long.valueOf(j), str2});
                        } else if (e instanceof BrokerServiceException.SubscriptionBusyException) {
                            log.warn("[{}][{}] {}", new Object[]{this.topic, str, e.getMessage()});
                        }
                        decrementUsageCount();
                        completableFuture.completeExceptionally(e);
                    }
                }).exceptionally(th -> {
                    Throwable cause = th.getCause();
                    if (cause instanceof BrokerServiceException.ConsumerBusyException) {
                        log.warn("[{}][{}] Consumer {} {} already connected", new Object[]{this.topic, str, Long.valueOf(j), str2});
                    } else if (cause instanceof BrokerServiceException.SubscriptionBusyException) {
                        log.warn("[{}][{}] {}", new Object[]{this.topic, str, th.getMessage()});
                    }
                    decrementUsageCount();
                    completableFuture.completeExceptionally(cause);
                    return null;
                });
                return completableFuture;
            } catch (Throwable th2) {
                this.lock.readLock().unlock();
                throw th2;
            }
        } catch (Exception e) {
            completableFuture.completeExceptionally(e);
            return completableFuture;
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Subscription> createSubscription(String str, CommandSubscribe.InitialPosition initialPosition, boolean z) {
        return CompletableFuture.completedFuture(new NonPersistentSubscription(this, str, true));
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> delete() {
        return delete(false, false, false);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> deleteForcefully() {
        return delete(false, true, false);
    }

    private CompletableFuture<Void> delete(boolean z, boolean z2, boolean z3) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.lock.writeLock().lock();
        try {
            if (this.isFenced) {
                log.warn("[{}] Topic is already being closed or deleted", this.topic);
                completableFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                this.lock.writeLock().unlock();
                return completableFuture;
            }
            CompletableFuture completableFuture2 = new CompletableFuture();
            if (z2) {
                ArrayList newArrayList = Lists.newArrayList();
                this.replicators.forEach((str, nonPersistentReplicator) -> {
                    newArrayList.add(nonPersistentReplicator.disconnect());
                });
                this.producers.values().forEach(producer -> {
                    newArrayList.add(producer.disconnect());
                });
                this.subscriptions.forEach((str2, nonPersistentSubscription) -> {
                    newArrayList.add(nonPersistentSubscription.disconnect());
                });
                FutureUtil.waitForAll(newArrayList).thenRun(() -> {
                    completableFuture2.complete(null);
                }).exceptionally(th -> {
                    log.error("[{}] Error closing clients", this.topic, th);
                    this.isFenced = false;
                    completableFuture2.completeExceptionally(th);
                    return null;
                });
            } else {
                completableFuture2.complete(null);
            }
            completableFuture2.thenAccept(r11 -> {
                if (currentUsageCount() != 0) {
                    completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has " + currentUsageCount() + " connected producers/consumers"));
                    return;
                }
                this.isFenced = true;
                ArrayList newArrayList2 = Lists.newArrayList();
                if (!z) {
                    this.subscriptions.forEach((str3, nonPersistentSubscription2) -> {
                        newArrayList2.add(nonPersistentSubscription2.delete());
                    });
                } else if (!this.subscriptions.isEmpty()) {
                    this.isFenced = false;
                    completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has subscriptions"));
                    return;
                }
                if (z3) {
                    newArrayList2.add(deleteSchema().thenApply(schemaVersion -> {
                        return null;
                    }));
                }
                newArrayList2.add(deleteTopicPolicies());
                FutureUtil.waitForAll(newArrayList2).whenComplete((r7, th2) -> {
                    if (th2 == null) {
                        this.brokerService.executor().execute(() -> {
                            this.brokerService.removeTopicFromCache(this.topic);
                            log.info("[{}] Topic deleted", this.topic);
                            completableFuture.complete(null);
                        });
                        return;
                    }
                    log.error("[{}] Error deleting topic", this.topic, th2);
                    this.isFenced = false;
                    completableFuture.completeExceptionally(th2);
                });
            }).exceptionally(th2 -> {
                completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Failed to close clients before deleting topic."));
                return null;
            });
            this.lock.writeLock().unlock();
            return completableFuture;
        } catch (Throwable th3) {
            this.lock.writeLock().unlock();
            throw th3;
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> close(boolean z) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.lock.writeLock().lock();
        try {
            if (this.isFenced && !z) {
                log.warn("[{}] Topic is already being closed or deleted", this.topic);
                completableFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                this.lock.writeLock().unlock();
                return completableFuture;
            }
            this.isFenced = true;
            this.lock.writeLock().unlock();
            ArrayList newArrayList = Lists.newArrayList();
            this.replicators.forEach((str, nonPersistentReplicator) -> {
                newArrayList.add(nonPersistentReplicator.disconnect());
            });
            this.producers.values().forEach(producer -> {
                newArrayList.add(producer.disconnect());
            });
            if (this.topicPublishRateLimiter != null) {
                try {
                    this.topicPublishRateLimiter.close();
                } catch (Exception e) {
                    log.warn("Error closing topicPublishRateLimiter for topic {}", this.topic, e);
                }
            }
            this.subscriptions.forEach((str2, nonPersistentSubscription) -> {
                newArrayList.add(nonPersistentSubscription.disconnect());
            });
            (z ? CompletableFuture.completedFuture(null) : FutureUtil.waitForAll(newArrayList)).thenRun(() -> {
                log.info("[{}] Topic closed", this.topic);
                this.brokerService.executor().execute(() -> {
                    this.brokerService.removeTopicFromCache(this.topic);
                    completableFuture.complete(null);
                });
            }).exceptionally(th -> {
                log.error("[{}] Error closing topic", this.topic, th);
                this.isFenced = false;
                completableFuture.completeExceptionally(th);
                return null;
            });
            return completableFuture;
        } catch (Throwable th2) {
            this.lock.writeLock().unlock();
            throw th2;
        }
    }

    public CompletableFuture<Void> stopReplProducers() {
        ArrayList newArrayList = Lists.newArrayList();
        this.replicators.forEach((str, nonPersistentReplicator) -> {
            newArrayList.add(nonPersistentReplicator.disconnect());
        });
        return FutureUtil.waitForAll(newArrayList);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> checkReplication() {
        TopicName topicName = TopicName.get(this.topic);
        if (!topicName.isGlobal()) {
            return CompletableFuture.completedFuture(null);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Checking replication status", topicName);
        }
        try {
            Policies policies = (Policies) this.brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path(ZkAdminPaths.POLICIES, topicName.getNamespace())).orElseThrow(() -> {
                return new KeeperException.NoNodeException();
            });
            Set<String> emptySet = policies.replication_clusters != null ? policies.replication_clusters : Collections.emptySet();
            String clusterName = this.brokerService.pulsar().getConfiguration().getClusterName();
            ArrayList newArrayList = Lists.newArrayList();
            for (String str : emptySet) {
                if (!str.equals(clusterName) && !this.replicators.containsKey(str) && !startReplicator(str)) {
                    return FutureUtil.failedFuture(new BrokerServiceException.NamingException(this.topic + " failed to start replicator for " + str));
                }
            }
            Set set = emptySet;
            this.replicators.forEach((str2, nonPersistentReplicator) -> {
                if (str2.equals(clusterName) || set.contains(str2)) {
                    return;
                }
                newArrayList.add(removeReplicator(str2));
            });
            return FutureUtil.waitForAll(newArrayList);
        } catch (Exception e) {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            completableFuture.completeExceptionally(new BrokerServiceException.ServerMetadataException(e));
            return completableFuture;
        }
    }

    boolean startReplicator(String str) {
        log.info("[{}] Starting replicator to remote: {}", this.topic, str);
        return addReplicationCluster(str, this, this.brokerService.pulsar().getConfiguration().getClusterName());
    }

    protected boolean addReplicationCluster(String str, NonPersistentTopic nonPersistentTopic, String str2) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        this.replicators.computeIfAbsent(str, str3 -> {
            try {
                return new NonPersistentReplicator(this, str2, str, this.brokerService);
            } catch (BrokerServiceException.NamingException e) {
                atomicBoolean.set(false);
                log.error("[{}] Replicator startup failed due to partitioned-topic {}", this.topic, str);
                return null;
            }
        });
        if (!atomicBoolean.get()) {
            this.replicators.remove(str);
        }
        return atomicBoolean.get();
    }

    CompletableFuture<Void> removeReplicator(String str) {
        log.info("[{}] Removing replicator to {}", this.topic, str);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        String replicatorName = NonPersistentReplicator.getReplicatorName(this.replicatorPrefix, str);
        ((NonPersistentReplicator) this.replicators.get(str)).disconnect().thenRun(() -> {
            log.info("[{}] Successfully removed replicator {}", replicatorName, str);
        }).exceptionally(th -> {
            log.error("[{}] Failed to close replication producer {} {}", new Object[]{this.topic, replicatorName, th.getMessage(), th});
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        checkReplication().thenAccept(r6 -> {
            log.info("[{}] Policies updated successfully", this.topic);
            completableFuture.complete(null);
        }).exceptionally(th -> {
            log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", new Object[]{this.topic, th.getMessage(), 60L, th});
            this.brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure, 60L, TimeUnit.SECONDS);
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkMessageExpiry() {
    }

    @Override // org.apache.pulsar.broker.service.AbstractTopic
    public int getNumberOfConsumers() {
        int i = 0;
        Iterator it = this.subscriptions.values().iterator();
        while (it.hasNext()) {
            i += ((NonPersistentSubscription) it.next()).getConsumers().size();
        }
        return i;
    }

    @Override // org.apache.pulsar.broker.service.AbstractTopic
    public int getNumberOfSameAddressConsumers(String str) {
        return getNumberOfSameAddressConsumers(str, this.subscriptions.values());
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() {
        return this.subscriptions;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public ConcurrentOpenHashMap<String, NonPersistentReplicator> getReplicators() {
        return this.replicators;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public Subscription getSubscription(String str) {
        return (Subscription) this.subscriptions.get(str);
    }

    public Replicator getPersistentReplicator(String str) {
        return (Replicator) this.replicators.get(str);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void updateRates(NamespaceStats namespaceStats, NamespaceBundleStats namespaceBundleStats, StatsOutputStream statsOutputStream, ClusterReplicationMetrics clusterReplicationMetrics, String str, boolean z) {
        TopicStats topicStats = (TopicStats) threadLocalTopicStats.get();
        topicStats.reset();
        this.replicators.forEach((str2, nonPersistentReplicator) -> {
            nonPersistentReplicator.updateRates();
        });
        namespaceStats.producerCount += this.producers.size();
        namespaceBundleStats.producerCount += this.producers.size();
        statsOutputStream.startObject(this.topic);
        statsOutputStream.startList("publishers");
        this.producers.values().forEach(producer -> {
            producer.updateRates();
            PublisherStatsImpl stats = producer.getStats();
            topicStats.aggMsgRateIn += stats.msgRateIn;
            topicStats.aggMsgThroughputIn += stats.msgThroughputIn;
            if (producer.isRemote()) {
                topicStats.remotePublishersStats.put(producer.getRemoteCluster(), stats);
            }
            if (z) {
                StreamingStats.writePublisherStats(statsOutputStream, stats);
            }
        });
        statsOutputStream.endList();
        statsOutputStream.startObject("replication");
        namespaceStats.replicatorCount += topicStats.remotePublishersStats.size();
        statsOutputStream.endObject();
        statsOutputStream.startObject("subscriptions");
        namespaceStats.subsCount = (int) (namespaceStats.subsCount + this.subscriptions.size());
        this.subscriptions.forEach((str3, nonPersistentSubscription) -> {
            double d = 0.0d;
            double d2 = 0.0d;
            double d3 = 0.0d;
            try {
                statsOutputStream.startObject(str3);
                statsOutputStream.startList("consumers");
                for (Consumer consumer : nonPersistentSubscription.getConsumers()) {
                    namespaceStats.consumerCount++;
                    namespaceBundleStats.consumerCount++;
                    consumer.updateRates();
                    ConsumerStatsImpl stats = consumer.getStats();
                    d += stats.msgRateOut;
                    d2 += stats.msgThroughputOut;
                    d3 += stats.msgRateRedeliver;
                    StreamingStats.writeConsumerStats(statsOutputStream, nonPersistentSubscription.getType(), stats);
                }
                statsOutputStream.endList();
                statsOutputStream.writePair("msgBacklog", nonPersistentSubscription.getNumberOfEntriesInBacklog(false));
                statsOutputStream.writePair("msgRateExpired", nonPersistentSubscription.getExpiredMessageRate());
                statsOutputStream.writePair("msgRateOut", d);
                statsOutputStream.writePair("msgThroughputOut", d2);
                statsOutputStream.writePair("msgRateRedeliver", d3);
                statsOutputStream.writePair("type", nonPersistentSubscription.getTypeString());
                if (nonPersistentSubscription.getDispatcher() != null) {
                    nonPersistentSubscription.getDispatcher().getMessageDropRate().calculateRate();
                    statsOutputStream.writePair("msgDropRate", nonPersistentSubscription.getDispatcher().getMessageDropRate().getValueRate());
                }
                statsOutputStream.endObject();
                topicStats.aggMsgRateOut += d;
                topicStats.aggMsgThroughputOut += d2;
                namespaceStats.msgBacklog += nonPersistentSubscription.getNumberOfEntriesInBacklog(false);
            } catch (Exception e) {
                log.error("Got exception when creating consumer stats for subscription {}: {}", new Object[]{str3, e.getMessage(), e});
            }
        });
        statsOutputStream.endObject();
        topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0d ? 0.0d : topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn;
        statsOutputStream.writePair("producerCount", this.producers.size());
        statsOutputStream.writePair("averageMsgSize", topicStats.averageMsgSize);
        statsOutputStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
        statsOutputStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
        statsOutputStream.writePair("msgThroughputIn", topicStats.aggMsgThroughputIn);
        statsOutputStream.writePair("msgThroughputOut", topicStats.aggMsgThroughputOut);
        statsOutputStream.writePair("msgInCount", getMsgInCounter());
        statsOutputStream.writePair("bytesInCount", getBytesInCounter());
        statsOutputStream.writePair("msgOutCount", getMsgOutCounter());
        statsOutputStream.writePair("bytesOutCount", getBytesOutCounter());
        namespaceStats.msgRateIn += topicStats.aggMsgRateIn;
        namespaceStats.msgRateOut += topicStats.aggMsgRateOut;
        namespaceStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
        namespaceStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
        namespaceBundleStats.msgRateIn += topicStats.aggMsgRateIn;
        namespaceBundleStats.msgRateOut += topicStats.aggMsgRateOut;
        namespaceBundleStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
        namespaceBundleStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
        this.addEntryLatencyStatsUsec.refresh();
        NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), namespaceStats.addLatencyBucket);
        this.addEntryLatencyStatsUsec.reset();
        statsOutputStream.endObject();
    }

    @Override // org.apache.pulsar.broker.service.Topic
    /* renamed from: getStats, reason: merged with bridge method [inline-methods] */
    public NonPersistentTopicStatsImpl mo137getStats(boolean z, boolean z2) {
        NonPersistentTopicStatsImpl nonPersistentTopicStatsImpl = new NonPersistentTopicStatsImpl();
        ObjectObjectHashMap objectObjectHashMap = new ObjectObjectHashMap();
        this.producers.values().forEach(producer -> {
            NonPersistentPublisherStatsImpl stats = producer.getStats();
            nonPersistentTopicStatsImpl.msgRateIn += stats.msgRateIn;
            nonPersistentTopicStatsImpl.msgThroughputIn += stats.msgThroughputIn;
            if (producer.isRemote()) {
                objectObjectHashMap.put(producer.getRemoteCluster(), stats);
            } else {
                nonPersistentTopicStatsImpl.getPublishers().add(stats);
            }
        });
        nonPersistentTopicStatsImpl.averageMsgSize = nonPersistentTopicStatsImpl.msgRateIn == 0.0d ? 0.0d : nonPersistentTopicStatsImpl.msgThroughputIn / nonPersistentTopicStatsImpl.msgRateIn;
        nonPersistentTopicStatsImpl.msgInCounter = getMsgInCounter();
        nonPersistentTopicStatsImpl.bytesInCounter = getBytesInCounter();
        nonPersistentTopicStatsImpl.waitingPublishers = getWaitingProducersCount();
        nonPersistentTopicStatsImpl.bytesOutCounter = this.bytesOutFromRemovedSubscriptions.longValue();
        nonPersistentTopicStatsImpl.msgOutCounter = this.msgOutFromRemovedSubscriptions.longValue();
        this.subscriptions.forEach((str, nonPersistentSubscription) -> {
            NonPersistentSubscriptionStatsImpl stats = nonPersistentSubscription.getStats();
            nonPersistentTopicStatsImpl.msgRateOut += stats.msgRateOut;
            nonPersistentTopicStatsImpl.msgThroughputOut += stats.msgThroughputOut;
            nonPersistentTopicStatsImpl.bytesOutCounter += stats.bytesOutCounter;
            nonPersistentTopicStatsImpl.msgOutCounter += stats.msgOutCounter;
            nonPersistentTopicStatsImpl.getSubscriptions().put(str, stats);
        });
        this.replicators.forEach((str2, nonPersistentReplicator) -> {
            NonPersistentReplicatorStatsImpl mo129getStats = nonPersistentReplicator.mo129getStats();
            PublisherStatsImpl publisherStatsImpl = (PublisherStatsImpl) objectObjectHashMap.get(nonPersistentReplicator.getRemoteCluster());
            if (publisherStatsImpl != null) {
                mo129getStats.msgRateIn = publisherStatsImpl.msgRateIn;
                mo129getStats.msgThroughputIn = publisherStatsImpl.msgThroughputIn;
                mo129getStats.inboundConnection = publisherStatsImpl.getAddress();
                mo129getStats.inboundConnectedSince = publisherStatsImpl.getConnectedSince();
            }
            nonPersistentTopicStatsImpl.msgRateOut += mo129getStats.msgRateOut;
            nonPersistentTopicStatsImpl.msgThroughputOut += mo129getStats.msgThroughputOut;
            nonPersistentTopicStatsImpl.getReplication().put(nonPersistentReplicator.getRemoteCluster(), mo129getStats);
        });
        nonPersistentTopicStatsImpl.topicEpoch = this.topicEpoch.orElse(null);
        return nonPersistentTopicStatsImpl;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean z) {
        PersistentTopicInternalStats persistentTopicInternalStats = new PersistentTopicInternalStats();
        persistentTopicInternalStats.entriesAddedCounter = ENTRIES_ADDED_COUNTER_UPDATER.get(this);
        persistentTopicInternalStats.cursors = Maps.newTreeMap();
        this.subscriptions.forEach((str, nonPersistentSubscription) -> {
            persistentTopicInternalStats.cursors.put(str, new ManagedLedgerInternalStats.CursorStats());
        });
        this.replicators.forEach((str2, nonPersistentReplicator) -> {
            persistentTopicInternalStats.cursors.put(str2, new ManagedLedgerInternalStats.CursorStats());
        });
        return CompletableFuture.completedFuture(persistentTopicInternalStats);
    }

    public boolean isActive() {
        return TopicName.get(this.topic).isGlobal() ? !this.subscriptions.isEmpty() || hasLocalProducers() : (currentUsageCount() == 0 && this.subscriptions.isEmpty()) ? false : true;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkGC() {
        if (isDeleteWhileInactive()) {
            int maxInactiveDurationSeconds = this.inactiveTopicPolicies.getMaxInactiveDurationSeconds();
            if (isActive()) {
                this.lastActive = System.nanoTime();
            } else {
                if (System.nanoTime() - this.lastActive <= TimeUnit.SECONDS.toNanos(maxInactiveDurationSeconds) || !TopicName.get(this.topic).isGlobal()) {
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", this.topic, Integer.valueOf(maxInactiveDurationSeconds));
                }
                stopReplProducers().thenCompose(r6 -> {
                    return delete(true, false, true);
                }).thenRun(() -> {
                    log.info("[{}] Topic deleted successfully due to inactivity", this.topic);
                }).exceptionally(th -> {
                    Throwable cause = th.getCause();
                    if (!(cause instanceof BrokerServiceException.TopicBusyException)) {
                        log.warn("[{}] Inactive topic deletion failed", this.topic, th);
                        return null;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Did not delete busy topic: {}", this.topic, cause.getMessage());
                    }
                    this.replicators.forEach((str, nonPersistentReplicator) -> {
                        nonPersistentReplicator.startProducer();
                    });
                    return null;
                });
            }
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkInactiveSubscriptions() {
        try {
            Policies policies = (Policies) this.brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path(ZkAdminPaths.POLICIES, TopicName.get(this.topic).getNamespace())).orElseThrow(KeeperException.NoNodeException::new);
            int subscriptionExpirationTimeMinutes = this.brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes();
            long millis = TimeUnit.MINUTES.toMillis(policies.subscription_expiration_time_minutes == null ? subscriptionExpirationTimeMinutes : r0.intValue());
            if (millis > 0) {
                this.subscriptions.forEach((str, nonPersistentSubscription) -> {
                    if ((nonPersistentSubscription.getDispatcher() == null || !nonPersistentSubscription.getDispatcher().isConsumerConnected()) && !nonPersistentSubscription.isReplicated() && System.currentTimeMillis() - nonPersistentSubscription.getLastActive() > millis) {
                        nonPersistentSubscription.delete().thenAccept(r7 -> {
                            log.info("[{}][{}] The subscription was deleted due to expiration", this.topic, str);
                        });
                    }
                });
            }
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error getting policies", this.topic);
            }
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkBackloggedCursors() {
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkDeduplicationSnapshot() {
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> onPoliciesUpdate(Policies policies) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] isEncryptionRequired changes: {} -> {}", new Object[]{this.topic, Boolean.valueOf(this.isEncryptionRequired), Boolean.valueOf(policies.encryption_required)});
        }
        this.isEncryptionRequired = policies.encryption_required;
        setSchemaCompatibilityStrategy(policies);
        this.isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
        this.schemaValidationEnforced = policies.schema_validation_enforced;
        this.producers.values().forEach(producer -> {
            producer.checkPermissions();
            producer.checkEncryption();
        });
        this.subscriptions.forEach((str, nonPersistentSubscription) -> {
            nonPersistentSubscription.getConsumers().forEach((v0) -> {
                v0.checkPermissions();
            });
        });
        if (policies.inactive_topic_policies != null) {
            this.inactiveTopicPolicies = policies.inactive_topic_policies;
        } else {
            ServiceConfiguration configuration = this.brokerService.getPulsar().getConfiguration();
            resetInactiveTopicPolicies(configuration.getBrokerDeleteInactiveTopicsMode(), configuration.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), configuration.isBrokerDeleteInactiveTopicsEnabled());
        }
        return checkReplicationAndRetryOnFailure();
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
        throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic");
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public boolean isBacklogQuotaExceeded(String str, BacklogQuota.BacklogQuotaType backlogQuotaType) {
        return false;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public boolean isReplicated() {
        return this.replicators.size() > 1;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> unsubscribe(String str) {
        return CompletableFuture.runAsync(() -> {
            NonPersistentSubscriptionStatsImpl stats = ((NonPersistentSubscription) this.subscriptions.remove(str)).getStats();
            this.bytesOutFromRemovedSubscriptions.add(((SubscriptionStatsImpl) stats).bytesOutCounter);
            this.msgOutFromRemovedSubscriptions.add(((SubscriptionStatsImpl) stats).msgOutCounter);
        }, this.brokerService.executor());
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public Position getLastPosition() {
        throw new UnsupportedOperationException("getLastPosition is not supported on non-persistent topic");
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<MessageId> getLastMessageId() {
        throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic");
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schemaData) {
        return hasSchema().thenCompose(bool -> {
            return (!bool.booleanValue() && this.producers.isEmpty() && this.subscriptions.values().stream().mapToInt(nonPersistentSubscription -> {
                return nonPersistentSubscription.getConsumers().size();
            }).sum() == 0 && ENTRIES_ADDED_COUNTER_UPDATER.get(this) == 0) ? addSchema(schemaData).thenCompose(schemaVersion -> {
                return CompletableFuture.completedFuture(null);
            }) : checkSchemaCompatibleForConsumer(schemaData);
        });
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void publishTxnMessage(TxnID txnID, ByteBuf byteBuf, Topic.PublishContext publishContext) {
        throw new UnsupportedOperationException("PublishTxnMessage is not supported by non-persistent topic");
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> endTxn(TxnID txnID, int i, long j) {
        return FutureUtil.failedFuture(new Exception("Unsupported operation endTxn in non-persistent topic."));
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> truncate() {
        return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Unsupported truncate"));
    }

    @Override // org.apache.pulsar.broker.service.AbstractTopic
    protected boolean isTerminated() {
        return false;
    }
}
