/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.PrecisPublishLimiter;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyHierarchyValue;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTopic
implements Topic,
TopicPolicyListener<TopicPolicies> {
    // Retry interval in seconds; presumably used when a policy update fails (usage is not visible here — TODO confirm).
    protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60L;
    // Topic name; returned by getName().
    protected final String topic;
    // Producers attached to this topic, keyed by a String (presumably the producer name — TODO confirm).
    protected final ConcurrentHashMap<String, Producer> producers;
    // Broker service that owns this topic; returned by getBrokerService().
    protected final BrokerService brokerService;
    // Replicator prefix copied from the broker configuration in the constructor.
    protected final String replicatorPrefix;
    // Read/write lock; addProducer() takes the write lock while registering a producer.
    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Fenced flag; initialised to false in the constructor.
    protected volatile boolean isFenced;
    // Policy values that are updated at broker, namespace and topic level by the updateTopicPolicy* methods.
    protected final HierarchyTopicPolicies topicPolicies;
    // Initialised from System.nanoTime() in the constructor.
    protected volatile long lastActive;
    // Set to true by markBatchMessagePublished().
    protected volatile boolean hasBatchMessagePublished = false;
    // Latency buckets built from ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC.
    protected StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
    // Returned by isEncryptionRequired().
    protected volatile boolean isEncryptionRequired = false;
    // Tri-state: null means allowAutoUpdateSchema() falls back to the broker configuration.
    protected volatile Boolean isAllowAutoUpdateSchema;
    // Returned by getSchemaValidationEnforced().
    protected volatile boolean schemaValidationEnforced = false;
    // Publish rate limiter for this topic; presumably guarded by topicPublishRateLimiterLock when replaced (TODO confirm).
    protected volatile PublishRateLimiter topicPublishRateLimiter;
    private final Object topicPublishRateLimiterLock = new Object();
    protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
    // Copied from ServiceConfiguration.isPreciseTopicPublishRateLimiterEnable() in the constructor.
    protected boolean preciseTopicPublishRateLimitingEnable;
    protected boolean resourceGroupRateLimitingEnabled;
    // Counters; judging by the names they track inbound bytes/messages and filtered entries (TODO confirm).
    private LongAdder bytesInCounter = new LongAdder();
    private LongAdder msgInCounter = new LongAdder();
    private final LongAdder filteredEntriesCounter = new LongAdder();
    // Atomic updater bound to the publishRateLimitedTimes field below.
    private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER = AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes");
    protected volatile long publishRateLimitedTimes = 0L;
    // Atomic updater bound to userCreatedProducerCount; isProducersExceeded() compares it with the max-producers policy.
    private static final AtomicIntegerFieldUpdater<AbstractTopic> USER_CREATED_PRODUCER_COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount");
    protected volatile int userCreatedProducerCount = 0;
    // Starts empty; presumably the epoch handed to exclusive producers (TODO confirm).
    protected volatile Optional<Long> topicEpoch = Optional.empty();
    private volatile boolean hasExclusiveProducer;
    private volatile String exclusiveProducerName;
    // Producers paired with a future; presumably those waiting for exclusive access (TODO confirm).
    private final Queue<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducers = new ConcurrentLinkedQueue<Pair<Producer, CompletableFuture<Optional<Long>>>>();
    // Atomic updater bound to usageCount; incremented in addProducer() once a producer has been added.
    private static final AtomicLongFieldUpdater<AbstractTopic> USAGE_COUNT_UPDATER = AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
    private volatile long usageCount = 0L;
    // Counters; judging by the names they keep outbound totals of subscriptions that were removed (TODO confirm).
    protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
    protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
    // Summary metric "pulsar_broker_publish_latency" registered with quantiles 0, 0.5, 0.95, 0.99, 0.999, 0.9999 and 1.
    private static final Summary PUBLISH_LATENCY = (Summary)Summary.build("pulsar_broker_publish_latency", "-").quantile(0.0).quantile(0.5).quantile(0.95).quantile(0.99).quantile(0.999).quantile(0.9999).quantile(1.0).register();
    private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);

    /**
     * Initialises the state shared by all topic implementations.
     *
     * <p>Stores the topic name and broker service, creates the empty producer map and the
     * policy hierarchy, seeds the broker-level policy values from the broker configuration
     * and records the creation time via {@link System#nanoTime()}.
     *
     * @param topic         the topic name
     * @param brokerService the broker service this topic belongs to
     */
    public AbstractTopic(String topic, BrokerService brokerService) {
        this.topic = topic;
        this.brokerService = brokerService;
        this.producers = new ConcurrentHashMap<>();
        this.isFenced = false;
        final ServiceConfiguration brokerConfig = brokerService.pulsar().getConfiguration();
        this.replicatorPrefix = brokerConfig.getReplicatorPrefix();
        this.topicPolicies = new HierarchyTopicPolicies();
        // Needs brokerService and topicPolicies to be assigned already.
        updateTopicPolicyByBrokerConfig();
        this.lastActive = System.nanoTime();
        this.preciseTopicPublishRateLimitingEnable = brokerConfig.isPreciseTopicPublishRateLimiterEnable();
    }

    /**
     * Looks up the subscribe rate in the topic policy hierarchy.
     *
     * @return the value reported by the hierarchy's subscribe-rate entry
     */
    public SubscribeRate getSubscribeRate() {
        return (SubscribeRate) topicPolicies.getSubscribeRate().get();
    }

    /**
     * Looks up the per-subscription dispatch rate in the topic policy hierarchy.
     *
     * @return the value reported by the hierarchy's subscription-dispatch-rate entry
     */
    public DispatchRateImpl getSubscriptionDispatchRate() {
        return (DispatchRateImpl) topicPolicies.getSubscriptionDispatchRate().get();
    }

    /**
     * Looks up the schema compatibility strategy in the topic policy hierarchy.
     *
     * @return the value reported by the hierarchy's schema-compatibility-strategy entry
     */
    public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
        return (SchemaCompatibilityStrategy) topicPolicies.getSchemaCompatibilityStrategy().get();
    }

    /**
     * Looks up the replicator dispatch rate in the topic policy hierarchy.
     *
     * @return the value reported by the hierarchy's replicator-dispatch-rate entry
     */
    public DispatchRateImpl getReplicatorDispatchRate() {
        return (DispatchRateImpl) topicPolicies.getReplicatorDispatchRate().get();
    }

    /**
     * Looks up the topic dispatch rate in the topic policy hierarchy.
     *
     * @return the value reported by the hierarchy's dispatch-rate entry
     */
    public DispatchRateImpl getDispatchRate() {
        return (DispatchRateImpl) topicPolicies.getDispatchRate().get();
    }

    /**
     * Maps {@link SchemaCompatibilityStrategy#UNDEFINED} to {@code null} so that an undefined
     * strategy is stored as "no value" in the policy hierarchy.
     *
     * @param strategy the strategy to convert, may be {@code null}
     * @return {@code null} for {@code UNDEFINED}, otherwise {@code strategy} unchanged
     */
    private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
        if (strategy == SchemaCompatibilityStrategy.UNDEFINED) {
            return null;
        }
        return strategy;
    }

    /**
     * Copies every topic-level setting from {@code data} into the topic-level slot of the
     * policy hierarchy.
     *
     * <p>Replication clusters and the schema compatibility strategy are skipped for system
     * topics. Publish, subscribe and dispatch rates are normalized before being stored, and an
     * empty subscription-type list is stored as {@code null}.
     *
     * @param data the topic-level policies to apply; must not be {@code null}
     */
    protected void updateTopicPolicy(TopicPolicies data) {
        final boolean systemTopic = isSystemTopic();
        if (!systemTopic) {
            topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters());
            topicPolicies.getSchemaCompatibilityStrategy()
                    .updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()));
        }

        topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
        topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
        topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
        topicPolicies.getMaxUnackedMessagesOnSubscription()
                .updateTopicValue(data.getMaxUnackedMessagesOnSubscription());
        topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
        topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
        topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
        topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
        topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
        topicPolicies.getDeduplicationSnapshotIntervalSeconds()
                .updateTopicValue(data.getDeduplicationSnapshotIntervalSeconds());
        // An empty or missing list means "nothing set at topic level".
        topicPolicies.getSubscriptionTypesEnabled().updateTopicValue(
                CollectionUtils.isEmpty(data.getSubscriptionTypesEnabled())
                        ? null
                        : EnumSet.copyOf(data.getSubscriptionTypesEnabled()));

        // The topic-level quota map is keyed by the quota type's string form.
        for (BacklogQuota.BacklogQuotaType type : BacklogQuota.BacklogQuotaType.values()) {
            BacklogQuota topicQuota = null;
            if (data.getBackLogQuotaMap() != null) {
                topicQuota = (BacklogQuota) data.getBackLogQuotaMap().get(type.toString());
            }
            ((PolicyHierarchyValue) topicPolicies.getBackLogQuotaMap().get(type)).updateTopicValue(topicQuota);
        }

        topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
        topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
        topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
        topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
        topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
        topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
        topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
        topicPolicies.getSubscriptionDispatchRate()
                .updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
        topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
        topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate()));
    }

    /**
     * Copies the namespace-level settings from {@code namespacePolicies} into the
     * namespace-level slot of the policy hierarchy.
     *
     * <p>Does nothing when the namespace policies are flagged as deleted. Per-cluster settings
     * are looked up with this broker's configured cluster name.
     *
     * @param namespacePolicies the namespace policies to apply; must not be {@code null}
     */
    protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
        if (log.isDebugEnabled()) {
            log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
        }
        if (namespacePolicies.deleted) {
            return;
        }

        topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies);
        topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
        topicPolicies.getReplicationClusters().updateNamespaceValue(
                Lists.newArrayList(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
        topicPolicies.getMaxUnackedMessagesOnConsumer()
                .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
        topicPolicies.getMaxUnackedMessagesOnSubscription()
                .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription);
        topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
        topicPolicies.getMaxSubscriptionsPerTopic()
                .updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
        topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
        topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
        topicPolicies.getMaxConsumersPerSubscription()
                .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
        topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
        topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
        topicPolicies.getDeduplicationSnapshotIntervalSeconds()
                .updateNamespaceValue(namespacePolicies.deduplicationSnapshotIntervalSeconds);

        // Per-cluster policies are resolved against the local cluster name.
        final String localCluster = brokerService.getPulsar().getConfig().getClusterName();
        updateNamespacePublishRate(namespacePolicies, localCluster);

        final Optional<DelayedDeliveryPolicies> delayedDelivery =
                Optional.ofNullable(namespacePolicies.delayed_delivery_policies);
        topicPolicies.getDelayedDeliveryEnabled()
                .updateNamespaceValue(delayedDelivery.map(DelayedDeliveryPolicies::isActive).orElse(null));
        topicPolicies.getDelayedDeliveryTickTimeMillis()
                .updateNamespaceValue(delayedDelivery.map(DelayedDeliveryPolicies::getTickTime).orElse(null));

        topicPolicies.getSubscriptionTypesEnabled()
                .updateNamespaceValue(subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
        updateNamespaceReplicatorDispatchRate(namespacePolicies, localCluster);

        for (BacklogQuota.BacklogQuotaType type : BacklogQuota.BacklogQuotaType.values()) {
            ((PolicyHierarchyValue) topicPolicies.getBackLogQuotaMap().get(type))
                    .updateNamespaceValue((BacklogQuota) MapUtils.getObject(namespacePolicies.backlog_quota_map, type));
        }

        updateNamespaceSubscribeRate(namespacePolicies, localCluster);
        updateNamespaceSubscriptionDispatchRate(namespacePolicies, localCluster);
        updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
        updateNamespaceDispatchRate(namespacePolicies, localCluster);
    }

    /**
     * Stores the namespace-level topic dispatch rate for {@code cluster}, using the
     * {@code clusterDispatchRate} entry when {@code topicDispatchRate} has none.
     *
     * @param namespacePolicies namespace policies holding the per-cluster rate maps
     * @param cluster           cluster name used as the map key
     */
    private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
        final DispatchRateImpl topicRate = (DispatchRateImpl) namespacePolicies.topicDispatchRate.get(cluster);
        final DispatchRateImpl effectiveRate = topicRate != null
                ? topicRate
                : (DispatchRateImpl) namespacePolicies.clusterDispatchRate.get(cluster);
        topicPolicies.getDispatchRate().updateNamespaceValue(normalize(effectiveRate));
    }

    /**
     * Stores the normalized namespace-level subscribe rate configured for {@code cluster}.
     *
     * @param namespacePolicies namespace policies holding {@code clusterSubscribeRate}
     * @param cluster           cluster name used as the map key
     */
    private void updateNamespaceSubscribeRate(Policies namespacePolicies, String cluster) {
        final SubscribeRate clusterRate = (SubscribeRate) namespacePolicies.clusterSubscribeRate.get(cluster);
        topicPolicies.getSubscribeRate().updateNamespaceValue(SubscribeRate.normalize(clusterRate));
    }

    /**
     * Stores the normalized namespace-level subscription dispatch rate configured for
     * {@code cluster}.
     *
     * @param namespacePolicies namespace policies holding {@code subscriptionDispatchRate}
     * @param cluster           cluster name used as the map key
     */
    private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies, String cluster) {
        final DispatchRateImpl clusterRate =
                (DispatchRateImpl) namespacePolicies.subscriptionDispatchRate.get(cluster);
        topicPolicies.getSubscriptionDispatchRate().updateNamespaceValue(normalize(clusterRate));
    }

    /**
     * Stores the normalized namespace-level replicator dispatch rate configured for
     * {@code cluster}.
     *
     * @param namespacePolicies namespace policies holding {@code replicatorDispatchRate}
     * @param cluster           cluster name used as the map key
     */
    private void updateNamespaceReplicatorDispatchRate(Policies namespacePolicies, String cluster) {
        final DispatchRateImpl clusterRate =
                (DispatchRateImpl) namespacePolicies.replicatorDispatchRate.get(cluster);
        topicPolicies.getReplicatorDispatchRate().updateNamespaceValue(normalize(clusterRate));
    }

    /**
     * Treats a dispatch rate without any positive limit as "not set".
     *
     * @param dispatchRate the rate to inspect, may be {@code null}
     * @return {@code dispatchRate} when its message or byte throttling rate is greater than
     *         zero, otherwise {@code null}
     */
    private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
        if (dispatchRate == null) {
            return null;
        }
        if (dispatchRate.getDispatchThrottlingRateInMsg() > 0) {
            return dispatchRate;
        }
        if (dispatchRate.getDispatchThrottlingRateInByte() > 0L) {
            return dispatchRate;
        }
        return null;
    }

    /**
     * Stores the namespace-level schema compatibility strategy; system topics are left
     * untouched.
     *
     * <p>When the namespace strategy is undefined, the strategy is derived from the
     * namespace's schema auto-update compatibility setting instead.
     *
     * @param namespacePolicies namespace policies to read the strategy from
     */
    private void updateSchemaCompatibilityStrategyNamespaceValue(Policies namespacePolicies) {
        if (isSystemTopic()) {
            return;
        }
        final SchemaCompatibilityStrategy configured = namespacePolicies.schema_compatibility_strategy;
        final SchemaCompatibilityStrategy resolved = SchemaCompatibilityStrategy.isUndefined(configured)
                ? SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
                        namespacePolicies.schema_auto_update_compatibility_strategy)
                : configured;
        topicPolicies.getSchemaCompatibilityStrategy()
                .updateNamespaceValue(formatSchemaCompatibilityStrategy(resolved));
    }

    /**
     * Stores the normalized namespace-level publish rate configured for {@code cluster}.
     * A missing {@code publishMaxMessageRate} map is treated like a missing entry.
     *
     * @param namespacePolicies namespace policies holding {@code publishMaxMessageRate}
     * @param cluster           cluster name used as the map key
     */
    private void updateNamespacePublishRate(Policies namespacePolicies, String cluster) {
        PublishRate clusterRate = null;
        if (namespacePolicies.publishMaxMessageRate != null) {
            clusterRate = (PublishRate) namespacePolicies.publishMaxMessageRate.get(cluster);
        }
        topicPolicies.getPublishRate().updateNamespaceValue(PublishRate.normalize(clusterRate));
    }

    /**
     * Seeds the broker-level slot of every policy in the hierarchy from the broker's
     * {@link ServiceConfiguration}.
     *
     * <p>Both backlog quota types receive the backlog quota manager's default quota. System
     * topics take the system-topic schema compatibility strategy instead of the general one.
     */
    private void updateTopicPolicyByBrokerConfig() {
        final ServiceConfiguration config = brokerService.pulsar().getConfiguration();

        final InactiveTopicPolicies brokerInactivePolicies = new InactiveTopicPolicies(
                config.getBrokerDeleteInactiveTopicsMode(),
                config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
                config.isBrokerDeleteInactiveTopicsEnabled());
        topicPolicies.getInactiveTopicPolicies().updateBrokerValue(brokerInactivePolicies);
        updateBrokerSubscriptionTypesEnabled();
        topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
        topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic());
        topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
        topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
        topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());

        final RetentionPolicies brokerRetention = new RetentionPolicies(
                config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB());
        topicPolicies.getRetentionPolicies().updateBrokerValue(brokerRetention);
        topicPolicies.getDeduplicationSnapshotIntervalSeconds()
                .updateBrokerValue(config.getBrokerDeduplicationSnapshotIntervalSeconds());
        topicPolicies.getMaxUnackedMessagesOnConsumer().updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
        topicPolicies.getMaxUnackedMessagesOnSubscription()
                .updateBrokerValue(config.getMaxUnackedMessagesPerSubscription());

        ((PolicyHierarchyValue) topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage))
                .updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());
        ((PolicyHierarchyValue) topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.message_age))
                .updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());

        topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
        topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
        topicPolicies.getPublishRate().updateBrokerValue(publishRateInBroker(config));
        topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
        topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
        topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
        topicPolicies.getReplicationClusters().updateBrokerValue(Collections.emptyList());
        topicPolicies.getReplicatorDispatchRate().updateBrokerValue(replicatorDispatchRateInBroker(config));
        topicPolicies.getSubscribeRate().updateBrokerValue(subscribeRateInBroker(config));
        topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(subscriptionDispatchRateInBroker(config));

        // System topics have their own broker-level schema compatibility strategy.
        SchemaCompatibilityStrategy brokerStrategy = config.getSchemaCompatibilityStrategy();
        if (isSystemTopic()) {
            brokerStrategy = config.getSystemTopicSchemaCompatibilityStrategy();
        }
        topicPolicies.getSchemaCompatibilityStrategy()
                .updateBrokerValue(formatSchemaCompatibilityStrategy(brokerStrategy));
        topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
    }

    /**
     * Builds the broker-level per-topic dispatch rate from the configuration, with a rate
     * period of one second.
     *
     * @param config broker configuration supplying the per-topic throttling rates
     * @return the newly built dispatch rate
     */
    private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) {
        return DispatchRateImpl.builder()
                .dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerTopicInMsg())
                .dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerTopicInByte())
                .ratePeriodInSecond(1)
                .build();
    }

    /**
     * Builds the broker-level subscribe rate from the configuration.
     *
     * @param config broker configuration supplying the per-consumer subscribe throttling
     *               rate and its period
     * @return the newly built subscribe rate
     */
    private SubscribeRate subscribeRateInBroker(ServiceConfiguration config) {
        return new SubscribeRate(
                config.getSubscribeThrottlingRatePerConsumer(),
                config.getSubscribeRatePeriodPerConsumerInSecond());
    }

    /**
     * Builds the broker-level per-subscription dispatch rate from the configuration, with a
     * rate period of one second.
     *
     * @param config broker configuration supplying the per-subscription throttling rates
     * @return the newly built dispatch rate
     */
    private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration config) {
        return DispatchRateImpl.builder()
                .dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerSubscriptionInMsg())
                .dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerSubscriptionInByte())
                .ratePeriodInSecond(1)
                .build();
    }

    /**
     * Builds the broker-level per-replicator dispatch rate from the configuration, with a
     * rate period of one second.
     *
     * @param config broker configuration supplying the per-replicator throttling rates
     * @return the newly built dispatch rate
     */
    private DispatchRateImpl replicatorDispatchRateInBroker(ServiceConfiguration config) {
        return DispatchRateImpl.builder()
                .dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
                .dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
                .ratePeriodInSecond(1)
                .build();
    }

    /**
     * Converts subscription type names into an {@link EnumSet} of
     * {@link CommandSubscribe.SubType}.
     *
     * <p>The conversion is best-effort: {@code null} entries and names that do not match any
     * {@code SubType} constant are skipped. Only the failure an unknown name raises
     * ({@link IllegalArgumentException}) is tolerated, so unrelated errors are no longer
     * silently swallowed.
     *
     * @param getSubscriptionTypesEnabled subscription type names, may be {@code null} or empty
     * @return the recognised subscription types, or {@code null} when none were recognised
     */
    private EnumSet<CommandSubscribe.SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
        EnumSet<CommandSubscribe.SubType> subTypes = EnumSet.noneOf(CommandSubscribe.SubType.class);
        for (String subTypeStr : CollectionUtils.emptyIfNull(getSubscriptionTypesEnabled)) {
            if (subTypeStr == null) {
                // valueOf(null) would throw NullPointerException; a null name is simply invalid.
                continue;
            }
            try {
                subTypes.add(CommandSubscribe.SubType.valueOf(subTypeStr));
            } catch (IllegalArgumentException ignored) {
                // Unknown subscription type name: skip it and keep the remaining entries.
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Ignoring unknown subscription type '{}'", topic, subTypeStr);
                }
            }
        }
        // null (rather than an empty set) signals that no value is configured at this level.
        return subTypes.isEmpty() ? null : subTypes;
    }

    /**
     * Builds the broker-level publish rate from the configuration.
     *
     * @param config broker configuration supplying the per-topic maximum publish rates
     * @return the newly built publish rate
     */
    private PublishRate publishRateInBroker(ServiceConfiguration config) {
        return new PublishRate(
                config.getMaxPublishRatePerTopicInMessages(),
                config.getMaxPublishRatePerTopicInBytes());
    }

    /**
     * Tells whether the max-producers-per-topic limit has already been reached.
     *
     * <p>System topics and remote producers are never limited. A missing or non-positive
     * limit means "unlimited".
     *
     * @param producer the producer about to be added
     * @return {@code true} when the user-created producer count is at or above the limit
     */
    protected boolean isProducersExceeded(Producer producer) {
        if (isSystemTopic() || producer.isRemote()) {
            return false;
        }
        final Integer limit = (Integer) topicPolicies.getMaxProducersPerTopic().get();
        if (limit == null || limit <= 0) {
            return false;
        }
        return USER_CREATED_PRODUCER_COUNTER_UPDATER.get(this) >= limit;
    }

    /**
     * Registers this topic as a listener with the topic policies service, under the
     * partitioned topic name. Does nothing unless both system topics and topic-level policies
     * are enabled in the broker configuration.
     */
    protected void registerTopicPolicyListener() {
        final boolean topicPoliciesActive = brokerService.pulsar().getConfig().isSystemTopicEnabled()
                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled();
        if (!topicPoliciesActive) {
            return;
        }
        brokerService.getPulsar().getTopicPoliciesService()
                .registerListener(TopicName.getPartitionedTopicName(topic), this);
    }

    /**
     * Removes this topic's listener registration from the topic policies service, using the
     * partitioned topic name. Does nothing unless both system topics and topic-level policies
     * are enabled in the broker configuration.
     */
    protected void unregisterTopicPolicyListener() {
        final boolean topicPoliciesActive = brokerService.pulsar().getConfig().isSystemTopicEnabled()
                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled();
        if (!topicPoliciesActive) {
            return;
        }
        brokerService.getPulsar().getTopicPoliciesService()
                .unregisterListener(TopicName.getPartitionedTopicName(topic), this);
    }

    /**
     * Tells whether the broker's per-topic limit of producers from one client address has
     * been reached for the address of {@code producer}.
     *
     * <p>System topics and remote producers are never limited; a non-positive limit disables
     * the check.
     *
     * @param producer the producer about to be added
     * @return {@code true} when the address already has at least the allowed number of producers
     */
    protected boolean isSameAddressProducersExceeded(Producer producer) {
        if (isSystemTopic() || producer.isRemote()) {
            return false;
        }
        final int limit = brokerService.pulsar().getConfiguration().getMaxSameAddressProducersPerTopic();
        if (limit <= 0) {
            return false;
        }
        return getNumberOfSameAddressProducers(producer.getClientAddress()) >= limit;
    }

    /**
     * Counts the producers on this topic whose client address equals {@code clientAddress}.
     *
     * @param clientAddress the address to match; {@code null} matches nothing
     * @return the number of matching producers, {@code 0} for a {@code null} address
     */
    public int getNumberOfSameAddressProducers(String clientAddress) {
        if (clientAddress == null) {
            return 0;
        }
        return (int) producers.values().stream()
                .filter(candidate -> clientAddress.equals(candidate.getClientAddress()))
                .count();
    }

    /**
     * Tells whether the max-consumers-per-topic limit has already been reached.
     *
     * <p>System topics are never limited. A missing or non-positive limit means "unlimited".
     *
     * @return {@code true} when the current consumer count is at or above the limit
     */
    protected boolean isConsumersExceededOnTopic() {
        if (isSystemTopic()) {
            return false;
        }
        final Integer limit = (Integer) topicPolicies.getMaxConsumerPerTopic().get();
        if (limit == null || limit <= 0) {
            return false;
        }
        return getNumberOfConsumers() >= limit;
    }

    /**
     * Tells whether the broker's per-topic limit of consumers from one client address has
     * been reached for the address of {@code consumer}.
     *
     * <p>System topics are never limited; a non-positive limit disables the check.
     *
     * @param consumer the consumer about to be added
     * @return {@code true} when the address already has at least the allowed number of consumers
     */
    protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer) {
        if (isSystemTopic()) {
            return false;
        }
        final int limit = brokerService.pulsar().getConfiguration().getMaxSameAddressConsumersPerTopic();
        if (limit <= 0) {
            return false;
        }
        return getNumberOfSameAddressConsumers(consumer.getClientAddress()) >= limit;
    }

    /**
     * Returns the number of consumers currently attached to this topic; compared against the
     * max-consumers-per-topic policy by {@code isConsumersExceededOnTopic()}.
     *
     * @return the current consumer count
     */
    public abstract int getNumberOfConsumers();

    /**
     * Returns the number of consumers on this topic that connect from the given client
     * address; compared against the broker's same-address consumer limit by
     * {@code isSameAddressConsumersExceededOnTopic(Consumer)}.
     *
     * @param var1 the client address to count consumers for
     * @return the number of consumers using that address
     */
    public abstract int getNumberOfSameAddressConsumers(String var1);

    /**
     * Sums, over the given subscriptions, the number of consumers connecting from
     * {@code clientAddress}.
     *
     * @param clientAddress the address to count consumers for; {@code null} yields {@code 0}
     *                      without touching {@code subscriptions}
     * @param subscriptions the subscriptions to inspect
     * @return the total number of consumers using that address
     */
    protected int getNumberOfSameAddressConsumers(String clientAddress, List<? extends Subscription> subscriptions) {
        if (clientAddress == null) {
            return 0;
        }
        return subscriptions.stream()
                .mapToInt(sub -> sub.getNumberOfSameAddressConsumers(clientAddress))
                .sum();
    }

    /**
     * Adds {@code consumer} to {@code subscription} unless one of the topic's consumer limits
     * has been reached.
     *
     * @param subscription the subscription to attach the consumer to
     * @param consumer     the consumer to add
     * @return the future returned by {@code subscription.addConsumer(consumer)}, or a future
     *         failed with {@link BrokerServiceException.ConsumerBusyException} when the
     *         per-topic or per-address consumer limit is reached
     */
    protected CompletableFuture<Void> addConsumerToSubscription(Subscription subscription, Consumer consumer) {
        final String rejectionReason;
        if (isConsumersExceededOnTopic()) {
            log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", topic);
            rejectionReason = "Topic reached max consumers limit";
        } else if (isSameAddressConsumersExceededOnTopic(consumer)) {
            log.warn("[{}] Attempting to add consumer to topic which reached max same address consumers limit", topic);
            rejectionReason = "Topic reached max same address consumers limit";
        } else {
            return subscription.addConsumer(consumer);
        }
        return FutureUtil.failedFuture(new BrokerServiceException.ConsumerBusyException(rejectionReason));
    }

    /**
     * Returns the active consumer of a subscription whose dispatcher is an
     * {@link AbstractDispatcherSingleActiveConsumer}.
     *
     * @param subscription the subscription to inspect
     * @return the dispatcher's active consumer, or {@code null} when the dispatcher is of
     *         any other kind (or absent)
     */
    protected Consumer getActiveConsumer(Subscription subscription) {
        final Dispatcher dispatcher = subscription.getDispatcher();
        if (!(dispatcher instanceof AbstractDispatcherSingleActiveConsumer)) {
            return null;
        }
        final AbstractDispatcherSingleActiveConsumer singleActive =
                (AbstractDispatcherSingleActiveConsumer) dispatcher;
        return singleActive.getActiveConsumer();
    }

    /**
     * Calls {@code disableCnxAutoRead()} on the connection of every producer of this topic.
     */
    @Override
    public void disableCnxAutoRead() {
        for (Producer attached : producers.values()) {
            attached.getCnx().disableCnxAutoRead();
        }
    }

    /**
     * Calls {@code enableCnxAutoRead()} on the connection of every producer of this topic.
     */
    @Override
    public void enableCnxAutoRead() {
        for (Producer attached : producers.values()) {
            attached.getCnx().enableCnxAutoRead();
        }
    }

    /**
     * Tells whether at least one producer on this topic is not a remote producer.
     *
     * @return {@code true} when any attached producer reports {@code isRemote() == false};
     *         {@code false} when there are no producers or all of them are remote
     */
    protected boolean hasLocalProducers() {
        return producers.values().stream().anyMatch(attached -> !attached.isRemote());
    }

    /**
     * Renders this topic as a string containing its {@code topic} name.
     *
     * @return the string built by Guava's {@code MoreObjects.toStringHelper}
     */
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("topic", topic)
                .toString();
    }

    /**
     * Returns the live producer map of this topic (the internal map itself, not a copy).
     *
     * @return the producers attached to this topic
     */
    @Override
    public Map<String, Producer> getProducers() {
        return producers;
    }

    /**
     * Returns the broker service that was passed to the constructor.
     *
     * @return the owning broker service
     */
    @Override
    public BrokerService getBrokerService() {
        return brokerService;
    }

    /**
     * Returns the topic name that was passed to the constructor.
     *
     * @return the topic name
     */
    @Override
    public String getName() {
        return topic;
    }

    /**
     * Returns the current value of the {@code isEncryptionRequired} flag.
     *
     * @return {@code true} when the flag is set
     */
    @Override
    public boolean isEncryptionRequired() {
        return isEncryptionRequired;
    }

    /**
     * Returns the current value of the {@code schemaValidationEnforced} flag.
     *
     * @return {@code true} when the flag is set
     */
    @Override
    public boolean getSchemaValidationEnforced() {
        return schemaValidationEnforced;
    }

    /**
     * Sets the {@code hasBatchMessagePublished} flag to {@code true}.
     */
    public void markBatchMessagePublished() {
        hasBatchMessagePublished = true;
    }

    /**
     * Returns the replicator prefix read from the broker configuration at construction time.
     *
     * @return the replicator prefix
     */
    public String getReplicatorPrefix() {
        return replicatorPrefix;
    }

    /**
     * Asks the schema registry whether a schema exists for this topic. The schema id is
     * derived from the partitioned topic name.
     *
     * @return a future completing with {@code true} when the registry returns a non-null schema
     */
    @Override
    public CompletableFuture<Boolean> hasSchema() {
        final String partitionedName = TopicName.get(getName()).getPartitionedTopicName();
        final String schemaId = TopicName.get(partitionedName).getSchemaName();
        return brokerService.pulsar().getSchemaRegistryService()
                .getSchema(schemaId)
                .thenApply(Objects::nonNull);
    }

    /**
     * Registers {@code schema} for this topic, or resolves its existing version.
     *
     * <p>When schema auto-update is allowed the schema is stored if absent, subject to the
     * topic's compatibility strategy. Otherwise the schema must already be known to the
     * registry; if it is not, the returned future fails with
     * {@link IncompatibleSchemaException}.
     *
     * @param schema the schema to add; {@code null} yields {@link SchemaVersion#Empty}
     * @return a future completing with the schema's version
     */
    @Override
    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
        if (schema == null) {
            return CompletableFuture.completedFuture(SchemaVersion.Empty);
        }

        final String partitionedName = TopicName.get(getName()).getPartitionedTopicName();
        final String schemaId = TopicName.get(partitionedName).getSchemaName();
        final SchemaRegistryService registry = brokerService.pulsar().getSchemaRegistryService();

        if (allowAutoUpdateSchema()) {
            return registry.putSchemaIfAbsent(schemaId, schema, getSchemaCompatibilityStrategy());
        }

        // Auto-update disabled: only an already registered schema is accepted.
        return registry.trimDeletedSchemaAndGetList(schemaId).thenCompose(knownSchemas ->
                registry.getSchemaVersionBySchemaData((List<SchemaRegistry.SchemaAndMetadata>) knownSchemas, schema)
                        .thenCompose(existingVersion -> {
                            if (existingVersion != null) {
                                return CompletableFuture.completedFuture(existingVersion);
                            }
                            return FutureUtil.failedFuture(new IncompatibleSchemaException(
                                    "Schema not found and schema auto updating is disabled."));
                        }));
    }

    /**
     * Decides whether producers may automatically register new schemas on this topic.
     *
     * <p>System topics always allow it. Otherwise the {@code isAllowAutoUpdateSchema} field
     * decides, and the broker configuration is used when that field is {@code null}.
     *
     * @return {@code true} when schema auto-update is allowed
     */
    private boolean allowAutoUpdateSchema() {
        if (brokerService.isSystemTopic(topic)) {
            return true;
        }
        // Read the volatile field exactly once: it is a nullable, non-final Boolean, so a
        // second read after the null check could observe null and fail while unboxing.
        final Boolean topicSetting = isAllowAutoUpdateSchema;
        if (topicSetting == null) {
            return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
        }
        return topicSetting;
    }

    /**
     * Deletes the schema storage of this topic if the registry currently holds a schema for
     * it. The schema id is derived from the partitioned topic name.
     *
     * @return the future returned by the registry's {@code deleteSchemaStorage}, or a future
     *         completed with {@code null} when no schema exists
     */
    @Override
    public CompletableFuture<SchemaVersion> deleteSchema() {
        final String partitionedName = TopicName.get(getName()).getPartitionedTopicName();
        final String schemaId = TopicName.get(partitionedName).getSchemaName();
        final SchemaRegistryService registry = brokerService.pulsar().getSchemaRegistryService();
        return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(registry.getSchema(schemaId))
                .thenCompose(existing -> {
                    if (existing == null) {
                        return CompletableFuture.completedFuture(null);
                    }
                    log.info("Delete schema storage of id: {}", schemaId);
                    return registry.deleteSchemaStorage(schemaId);
                });
    }

    /**
     * Asks the schema registry whether a consumer using {@code schema} is compatible with this
     * topic, using the topic's schema compatibility strategy.
     *
     * @param schema the schema presented by the consumer
     * @return the future returned by the registry's {@code checkConsumerCompatibility}
     */
    @Override
    public CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schema) {
        String partitionedName = TopicName.get(getName()).getPartitionedTopicName();
        String schemaId = TopicName.get(partitionedName).getSchemaName();
        return brokerService.pulsar()
                .getSchemaRegistryService()
                .checkConsumerCompatibility(schemaId, schema, getSchemaCompatibilityStrategy());
    }

    /**
     * Registers {@code producer} on this topic.
     *
     * <p>The steps run in sequence: namespace-ownership check, access-mode / topic-epoch handling
     * via {@link #incrementTopicEpochIfNeeded}, and finally (under the write lock) the
     * fenced/terminated checks followed by {@link #internalAddProducer}. The usage counter is
     * incremented only once the producer has actually been added.
     *
     * @param producer             the producer to add; its topic must be this instance
     * @param producerQueuedFuture handed to {@link #incrementTopicEpochIfNeeded}, which completes
     *                             it when the producer is queued waiting for exclusive access
     * @return a future completing with the topic epoch assigned to the producer, or failing with
     *         a {@code BrokerServiceException} when the producer is rejected
     */
    @Override
    public CompletableFuture<Optional<Long>> addProducer(Producer producer, CompletableFuture<Void> producerQueuedFuture) {
        Preconditions.checkArgument(producer.getTopic() == this);
        return brokerService.checkTopicNsOwnership(getName())
                .thenCompose(ignored -> incrementTopicEpochIfNeeded(producer, producerQueuedFuture))
                .thenCompose(producerEpoch -> {
                    lock.writeLock().lock();
                    try {
                        checkTopicFenced();
                        if (isTerminated()) {
                            log.warn("[{}] Attempting to add producer to a terminated topic", topic);
                            throw new BrokerServiceException.TopicTerminatedException("Topic was already terminated");
                        }
                        return internalAddProducer(producer).thenApply(unused -> {
                            USAGE_COUNT_UPDATER.incrementAndGet(this);
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this));
                            }
                            return producerEpoch;
                        });
                    } catch (BrokerServiceException e) {
                        // Surface synchronous rejections through the returned future.
                        return FutureUtil.failedFuture(e);
                    } finally {
                        lock.writeLock().unlock();
                    }
                });
    }

    /**
     * Applies the producer's access mode and, when the producer obtains exclusive access,
     * advances the topic epoch. The whole decision runs under the topic write lock.
     *
     * <ul>
     *   <li>{@code Shared}: rejected with {@code ProducerBusyException} while an exclusive
     *       producer is connected or queued; otherwise completes with the current epoch.</li>
     *   <li>{@code Exclusive}: rejected with {@code ProducerFencedException} while an exclusive
     *       producer is connected or queued, or while any producer is connected; otherwise the
     *       exclusive slot is claimed.</li>
     *   <li>{@code WaitForExclusive}: queued (and {@code producerQueuedFuture} completed) while
     *       any producer is connected; otherwise the exclusive slot is claimed.</li>
     * </ul>
     *
     * @param producer             the producer being added
     * @param producerQueuedFuture completed with {@code null} when the producer is placed in the
     *                             waiting queue
     * @return a future with the topic epoch for the producer; failed with a
     *         {@code BrokerServiceException} (or subclass) when the producer is rejected or an
     *         unexpected error occurs
     */
    protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer, CompletableFuture<Void> producerQueuedFuture) {
        this.lock.writeLock().lock();
        try {
            switch (producer.getAccessMode()) {
                case Shared: {
                    if (this.hasExclusiveProducer || !this.waitingExclusiveProducers.isEmpty()) {
                        return FutureUtil.failedFuture(new BrokerServiceException.ProducerBusyException("Topic has an existing exclusive producer: " + this.exclusiveProducerName));
                    }
                    return CompletableFuture.completedFuture(this.topicEpoch);
                }
                case Exclusive: {
                    if (this.hasExclusiveProducer || !this.waitingExclusiveProducers.isEmpty()) {
                        return FutureUtil.failedFuture(new BrokerServiceException.ProducerFencedException("Topic has an existing exclusive producer: " + this.exclusiveProducerName));
                    }
                    if (!this.producers.isEmpty()) {
                        return FutureUtil.failedFuture(new BrokerServiceException.ProducerFencedException("Topic has existing shared producers"));
                    }
                    return this.acquireExclusiveProducerSlot(producer);
                }
                case WaitForExclusive: {
                    if (this.hasExclusiveProducer || !this.producers.isEmpty()) {
                        CompletableFuture<Optional<Long>> queuedFuture = new CompletableFuture<>();
                        log.info("[{}] Queuing producer {} since there's already a producer", this.topic, producer);
                        this.waitingExclusiveProducers.add(Pair.of(producer, queuedFuture));
                        // Tell the caller the producer is parked; queuedFuture completes later,
                        // when the slot is handed over in handleProducerRemoved.
                        producerQueuedFuture.complete(null);
                        return queuedFuture;
                    }
                    return this.acquireExclusiveProducerSlot(producer);
                }
                default: {
                    return FutureUtil.failedFuture(new BrokerServiceException("Invalid producer access mode: " + producer.getAccessMode()));
                }
            }
        }
        catch (Exception e) {
            log.error("Encountered unexpected error during exclusive producer creation", e);
            return FutureUtil.failedFuture(new BrokerServiceException(e));
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Claims the exclusive-producer slot for {@code producer} and starts the epoch update.
     * Shared by the Exclusive and WaitForExclusive paths; the caller must hold the write lock.
     *
     * <p>A producer that supplies an epoch older than the current one is fenced. Otherwise the
     * exclusive flag and name are set immediately and rolled back if the epoch update fails.
     *
     * @param producer the producer taking exclusive ownership
     * @return a future with the new topic epoch, or a failed future when the producer is fenced
     *         or the epoch update fails
     */
    private CompletableFuture<Optional<Long>> acquireExclusiveProducerSlot(Producer producer) {
        Optional<Long> requestedEpoch = producer.getTopicEpoch();
        if (requestedEpoch.isPresent() && requestedEpoch.get() < this.topicEpoch.orElse(-1L)) {
            // orElse keeps message formatting from throwing when no epoch has been recorded yet.
            return FutureUtil.failedFuture(new BrokerServiceException.ProducerFencedException(String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d", this.topicEpoch.orElse(-1L), requestedEpoch.get())));
        }
        this.hasExclusiveProducer = true;
        this.exclusiveProducerName = producer.getProducerName();
        CompletableFuture<Long> epochFuture = requestedEpoch.isPresent()
                ? this.setTopicEpoch(requestedEpoch.get())
                : this.incrementTopicEpoch(this.topicEpoch);
        // Roll the claim back if the epoch could not be updated.
        epochFuture.exceptionally(ex -> {
            this.hasExclusiveProducer = false;
            this.exclusiveProducerName = null;
            return null;
        });
        return epochFuture.thenApply(epoch -> {
            this.topicEpoch = Optional.of(epoch);
            return this.topicEpoch;
        });
    }

    /**
     * Hook for subclasses, invoked in this class when a producer acquiring exclusive access
     * supplies its own topic epoch. The value the returned future completes with is stored as
     * the new {@code topicEpoch}.
     *
     * @param var1 the epoch supplied by the producer
     * @return a future yielding the epoch to record for the topic
     */
    protected abstract CompletableFuture<Long> setTopicEpoch(long var1);

    /**
     * Hook for subclasses, invoked in this class when a producer acquiring exclusive access does
     * not supply a topic epoch. The value the returned future completes with is stored as the
     * new {@code topicEpoch}.
     *
     * @param var1 the current topic epoch, possibly empty
     * @return a future yielding the epoch to record for the topic
     */
    protected abstract CompletableFuture<Long> incrementTopicEpoch(Optional<Long> var1);

    /**
     * Records one publish (add-entry) latency sample, both in the per-topic latency statistics
     * (in microseconds) and in the {@code PUBLISH_LATENCY} metric.
     *
     * @param latency the measured latency
     * @param unit    the unit of {@code latency}
     */
    @Override
    public void recordAddLatency(long latency, TimeUnit unit) {
        long latencyMicros = unit.toMicros(latency);
        addEntryLatencyStatsUsec.addValue(latencyMicros);
        PUBLISH_LATENCY.observe(latency, unit);
    }

    /**
     * Atomically increments the rate-limited counter of this topic.
     *
     * @return the counter value after the increment
     */
    @Override
    public long increasePublishLimitedTimes() {
        long updatedCount = RATE_LIMITED_UPDATER.incrementAndGet(this);
        return updatedCount;
    }

    /**
     * Delegates to the topic-level publish rate limiter's {@code checkPublishRate()}.
     */
    @Override
    public void checkTopicPublishThrottlingRate() {
        topicPublishRateLimiter.checkPublishRate();
    }

    /**
     * Accounts for newly published messages: feeds the topic-level and broker-level publish
     * rate limiters and adds to the inbound byte and message counters.
     *
     * @param numOfMessages  number of messages published
     * @param msgSizeInBytes total size of those messages in bytes
     */
    @Override
    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
        // Rate limiters first (topic, then broker) ...
        topicPublishRateLimiter.incrementPublishCount(numOfMessages, msgSizeInBytes);
        getBrokerPublishRateLimiter().incrementPublishCount(numOfMessages, msgSizeInBytes);
        // ... then the plain statistics counters.
        bytesInCounter.add(msgSizeInBytes);
        msgInCounter.add(numOfMessages);
    }

    /**
     * Resets the topic-level publish count and, when that reset returns {@code true}, re-enables
     * reads on the producers' connections. Nothing is reset while the broker-level limiter
     * reports its rate as exceeded.
     */
    @Override
    public void resetTopicPublishCountAndEnableReadIfRequired() {
        if (getBrokerPublishRateLimiter().isPublishRateExceeded()) {
            return;
        }
        if (topicPublishRateLimiter.resetPublishCount()) {
            enableProducerReadForPublishRateLimiting();
        }
    }

    /**
     * Does nothing in this class.
     * NOTE(review): presumably an extension point that subclasses override - confirm.
     */
    public void updateDispatchRateLimiter() {
    }

    /**
     * Re-enables reads on the producers' connections after a broker-level reset, provided the
     * topic-level limiter does not report its rate as exceeded.
     *
     * @param doneBrokerReset whether the broker-level publish count was actually reset
     */
    @Override
    public void resetBrokerPublishCountAndEnableReadIfRequired(boolean doneBrokerReset) {
        boolean topicRateExceeded = topicPublishRateLimiter.isPublishRateExceeded();
        if (topicRateExceeded || !doneBrokerReset) {
            return;
        }
        enableProducerReadForPublishRateLimiting();
    }

    /**
     * For every producer on this topic, cancels publish-rate limiting on its connection and
     * re-enables auto-read. Does nothing when the producer map is {@code null}.
     */
    protected void enableProducerReadForPublishRateLimiting() {
        if (producers == null) {
            return;
        }
        producers.values().forEach(p -> {
            p.getCnx().cancelPublishRateLimiting();
            p.getCnx().enableCnxAutoRead();
        });
    }

    /**
     * For every producer on this topic, cancels publish-buffer limiting on its connection and
     * re-enables auto-read. Does nothing when the producer map is {@code null}.
     */
    protected void enableProducerReadForPublishBufferLimiting() {
        if (producers == null) {
            return;
        }
        producers.values().forEach(p -> {
            p.getCnx().cancelPublishBufferLimiting();
            p.getCnx().enableCnxAutoRead();
        });
    }

    /**
     * Disables auto-read on the connection of every producer on this topic. Does nothing when
     * the producer map is {@code null}.
     */
    protected void disableProducerRead() {
        if (producers == null) {
            return;
        }
        producers.values().forEach(p -> p.getCnx().disableCnxAutoRead());
    }

    /**
     * Rejects the current operation when the topic is fenced.
     *
     * @throws BrokerServiceException a {@code TopicFencedException} when {@code isFenced} is set
     */
    protected void checkTopicFenced() throws BrokerServiceException {
        if (!isFenced) {
            return;
        }
        log.warn("[{}] Attempting to add producer to a fenced topic", topic);
        throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
    }

    /**
     * Inserts {@code producer} into the producer map after enforcing the producer limits.
     *
     * <p>When a producer with the same name is already registered, the decision is delegated to
     * {@link #tryOverwriteOldProducer}. The user-created producer counter is incremented only
     * for a newly inserted, non-remote producer.
     *
     * @param producer the producer to register
     * @return a completed future on success, or a future failed with
     *         {@code ProducerBusyException} when a limit is reached (or with whatever
     *         {@link #tryOverwriteOldProducer} reports for a name clash)
     */
    protected CompletableFuture<Void> internalAddProducer(Producer producer) {
        if (isProducersExceeded(producer)) {
            log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
            CompletableFuture<Void> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"));
            return rejected;
        }
        if (isSameAddressProducersExceeded(producer)) {
            log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
            CompletableFuture<Void> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"));
            return rejected;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
        }
        Producer registered = producers.putIfAbsent(producer.getProducerName(), producer);
        if (registered != null) {
            // Name clash: let the overwrite logic decide whether the newcomer may take over.
            return tryOverwriteOldProducer(registered, producer);
        }
        if (!producer.isRemote()) {
            USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Resolves a producer-name clash between an already registered producer and a new one.
     *
     * <ul>
     *   <li>If the new producer is a successor of the old one, the old producer is closed and
     *       replaced in the map; a failed replace is reported as a {@code NamingException}.</li>
     *   <li>Otherwise, if both use the same connection, the new producer is rejected.</li>
     *   <li>Otherwise the old connection's liveness is checked: a live connection rejects the
     *       new producer, a dead one leads to a fresh {@link #internalAddProducer} attempt.</li>
     * </ul>
     *
     * @param oldProducer the producer currently registered under the name
     * @param newProducer the producer asking to use the same name
     * @return a completed future when the new producer took over, otherwise a failed future
     */
    private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
        if (newProducer.isSuccessorTo(oldProducer)) {
            oldProducer.close(false);
            boolean swapped = producers.replace(newProducer.getProducerName(), oldProducer, newProducer);
            if (swapped) {
                handleProducerRemoved(oldProducer);
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName() + "' replace concurrency error"));
            return failed;
        }
        if (Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
            return failed;
        }
        return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> {
            if (!previousIsActive) {
                // The old connection is gone, so try the registration again.
                return internalAddProducer(newProducer);
            }
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
            return failed;
        });
    }

    /**
     * Tells whether the producer's name was provided by the user and does not start with the
     * replicator prefix.
     *
     * @param producer the producer to inspect
     * @return {@code true} for a user-provided name without the replicator prefix
     */
    private boolean isUserProvidedProducerName(Producer producer) {
        if (!producer.isUserProvidedProducerName()) {
            return false;
        }
        return !producer.getProducerName().startsWith(replicatorPrefix);
    }

    /**
     * Removes {@code producer} from this topic. Nothing happens unless the map entry for the
     * producer's name still refers to this exact producer.
     *
     * @param producer the producer to remove; its topic must be this instance
     */
    @Override
    public void removeProducer(Producer producer) {
        Preconditions.checkArgument(producer.getTopic() == this);
        boolean removed = producers.remove(producer.getProducerName(), producer);
        if (!removed) {
            return;
        }
        if (!producer.isRemote()) {
            USER_CREATED_PRODUCER_COUNTER_UPDATER.decrementAndGet(this);
        }
        handleProducerRemoved(producer);
    }

    /**
     * Bookkeeping after a producer has left the topic: decrements the usage counter, hands the
     * exclusive-producer slot to the next waiting producer (if any) and refreshes
     * {@code lastActive}.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param producer the producer that was removed
     */
    protected void handleProducerRemoved(Producer producer) {
        USAGE_COUNT_UPDATER.decrementAndGet(this);
        // The flag is read before taking the lock; the write lock is only acquired when an
        // exclusive producer is currently recorded.
        if (this.hasExclusiveProducer) {
            this.lock.writeLock().lock();
            try {
                // Release the exclusive slot.
                this.hasExclusiveProducer = false;
                this.exclusiveProducerName = null;
                Pair<Producer, CompletableFuture<Optional<Long>>> nextWaitingProducer = this.waitingExclusiveProducers.poll();
                if (nextWaitingProducer != null) {
                    // Promote the head of the waiting queue to exclusive producer.
                    Producer nextProducer = (Producer)nextWaitingProducer.getKey();
                    CompletableFuture producerFuture = (CompletableFuture)nextWaitingProducer.getValue();
                    this.hasExclusiveProducer = true;
                    this.exclusiveProducerName = nextProducer.getProducerName();
                    // Use the producer's own epoch when it supplied one, otherwise bump the topic epoch.
                    CompletableFuture<Long> future = nextProducer.getTopicEpoch().isPresent() ? this.setTopicEpoch(nextProducer.getTopicEpoch().get()) : this.incrementTopicEpoch(this.topicEpoch);
                    ((CompletableFuture)future.thenAccept(epoch -> {
                        // Epoch updated: record it and release the waiting producer.
                        this.topicEpoch = Optional.of(epoch);
                        producerFuture.complete(this.topicEpoch);
                    })).exceptionally(ex -> {
                        // Epoch update failed: give the slot back and fail the waiting producer.
                        this.hasExclusiveProducer = false;
                        this.exclusiveProducerName = null;
                        producerFuture.completeExceptionally((Throwable)ex);
                        return null;
                    });
                }
            }
            finally {
                this.lock.writeLock().unlock();
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Removed producer -- count: {}", new Object[]{this.topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this)});
        }
        this.lastActive = System.nanoTime();
    }

    /**
     * Increments the usage counter for a newly added consumer and, at debug level, logs the
     * new count.
     *
     * @param subscriptionName the subscription the consumer joined (used for logging only)
     * @param consumerName     the consumer's name (used for logging only)
     */
    public void handleConsumerAdded(String subscriptionName, String consumerName) {
        USAGE_COUNT_UPDATER.incrementAndGet(this);
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName, consumerName, USAGE_COUNT_UPDATER.get(this));
    }

    /**
     * Atomically decrements the usage counter of this topic; the resulting value is discarded.
     */
    public void decrementUsageCount() {
        USAGE_COUNT_UPDATER.decrementAndGet(this);
    }

    /**
     * @return the current value of the {@code usageCount} field
     */
    public long currentUsageCount() {
        return usageCount;
    }

    /**
     * @return {@code true} when either the topic-level or the broker-level publish rate limiter
     *         reports its rate as exceeded (the broker limiter is consulted only if the topic
     *         limiter is not exceeded)
     */
    @Override
    public boolean isPublishRateExceeded() {
        if (topicPublishRateLimiter.isPublishRateExceeded()) {
            return true;
        }
        return getBrokerPublishRateLimiter().isPublishRateExceeded();
    }

    /**
     * Tries to acquire permits from the resource-group publish limiter.
     *
     * @param numMessages number of messages to account for
     * @param bytes       number of bytes to account for
     * @return {@code true} when resource-group rate limiting is enabled and the permits could
     *         not be acquired; {@code false} otherwise (the limiter is not touched when
     *         rate limiting is disabled)
     */
    @Override
    public boolean isResourceGroupPublishRateExceeded(int numMessages, int bytes) {
        if (!resourceGroupRateLimitingEnabled) {
            return false;
        }
        return !resourceGroupPublishLimiter.tryAcquire(numMessages, bytes);
    }

    /**
     * @return the current value of the {@code resourceGroupRateLimitingEnabled} flag
     */
    @Override
    public boolean isResourceGroupRateLimitingEnabled() {
        return resourceGroupRateLimitingEnabled;
    }

    /**
     * Tries to acquire permits from the topic-level publish limiter when precise topic publish
     * rate limiting is enabled.
     *
     * @param numberMessages number of messages to account for
     * @param bytes          number of bytes to account for
     * @return {@code true} when precise limiting is enabled and the permits could not be
     *         acquired; {@code false} otherwise (the limiter is not touched when precise
     *         limiting is disabled)
     */
    @Override
    public boolean isTopicPublishRateExceeded(int numberMessages, int bytes) {
        if (!preciseTopicPublishRateLimitingEnable) {
            return false;
        }
        return !topicPublishRateLimiter.tryAcquire(numberMessages, bytes);
    }

    /**
     * @return whether the broker-level publish rate limiter reports its rate as exceeded
     */
    @Override
    public boolean isBrokerPublishRateExceeded() {
        return getBrokerPublishRateLimiter().isPublishRateExceeded();
    }

    /**
     * @return the publish rate limiter currently attached to this topic
     */
    public PublishRateLimiter getTopicPublishRateLimiter() {
        return topicPublishRateLimiter;
    }

    /**
     * @return the broker-wide publish rate limiter, as provided by the broker service
     */
    public PublishRateLimiter getBrokerPublishRateLimiter() {
        return brokerService.getBrokerPublishRateLimiter();
    }

    /**
     * Resolves the namespace policies and applies the resource-group limiter settings.
     *
     * <p>When {@code optPolicies} is empty, the cached policies of this topic's namespace are
     * used, falling back to a default {@code Policies} instance when nothing is cached. Any
     * exception during resolution is logged (with its stack trace) and a default
     * {@code Policies} instance is used instead.
     *
     * @param optPolicies the namespace policies, if the caller already has them
     * @deprecated use {@link #updateResourceGroupLimiter(Policies)} with resolved policies
     */
    @Deprecated
    public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
        Policies policies;
        try {
            policies = optPolicies.orElseGet(() -> this.brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(TopicName.get((String)this.topic).getNamespaceObject()).orElseGet(Policies::new));
        }
        catch (Exception e) {
            // The trailing throwable makes the logger emit the stack trace as well; the
            // formatted message text itself is unchanged.
            log.warn("[{}] Error getting policies {} and publish throttling will be disabled", this.topic, e.getMessage(), e);
            policies = new Policies();
        }
        this.updateResourceGroupLimiter(policies);
    }

    /**
     * Attaches or detaches the resource-group publish limiter according to the namespace
     * policies.
     *
     * <ul>
     *   <li>No resource group configured: any attached limiter is unregistered and cleared, and
     *       reads are re-enabled on the producers' connections.</li>
     *   <li>Resource group configured and known to the resource-group service: its publish
     *       limiter is attached and this topic's auto-read callback is registered with it.</li>
     *   <li>Resource group configured but unknown: nothing is changed.</li>
     * </ul>
     *
     * @param namespacePolicies the namespace policies; must not be {@code null}
     */
    public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) {
        Objects.requireNonNull(namespacePolicies);
        String rgName = namespacePolicies.resource_group_name;
        if (rgName == null) {
            if (resourceGroupRateLimitingEnabled) {
                resourceGroupPublishLimiter.unregisterRateLimitFunction(getName());
                resourceGroupPublishLimiter = null;
                resourceGroupRateLimitingEnabled = false;
            }
            enableProducerReadForPublishRateLimiting();
            return;
        }
        ResourceGroup resourceGroup = brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
        if (resourceGroup == null) {
            return;
        }
        resourceGroupRateLimitingEnabled = true;
        resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter();
        resourceGroupPublishLimiter.registerRateLimitFunction(getName(), this::enableCnxAutoRead);
        log.info("Using resource group {} rate limiter for topic {}", rgName, topic);
    }

    /**
     * @return the current value of the inbound message counter
     */
    public long getMsgInCounter() {
        return msgInCounter.longValue();
    }

    /**
     * @return the current value of the inbound byte counter
     */
    public long getBytesInCounter() {
        return bytesInCounter.longValue();
    }

    /**
     * @return the outbound message count: the amount retained from removed subscriptions plus
     *         the sum over the current subscriptions
     */
    public long getMsgOutCounter() {
        long fromRemoved = msgOutFromRemovedSubscriptions.longValue();
        long fromCurrent = sumSubscriptions(AbstractSubscription::getMsgOutCounter);
        return fromRemoved + fromCurrent;
    }

    /**
     * @return the outbound byte count: the amount retained from removed subscriptions plus the
     *         sum over the current subscriptions
     */
    public long getBytesOutCounter() {
        long fromRemoved = bytesOutFromRemovedSubscriptions.longValue();
        long fromCurrent = sumSubscriptions(AbstractSubscription::getBytesOutCounter);
        return fromRemoved + fromCurrent;
    }

    /**
     * Sums a per-subscription counter over all current subscriptions, each cast to
     * {@code AbstractSubscription}.
     *
     * @param toCounter extracts the counter value from a subscription
     * @return the sum of the extracted values
     */
    private long sumSubscriptions(ToLongFunction<AbstractSubscription> toCounter) {
        return getSubscriptions().values().stream()
                .mapToLong(subscription -> toCounter.applyAsLong(AbstractSubscription.class.cast(subscription)))
                .sum();
    }

    /**
     * @return the {@code deleteWhileInactive} setting of the effective inactive-topic policies
     */
    public boolean isDeleteWhileInactive() {
        InactiveTopicPolicies effectivePolicies = (InactiveTopicPolicies) topicPolicies.getInactiveTopicPolicies().get();
        return effectivePolicies.isDeleteWhileInactive();
    }

    /**
     * @return the broker configuration flag
     *         {@code brokerDeleteInactivePartitionedTopicMetadataEnabled}
     */
    public boolean deletePartitionedTopicMetadataWhileInactive() {
        return brokerService.pulsar()
                .getConfiguration()
                .isBrokerDeleteInactivePartitionedTopicMetadataEnabled();
    }

    /**
     * Tells whether this topic has been terminated. {@code addProducer} rejects new producers
     * with a {@code TopicTerminatedException} when this returns {@code true}.
     *
     * @return {@code true} if the topic is terminated
     */
    protected abstract boolean isTerminated();

    /**
     * @return the effective inactive-topic policies held in the hierarchical topic policies
     */
    public InactiveTopicPolicies getInactiveTopicPolicies() {
        InactiveTopicPolicies effectivePolicies = (InactiveTopicPolicies) topicPolicies.getInactiveTopicPolicies().get();
        return effectivePolicies;
    }

    /**
     * @return the topic policies that the broker service holds for this topic, if any
     */
    public Optional<TopicPolicies> getTopicPolicies() {
        return brokerService.getTopicPolicies(TopicName.get(topic));
    }

    /**
     * Asks the broker service to delete the topic policies of this topic.
     *
     * @return the future returned by the broker service
     */
    public CompletableFuture<Void> deleteTopicPolicies() {
        return brokerService.deleteTopicPolicies(TopicName.get(topic));
    }

    /**
     * @return the number of producers currently queued waiting for exclusive access
     */
    protected int getWaitingProducersCount() {
        return waitingExclusiveProducers.size();
    }

    /**
     * Checks a message size against the topic-level maximum message size.
     *
     * <p>The topic-level limit applies only when it is positive and strictly smaller than the
     * broker's maximum message size. Chunked messages are never considered too large here.
     *
     * @param size           the message size to check
     * @param publishContext the publish context, used to detect chunked messages
     * @return {@code true} when the topic-level limit applies and {@code size} exceeds it
     */
    protected boolean isExceedMaximumMessageSize(int size, Topic.PublishContext publishContext) {
        if (publishContext.isChunked()) {
            return false;
        }
        int topicLimit = (Integer) topicPolicies.getTopicMaxMessageSize().get();
        // Short-circuit order matters: the broker configuration is read only for a positive limit.
        return topicLimit > 0
                && topicLimit < brokerService.pulsar().getConfiguration().getMaxMessageSize()
                && size > topicLimit;
    }

    /**
     * Re-evaluates the effective publish rate and installs, updates or disables the topic-level
     * publish rate limiter accordingly. Runs while holding {@code topicPublishRateLimiterLock}.
     *
     * <p>When neither a byte rate nor a message rate is configured, the current limiter is
     * closed, replaced by {@code PublishRateLimiter.DISABLED_RATE_LIMITER}, and reads are
     * re-enabled on the producers' connections. Otherwise a limiter is created if none is
     * active, or the active one is updated with the new rate.
     */
    public void updatePublishRateLimiter() {
        synchronized (this.topicPublishRateLimiterLock) {
            PublishRate publishRate = (PublishRate) topicPolicies.getPublishRate().get();
            boolean throttlingConfigured = publishRate.publishThrottlingRateInByte > 0L || publishRate.publishThrottlingRateInMsg > 0;
            if (!throttlingConfigured) {
                log.info("Disabling publish throttling for {}", topic);
                if (topicPublishRateLimiter != null) {
                    topicPublishRateLimiter.close();
                }
                topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
                enableProducerReadForPublishRateLimiting();
                return;
            }

            log.info("Enabling publish rate limiting {} ", publishRate);
            if (!preciseTopicPublishRateLimitingEnable) {
                brokerService.setupTopicPublishRateLimiterMonitor();
            }
            boolean limiterActive = topicPublishRateLimiter != null && topicPublishRateLimiter != PublishRateLimiter.DISABLED_RATE_LIMITER;
            if (limiterActive) {
                topicPublishRateLimiter.update(publishRate);
                return;
            }
            if (preciseTopicPublishRateLimitingEnable) {
                topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
            } else {
                topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
            }
        }
    }

    /**
     * Refreshes the broker-level value of the enabled subscription types from the current
     * broker configuration.
     */
    public void updateBrokerSubscriptionTypesEnabled() {
        topicPolicies.getSubscriptionTypesEnabled().updateBrokerValue(
                subTypeStringsToEnumSet(brokerService.pulsar().getConfiguration().getSubscriptionTypesEnabled()));
    }

    /**
     * @return the hierarchical topic policies object held by this topic
     */
    @Override
    public HierarchyTopicPolicies getHierarchyTopicPolicies() {
        return topicPolicies;
    }

    /**
     * Refreshes the broker-level value of the subscription dispatch rate from the current
     * broker configuration.
     */
    public void updateBrokerSubscriptionDispatchRate() {
        topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
                (Object) subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
    }

    /**
     * Adds to the filtered-entries counter.
     *
     * @param filtered the number of entries to add
     */
    public void addFilteredEntriesCount(int filtered) {
        filteredEntriesCounter.add(filtered);
    }

    /**
     * @return the current value of the filtered-entries counter
     */
    public long getFilteredEntriesCount() {
        return filteredEntriesCounter.longValue();
    }

    /**
     * Refreshes the broker-level value of the replicator dispatch rate from the current broker
     * configuration.
     */
    public void updateBrokerReplicatorDispatchRate() {
        topicPolicies.getReplicatorDispatchRate().updateBrokerValue(
                (Object) replicatorDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
    }

    /**
     * Refreshes the broker-level value of the dispatch rate from the current broker
     * configuration.
     */
    public void updateBrokerDispatchRate() {
        topicPolicies.getDispatchRate().updateBrokerValue(
                (Object) dispatchRateInBroker(brokerService.pulsar().getConfiguration()));
    }

    /**
     * Refreshes the broker-level value of the publish rate from the current broker
     * configuration.
     */
    public void updateBrokerPublishRate() {
        topicPolicies.getPublishRate().updateBrokerValue(
                (Object) publishRateInBroker(brokerService.pulsar().getConfiguration()));
    }

    /**
     * Refreshes the broker-level value of the subscribe rate from the current broker
     * configuration.
     */
    public void updateBrokerSubscribeRate() {
        topicPolicies.getSubscribeRate().updateBrokerValue(
                (Object) subscribeRateInBroker(brokerService.pulsar().getConfiguration()));
    }
}

