/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.MoreObjects;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.PrecisPublishLimiter;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared state and behaviour for {@link Topic} implementations.
 *
 * <p>This base class keeps the registry of connected producers, evaluates the
 * namespace/broker limits on the number of producers and consumers, exposes
 * helpers around the schema registry, and wires the topic-level publish rate
 * limiter together with the broker-level one.
 */
public abstract class AbstractTopic implements Topic {
    protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60L;

    /** Topic name as passed to the constructor. */
    protected final String topic;
    /** Producers currently attached to this topic, keyed by producer name. */
    protected final ConcurrentHashMap<String, Producer> producers;
    protected final BrokerService brokerService;
    /** Replicator prefix read from the broker configuration at construction time. */
    protected final String replicatorPrefix;
    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    /** When {@code true}, {@link #checkTopicFenced()} rejects the call. */
    protected volatile boolean isFenced;
    /** Initialised from the broker configuration in the constructor. */
    protected InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
    /** Set to {@link System#nanoTime()} at construction time. */
    protected volatile long lastActive;
    protected volatile boolean hasBatchMessagePublished = false;
    protected StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
    protected volatile boolean isEncryptionRequired = false;
    protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL;
    protected volatile boolean isAllowAutoUpdateSchema = true;
    protected volatile boolean schemaValidationEnforced = false;
    protected volatile int maxUnackedMessagesOnConsumer = -1;
    /** Never {@code null} once the constructor has completed. */
    protected volatile PublishRateLimiter topicPublishRateLimiter;
    protected boolean preciseTopicPublishRateLimitingEnable;
    private final LongAdder bytesInCounter = new LongAdder();
    private final LongAdder msgInCounter = new LongAdder();
    private static final Summary PUBLISH_LATENCY = (Summary) Summary.build("pulsar_broker_publish_latency", "-")
            .quantile(0.0)
            .quantile(0.5)
            .quantile(0.95)
            .quantile(0.99)
            .quantile(0.999)
            .quantile(0.9999)
            .quantile(1.0)
            .register();
    private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);

    /**
     * Creates the topic, seeds the inactive-topic policies from the broker
     * configuration and sets up the publish rate limiter from the namespace
     * policies.
     *
     * <p>If the namespace policies cannot be loaded, a warning is logged and
     * the rate limiter is configured as if no policies were defined.
     *
     * @param topic         the topic name
     * @param brokerService the owning broker service
     */
    public AbstractTopic(String topic, BrokerService brokerService) {
        this.topic = topic;
        this.brokerService = brokerService;
        this.producers = new ConcurrentHashMap<>();
        this.isFenced = false;
        this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
        this.inactiveTopicPolicies.setDeleteWhileInactive(
                brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled());
        this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(
                brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
        this.inactiveTopicPolicies.setInactiveTopicDeleteMode(
                brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
        this.lastActive = System.nanoTime();

        Policies policies = null;
        try {
            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(namespacePoliciesPath())
                    .orElseGet(Policies::new);
        } catch (Exception e) {
            // Best effort: policies stays null, which disables publish throttling below.
            log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
        }
        this.preciseTopicPublishRateLimitingEnable =
                brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
        updatePublishDispatcher(policies);
    }

    /**
     * Builds the policies-cache path of the namespace this topic belongs to.
     *
     * @return the cache path for the namespace policies
     */
    private String namespacePoliciesPath() {
        return AdminResource.path("policies", TopicName.get(this.topic).getNamespace());
    }

    /**
     * Derives the schema registry id from the partitioned topic name of this topic.
     *
     * @return the schema id used for all schema registry calls of this topic
     */
    private String schemaIdForTopic() {
        String base = TopicName.get(getName()).getPartitionedTopicName();
        return TopicName.get(base).getSchemaName();
    }

    /**
     * Checks whether the number of attached producers has reached the limit.
     *
     * <p>The namespace policy value is used when it is positive, otherwise the
     * broker configuration value applies. A non-positive effective limit means
     * "no limit". Failures while reading the policies are logged and treated as
     * empty policies.
     *
     * @return {@code true} if a limit is set and already reached
     */
    protected boolean isProducersExceeded() {
        Policies policies;
        try {
            policies = this.brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(namespacePoliciesPath())
                    .orElseGet(Policies::new);
        } catch (Exception e) {
            log.warn("[{}] Failed to get namespace policies that include max number of producers: {}",
                    this.topic, e.getMessage());
            policies = new Policies();
        }
        final int maxProducers = policies.max_producers_per_topic > 0
                ? policies.max_producers_per_topic
                : this.brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
        return maxProducers > 0 && maxProducers <= this.producers.size();
    }

    /**
     * Checks whether the number of consumers on this topic has reached the limit.
     *
     * <p>Only policies already present in the cache are consulted
     * ({@code getDataIfPresent}); when absent, empty policies are used. The
     * namespace value wins when positive, otherwise the broker configuration
     * value applies. A non-positive effective limit means "no limit".
     *
     * @return {@code true} if a limit is set and already reached
     */
    protected boolean isConsumersExceededOnTopic() {
        Policies policies;
        try {
            policies = (Policies) this.brokerService.pulsar().getConfigurationCache().policiesCache()
                    .getDataIfPresent(namespacePoliciesPath());
            if (policies == null) {
                policies = new Policies();
            }
        } catch (Exception e) {
            log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}",
                    this.topic, e.getMessage());
            policies = new Policies();
        }
        final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0
                ? policies.max_consumers_per_topic
                : this.brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic();
        return maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers();
    }

    /**
     * @return the number of consumers currently counted against this topic
     */
    public abstract int getNumberOfConsumers();

    /**
     * Adds a consumer to a subscription after enforcing the per-topic consumer limit.
     *
     * @param subscription the subscription receiving the consumer
     * @param consumer     the consumer to add
     * @throws BrokerServiceException if the consumer limit is reached
     *         ({@link BrokerServiceException.ConsumerBusyException}) or the
     *         subscription rejects the consumer
     */
    protected void addConsumerToSubscription(Subscription subscription, Consumer consumer)
            throws BrokerServiceException {
        if (isConsumersExceededOnTopic()) {
            log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", this.topic);
            throw new BrokerServiceException.ConsumerBusyException("Topic reached max consumers limit");
        }
        subscription.addConsumer(consumer);
    }

    /** Disables auto-read on the connection of every attached producer. */
    @Override
    public void disableCnxAutoRead() {
        this.producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
    }

    /** Enables auto-read on the connection of every attached producer. */
    @Override
    public void enableCnxAutoRead() {
        this.producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
    }

    /**
     * @return {@code true} if at least one attached producer is not remote
     */
    protected boolean hasLocalProducers() {
        return this.producers.values().stream().anyMatch(producer -> !producer.isRemote());
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this).add("topic", this.topic).toString();
    }

    /**
     * @return the live producer registry (not a copy)
     */
    @Override
    public Map<String, Producer> getProducers() {
        return this.producers;
    }

    public BrokerService getBrokerService() {
        return this.brokerService;
    }

    @Override
    public String getName() {
        return this.topic;
    }

    @Override
    public boolean isEncryptionRequired() {
        return this.isEncryptionRequired;
    }

    @Override
    public boolean getSchemaValidationEnforced() {
        return this.schemaValidationEnforced;
    }

    /** Records that a batch message has been published on this topic. */
    public void markBatchMessagePublished() {
        this.hasBatchMessagePublished = true;
    }

    public String getReplicatorPrefix() {
        return this.replicatorPrefix;
    }

    /**
     * @return a future completing with {@code true} if the schema registry
     *         returns a non-null schema for this topic
     */
    @Override
    public CompletableFuture<Boolean> hasSchema() {
        String id = schemaIdForTopic();
        return this.brokerService.pulsar().getSchemaRegistryService().getSchema(id).thenApply(Objects::nonNull);
    }

    /**
     * Registers a schema for this topic.
     *
     * <p>When schema auto-update is allowed the schema is stored if absent.
     * Otherwise the existing schemas are searched for a matching one, and the
     * returned future fails with {@link IncompatibleSchemaException} if none
     * is found.
     *
     * @param schema the schema to add; {@code null} yields {@link SchemaVersion#Empty}
     * @return a future completing with the version of the schema
     */
    @Override
    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
        if (schema == null) {
            return CompletableFuture.completedFuture(SchemaVersion.Empty);
        }
        final String id = schemaIdForTopic();
        final SchemaRegistryService schemaRegistryService = this.brokerService.pulsar().getSchemaRegistryService();
        if (this.isAllowAutoUpdateSchema) {
            return schemaRegistryService.putSchemaIfAbsent(id, schema, this.schemaCompatibilityStrategy);
        }
        return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
                schemaRegistryService
                        .getSchemaVersionBySchemaData(
                                (List<SchemaRegistry.SchemaAndMetadata>) schemaAndMetadataList, schema)
                        .thenCompose(schemaVersion -> {
                            if (schemaVersion == null) {
                                return FutureUtil.failedFuture(new IncompatibleSchemaException(
                                        "Schema not found and schema auto updating is disabled."));
                            }
                            return CompletableFuture.completedFuture(schemaVersion);
                        }));
    }

    /**
     * Deletes the schema storage of this topic if a schema exists.
     *
     * @return a future completing with the result of the deletion, or with
     *         {@code null} when the topic has no schema
     */
    @Override
    public CompletableFuture<SchemaVersion> deleteSchema() {
        final String id = schemaIdForTopic();
        final SchemaRegistryService schemaRegistryService = this.brokerService.pulsar().getSchemaRegistryService();
        return schemaRegistryService.getSchema(id).thenCompose(schema -> {
            if (schema == null) {
                return CompletableFuture.completedFuture(null);
            }
            log.info("Delete schema storage of id: {}", id);
            return schemaRegistryService.deleteSchemaStorage(id);
        });
    }

    /**
     * Delegates the consumer schema compatibility check to the schema registry,
     * using this topic's current compatibility strategy.
     *
     * @param schema the schema presented by the consumer
     * @return the future returned by the schema registry
     */
    @Override
    public CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schema) {
        String id = schemaIdForTopic();
        return this.brokerService.pulsar().getSchemaRegistryService()
                .checkConsumerCompatibility(id, schema, this.schemaCompatibilityStrategy);
    }

    /**
     * Records a publish latency sample in the per-topic buckets (converted to
     * microseconds) and in the broker-wide publish latency summary.
     *
     * @param latency the measured latency
     * @param unit    the unit of {@code latency}
     */
    @Override
    public void recordAddLatency(long latency, TimeUnit unit) {
        this.addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
        PUBLISH_LATENCY.observe(latency, unit);
    }

    /**
     * Updates the schema compatibility strategy from the given policies.
     *
     * <p>If the policies leave the strategy {@code UNDEFINED}, it is derived
     * from the schema auto-update compatibility policy instead.
     *
     * @param policies the namespace policies to read from
     */
    protected void setSchemaCompatibilityStrategy(Policies policies) {
        if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
            this.schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
                    policies.schema_auto_update_compatibility_strategy);
        } else {
            this.schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
        }
    }

    @Override
    public void checkTopicPublishThrottlingRate() {
        this.topicPublishRateLimiter.checkPublishRate();
    }

    /**
     * Accounts published messages against the topic and broker rate limiters
     * and against the inbound message/byte counters.
     *
     * @param numOfMessages  number of messages published
     * @param msgSizeInBytes total size of the published messages in bytes
     */
    @Override
    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
        this.topicPublishRateLimiter.incrementPublishCount(numOfMessages, msgSizeInBytes);
        getBrokerPublishRateLimiter().incrementPublishCount(numOfMessages, msgSizeInBytes);
        this.bytesInCounter.add(msgSizeInBytes);
        this.msgInCounter.add(numOfMessages);
    }

    /**
     * Resets the topic publish count and re-enables producer reads, but only
     * when the broker-level limiter is not exceeded and the topic limiter
     * reports that a reset took place.
     */
    @Override
    public void resetTopicPublishCountAndEnableReadIfRequired() {
        if (!getBrokerPublishRateLimiter().isPublishRateExceeded()
                && this.topicPublishRateLimiter.resetPublishCount()) {
            enableProducerReadForPublishRateLimiting();
        }
    }

    /**
     * Re-enables producer reads after a broker-level reset, unless the topic
     * limiter is still exceeded.
     *
     * @param doneBrokerReset whether the broker-level reset took place
     */
    @Override
    public void resetBrokerPublishCountAndEnableReadIfRequired(boolean doneBrokerReset) {
        if (!this.topicPublishRateLimiter.isPublishRateExceeded() && doneBrokerReset) {
            enableProducerReadForPublishRateLimiting();
        }
    }

    /** Cancels publish rate limiting and enables auto-read on every producer connection. */
    protected void enableProducerReadForPublishRateLimiting() {
        this.producers.values().forEach(producer -> {
            producer.getCnx().cancelPublishRateLimiting();
            producer.getCnx().enableCnxAutoRead();
        });
    }

    /** Cancels publish buffer limiting and enables auto-read on every producer connection. */
    protected void enableProducerReadForPublishBufferLimiting() {
        this.producers.values().forEach(producer -> {
            producer.getCnx().cancelPublishBufferLimiting();
            producer.getCnx().enableCnxAutoRead();
        });
    }

    /** Disables auto-read on the connection of every attached producer. */
    protected void disableProducerRead() {
        this.producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
    }

    /**
     * @throws BrokerServiceException a
     *         {@link BrokerServiceException.TopicFencedException} if the topic is fenced
     */
    protected void checkTopicFenced() throws BrokerServiceException {
        if (this.isFenced) {
            log.warn("[{}] Attempting to add producer to a fenced topic", this.topic);
            throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
        }
    }

    /**
     * Registers a producer under its name.
     *
     * <p>If a producer with the same name is already registered, an overwrite
     * is attempted via {@link #tryOverwriteOldProducer(Producer, Producer)}.
     *
     * @param producer the producer to register
     * @throws BrokerServiceException if the producer limit is reached
     *         ({@link BrokerServiceException.ProducerBusyException}) or the name
     *         is taken and cannot be overwritten
     *         ({@link BrokerServiceException.NamingException})
     */
    protected void internalAddProducer(Producer producer) throws BrokerServiceException {
        if (isProducersExceeded()) {
            log.warn("[{}] Attempting to add producer to topic which reached max producers limit", this.topic);
            throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] {} Got request to create producer ", this.topic, producer.getProducerName());
        }
        Producer existProducer = this.producers.putIfAbsent(producer.getProducerName(), producer);
        if (existProducer != null) {
            tryOverwriteOldProducer(existProducer, producer);
        }
    }

    /**
     * Replaces an already registered producer with a newer one of the same name.
     *
     * <p>The overwrite is only allowed when both producers are equal, neither
     * name counts as user provided, and the new producer has a strictly higher
     * epoch. In that case the old producer is closed, swapped out atomically
     * and reported through {@link #handleProducerRemoved(Producer)}.
     *
     * @throws BrokerServiceException a
     *         {@link BrokerServiceException.NamingException} if the overwrite is
     *         not allowed or the registry entry changed concurrently
     */
    private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) throws BrokerServiceException {
        final boolean canOverwrite = oldProducer.equals(newProducer)
                && !isUserProvidedProducerName(oldProducer)
                && !isUserProvidedProducerName(newProducer)
                && newProducer.getEpoch() > oldProducer.getEpoch();
        if (!canOverwrite) {
            throw new BrokerServiceException.NamingException(
                    "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
        }
        oldProducer.close(false);
        // replace() fails if the mapping no longer points at oldProducer, i.e. a concurrent update happened.
        if (!this.producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
            throw new BrokerServiceException.NamingException(
                    "Producer with name '" + newProducer.getProducerName() + "' replace concurrency error");
        }
        handleProducerRemoved(oldProducer);
    }

    /**
     * @return {@code true} if the producer reports a user-provided name that
     *         does not start with the replicator prefix
     */
    private boolean isUserProvidedProducerName(Producer producer) {
        return producer.isUserProvidedProducerName() && !producer.getProducerName().startsWith(this.replicatorPrefix);
    }

    /**
     * Callback invoked after a producer has been replaced in the registry.
     *
     * @param var1 the producer that was removed
     */
    protected abstract void handleProducerRemoved(Producer var1);

    /**
     * @return {@code true} if either the topic or the broker publish rate limiter is exceeded
     */
    @Override
    public boolean isPublishRateExceeded() {
        return this.topicPublishRateLimiter.isPublishRateExceeded()
                || getBrokerPublishRateLimiter().isPublishRateExceeded();
    }

    /**
     * Tries to acquire publish permits from the topic limiter; only evaluated
     * when precise topic publish rate limiting is enabled.
     *
     * @param numberMessages number of messages to account for
     * @param bytes          number of bytes to account for
     * @return {@code true} if precise limiting is enabled and the permits could not be acquired
     */
    @Override
    public boolean isTopicPublishRateExceeded(int numberMessages, int bytes) {
        return this.preciseTopicPublishRateLimitingEnable
                && !this.topicPublishRateLimiter.tryAcquire(numberMessages, bytes);
    }

    @Override
    public boolean isBrokerPublishRateExceeded() {
        return getBrokerPublishRateLimiter().isPublishRateExceeded();
    }

    public PublishRateLimiter getTopicPublishRateLimiter() {
        return this.topicPublishRateLimiter;
    }

    public PublishRateLimiter getBrokerPublishRateLimiter() {
        return this.brokerService.getBrokerPublishRateLimiter();
    }

    /**
     * Re-evaluates the topic publish rate limiter against the given policies.
     *
     * @param policies the namespace policies; may be {@code null}
     */
    public void updateMaxPublishRate(Policies policies) {
        updatePublishDispatcher(policies);
    }

    /**
     * Enables, updates or disables the topic publish rate limiter.
     *
     * <p>Limiting is enabled when the policies define, for this broker's
     * cluster, a publish rate with a positive byte or message threshold. An
     * existing active limiter is updated in place; otherwise a new one is
     * created (precise or not, depending on the broker configuration). In all
     * other cases the limiter is replaced with the disabled limiter and
     * producer reads are re-enabled.
     *
     * @param policies the namespace policies; may be {@code null}
     */
    private void updatePublishDispatcher(Policies policies) {
        final String clusterName = this.brokerService.pulsar().getConfiguration().getClusterName();
        final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
                ? (PublishRate) policies.publishMaxMessageRate.get(clusterName)
                : null;
        final boolean throttlingConfigured = publishRate != null
                && (publishRate.publishThrottlingRateInByte > 0L || publishRate.publishThrottlingRateInMsg > 0);

        if (!throttlingConfigured) {
            log.info("Disabling publish throttling for {}", this.topic);
            this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
            enableProducerReadForPublishRateLimiting();
            return;
        }

        log.info("Enabling publish rate limiting {} on topic {}", publishRate, this.topic);
        if (!this.preciseTopicPublishRateLimitingEnable) {
            this.brokerService.setupTopicPublishRateLimiterMonitor();
        }
        if (this.topicPublishRateLimiter == null
                || this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
            if (this.preciseTopicPublishRateLimitingEnable) {
                this.topicPublishRateLimiter =
                        new PrecisPublishLimiter(policies, clusterName, () -> this.enableCnxAutoRead());
            } else {
                this.topicPublishRateLimiter = new PublishRateLimiterImpl(policies, clusterName);
            }
        } else {
            this.topicPublishRateLimiter.update(policies, clusterName);
        }
    }

    /**
     * @return total number of messages accounted through {@link #incrementPublishCount(int, long)}
     */
    public long getMsgInCounter() {
        return this.msgInCounter.longValue();
    }

    /**
     * @return total number of bytes accounted through {@link #incrementPublishCount(int, long)}
     */
    public long getBytesInCounter() {
        return this.bytesInCounter.longValue();
    }

    /**
     * @return the {@code msgOutCounter} of the stats obtained via {@code getStats(false)}
     */
    public long getMsgOutCounter() {
        return getStats(false).msgOutCounter;
    }

    /**
     * @return the {@code bytesOutCounter} of the stats obtained via {@code getStats(false)}
     */
    public long getBytesOutCounter() {
        return getStats(false).bytesOutCounter;
    }

    public boolean isDeleteWhileInactive() {
        return this.inactiveTopicPolicies.isDeleteWhileInactive();
    }

    public void setDeleteWhileInactive(boolean deleteWhileInactive) {
        this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
    }

    public InactiveTopicPolicies getInactiveTopicPolicies() {
        return this.inactiveTopicPolicies;
    }

    /**
     * Overwrites all three inactive-topic policy values on the current policies object.
     *
     * @param inactiveTopicDeleteMode    the delete mode to apply
     * @param maxInactiveDurationSeconds the maximum inactive duration, in seconds
     * @param deleteWhileInactive        whether the topic may be deleted while inactive
     */
    public void resetInactiveTopicPolicies(InactiveTopicDeleteMode inactiveTopicDeleteMode,
                                           int maxInactiveDurationSeconds,
                                           boolean deleteWhileInactive) {
        this.inactiveTopicPolicies.setInactiveTopicDeleteMode(inactiveTopicDeleteMode);
        this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
        this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
    }
}

