/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BacklogQuotaManager {
    private static final Logger log = LoggerFactory.getLogger(BacklogQuotaManager.class);
    private final BacklogQuota defaultQuota;
    private final ZooKeeperDataCache<Policies> zkCache;

    public BacklogQuotaManager(PulsarService pulsar) {
        this.defaultQuota = new BacklogQuota(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024L * 1024L * 1024L, pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy());
        this.zkCache = pulsar.getConfigurationCache().policiesCache();
    }

    public BacklogQuota getDefaultQuota() {
        return this.defaultQuota;
    }

    public BacklogQuota getBacklogQuota(String namespace, String policyPath) {
        try {
            return this.zkCache.get(policyPath).map(p -> p.backlog_quota_map.getOrDefault(BacklogQuota.BacklogQuotaType.destination_storage, this.defaultQuota)).orElse(this.defaultQuota);
        }
        catch (Exception e) {
            log.error(String.format("Failed to read policies data, will apply the default backlog quota: namespace=%s", namespace), (Throwable)e);
            return this.defaultQuota;
        }
    }

    public long getBacklogQuotaLimit(String namespace) {
        String policyPath = AdminResource.path("policies", namespace);
        return this.getBacklogQuota(namespace, policyPath).getLimit();
    }

    public void handleExceededBacklogQuota(PersistentTopic persistentTopic) {
        TopicName topicName = TopicName.get((String)persistentTopic.getName());
        String namespace = topicName.getNamespace();
        String policyPath = AdminResource.path("policies", namespace);
        BacklogQuota quota = this.getBacklogQuota(namespace, policyPath);
        log.info("Backlog quota exceeded for topic [{}]. Applying [{}] policy", (Object)persistentTopic.getName(), (Object)quota.getPolicy());
        switch (quota.getPolicy()) {
            case consumer_backlog_eviction: {
                this.dropBacklog(persistentTopic, quota);
                break;
            }
            case producer_request_hold: 
            case producer_exception: {
                this.disconnectProducers(persistentTopic);
                break;
            }
        }
    }

    private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
        double reductionFactor = 0.9;
        double targetSize = reductionFactor * (double)quota.getLimit();
        ManagedLedgerImpl mLedger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
        long backlogSize = mLedger.getEstimatedBacklogSize();
        if (log.isDebugEnabled()) {
            log.debug("[{}] target size is [{}] for quota limit [{}], backlog size is [{}]", new Object[]{persistentTopic.getName(), targetSize, targetSize / reductionFactor, backlogSize});
        }
        ManagedCursor previousSlowestConsumer = null;
        while ((double)backlogSize > targetSize) {
            ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
            if (slowestConsumer == null) {
                if (!log.isDebugEnabled()) break;
                log.debug("[{}] slowest consumer null.", (Object)persistentTopic.getName());
                break;
            }
            double messageSkipFactor = ((double)backlogSize - targetSize) / (double)backlogSize;
            if (slowestConsumer == previousSlowestConsumer) {
                log.info("[{}] Cursors not progressing, target size is [{}] for quota limit [{}], backlog size is [{}]", new Object[]{persistentTopic.getName(), targetSize, targetSize / reductionFactor, backlogSize});
                break;
            }
            long entriesInBacklog = slowestConsumer.getNumberOfEntriesInBacklog();
            int messagesToSkip = (int)(messageSkipFactor * (double)entriesInBacklog);
            try {
                if (messagesToSkip == 0) {
                    if (!log.isDebugEnabled()) break;
                    log.debug("no messages to skip for [{}]", (Object)slowestConsumer);
                    break;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Skipping [{}] messages on slowest consumer [{}] having backlog entries : [{}]", new Object[]{messagesToSkip, slowestConsumer.getName(), entriesInBacklog});
                }
                slowestConsumer.skipEntries(messagesToSkip, ManagedCursor.IndividualDeletedEntries.Include);
            }
            catch (Exception exception) {
                log.error("Error skipping [{}] messages from slowest consumer : [{}]", (Object)messagesToSkip, (Object)slowestConsumer.getName());
            }
            backlogSize = mLedger.getEstimatedBacklogSize();
            previousSlowestConsumer = slowestConsumer;
            if (!log.isDebugEnabled()) continue;
            log.debug("[{}] Updated unconsumed size = [{}]. skipFactor: [{}]", new Object[]{persistentTopic.getName(), backlogSize, messageSkipFactor});
        }
    }

    private void disconnectProducers(PersistentTopic persistentTopic) {
        ArrayList futures = Lists.newArrayList();
        ConcurrentOpenHashSet<Producer> producers = persistentTopic.getProducers();
        producers.forEach(producer -> {
            log.info("Producer [{}] has exceeded backlog quota on topic [{}]. Disconnecting producer", (Object)producer.getProducerName(), (Object)persistentTopic.getName());
            futures.add(producer.disconnect());
        });
        ((CompletableFuture)FutureUtil.waitForAll((List)futures).thenRun(() -> log.info("All producers on topic [{}] are disconnected", (Object)persistentTopic.getName()))).exceptionally(exception -> {
            log.error("Error in disconnecting producers on topic [{}] [{}]", (Object)persistentTopic.getName(), exception);
            return null;
        });
    }
}

