/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the message-count and byte-size dispatch rate limiters for one scope
 * (topic, subscription, replicator or the whole broker).
 *
 * <p>Each limiter is an {@link AsyncTokenBucket}, or {@code null} when the corresponding
 * rate is not positive (i.e. that dimension is not throttled). Both limiter fields are
 * {@code volatile} and are replaced wholesale by {@link #updateDispatchRate(DispatchRate)}
 * and cleared by {@link #close()}; every reader therefore takes a single snapshot of the
 * field before using it so a concurrent replacement cannot cause a
 * {@link NullPointerException}.
 */
public class DispatchRateLimiter {
    private static final Logger log = LoggerFactory.getLogger(DispatchRateLimiter.class);

    private final PersistentTopic topic;
    private final String topicName;
    private final String subscriptionName;
    private final Type type;
    private final BrokerService brokerService;
    private volatile AsyncTokenBucket dispatchRateLimiterOnMessage;
    private volatile AsyncTokenBucket dispatchRateLimiterOnByte;

    /**
     * Creates a limiter for {@code topic} that is not bound to a subscription.
     *
     * @param topic the topic whose dispatch rate is applied
     * @param type  the scope of this limiter
     */
    public DispatchRateLimiter(PersistentTopic topic, Type type) {
        this(topic, null, type);
    }

    /**
     * Creates a limiter for {@code topic} and immediately loads the dispatch rate
     * matching {@code type} via {@link #updateDispatchRate()}.
     *
     * @param topic            the topic whose dispatch rate is applied
     * @param subscriptionName the subscription name, or {@code null} when not applicable
     * @param type             the scope of this limiter
     */
    public DispatchRateLimiter(PersistentTopic topic, String subscriptionName, Type type) {
        this.topic = topic;
        this.topicName = topic.getName();
        this.subscriptionName = subscriptionName;
        this.brokerService = topic.getBrokerService();
        this.type = type;
        this.updateDispatchRate();
    }

    /**
     * Creates a broker-scoped limiter ({@link Type#BROKER}) with no associated topic and
     * immediately loads the rate from the broker configuration.
     *
     * @param brokerService the broker service supplying the configuration
     */
    public DispatchRateLimiter(BrokerService brokerService) {
        this.topic = null;
        this.topicName = null;
        this.subscriptionName = null;
        this.brokerService = brokerService;
        this.type = Type.BROKER;
        this.updateDispatchRate();
    }

    /**
     * Returns the message limiter's current token count, clamped to be non-negative.
     *
     * @return {@code -1} when no message limiter is configured, otherwise
     *         {@code max(getTokens(), 0)} of the message limiter
     */
    public long getAvailableDispatchRateLimitOnMsg() {
        // Single volatile read: the field may be nulled concurrently by close()/updateDispatchRate().
        AsyncTokenBucket msgBucket = this.dispatchRateLimiterOnMessage;
        return msgBucket == null ? -1L : Math.max(msgBucket.getTokens(), 0L);
    }

    /**
     * Returns the byte limiter's current token count, clamped to be non-negative.
     *
     * @return {@code -1} when no byte limiter is configured, otherwise
     *         {@code max(getTokens(), 0)} of the byte limiter
     */
    public long getAvailableDispatchRateLimitOnByte() {
        // Single volatile read: the field may be nulled concurrently by close()/updateDispatchRate().
        AsyncTokenBucket byteBucket = this.dispatchRateLimiterOnByte;
        return byteBucket == null ? -1L : Math.max(byteBucket.getTokens(), 0L);
    }

    /**
     * Consumes tokens from the configured limiters. Non-positive amounts and
     * unconfigured limiters are skipped.
     *
     * @param numberOfMessages number of messages dispatched
     * @param byteSize         number of bytes dispatched
     */
    public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
        AsyncTokenBucket msgBucket = this.dispatchRateLimiterOnMessage;
        if (numberOfMessages > 0L && msgBucket != null) {
            msgBucket.consumeTokens(numberOfMessages);
        }
        AsyncTokenBucket byteBucket = this.dispatchRateLimiterOnByte;
        if (byteSize > 0L && byteBucket != null) {
            byteBucket.consumeTokens(byteSize);
        }
    }

    /**
     * @return {@code true} when at least one of the message or byte limiters is configured
     */
    public boolean isDispatchRateLimitingEnabled() {
        return this.dispatchRateLimiterOnMessage != null || this.dispatchRateLimiterOnByte != null;
    }

    /**
     * Builds a {@link DispatchRate} from the broker configuration values that correspond
     * to this limiter's {@link Type}. The rate period is fixed to one second, and the
     * rate is never relative to the publish rate for {@link Type#BROKER}.
     */
    private DispatchRate createDispatchRate() {
        ServiceConfiguration config = this.brokerService.pulsar().getConfiguration();
        int dispatchThrottlingRateInMsg;
        long dispatchThrottlingRateInByte;
        switch (this.type) {
            case TOPIC:
                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerTopicInMsg();
                dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerTopicInByte();
                break;
            case SUBSCRIPTION:
                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerSubscriptionInMsg();
                dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerSubscriptionInByte();
                break;
            case REPLICATOR:
                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerReplicatorInMsg();
                dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerReplicatorInByte();
                break;
            case BROKER:
                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRateInMsg();
                dispatchThrottlingRateInByte = config.getDispatchThrottlingRateInByte();
                break;
            default:
                // Unknown type: fall back to "no limit" for both dimensions.
                dispatchThrottlingRateInMsg = -1;
                dispatchThrottlingRateInByte = -1L;
                break;
        }
        return DispatchRate.builder()
                .dispatchThrottlingRateInMsg(dispatchThrottlingRateInMsg)
                .dispatchThrottlingRateInByte(dispatchThrottlingRateInByte)
                .ratePeriodInSecond(1)
                .relativeToPublishRate(
                        this.type != Type.BROKER && config.isDispatchThrottlingRateRelativeToPublishRate())
                .build();
    }

    /**
     * Resolves the dispatch rate for this limiter's {@link Type} and applies it.
     *
     * <p>Topic, subscription and replicator rates are read from the topic; the broker
     * rate is built from the broker configuration. An unrecognized type is logged and
     * ignored, leaving the current limiters untouched.
     */
    public void updateDispatchRate() {
        DispatchRate dispatchRate;
        switch (this.type) {
            case TOPIC:
                dispatchRate = this.topic.getDispatchRate();
                break;
            case SUBSCRIPTION:
                dispatchRate = this.topic.getSubscriptionDispatchRate(this.subscriptionName);
                break;
            case REPLICATOR:
                dispatchRate = this.topic.getReplicatorDispatchRate();
                break;
            case BROKER:
                dispatchRate = this.createDispatchRate();
                break;
            default:
                log.warn("ignore configured dispatch rate for type {}", this.type);
                return;
        }
        if (this.type == Type.BROKER) {
            log.info("configured broker message-dispatch rate {}", dispatchRate);
        } else {
            // Argument order follows the placeholders: topic, type, rate, subscription name.
            log.info("[{}] configured {} message-dispatch rate at broker {} subscriptionName [{}]",
                    this.topicName, this.type, dispatchRate, this.subscriptionName);
        }
        this.updateDispatchRate(dispatchRate);
    }

    /**
     * Asynchronously fetches the namespace policies for the namespace that owns
     * {@code topicName}.
     *
     * @param brokerService the broker service used to reach the namespace resources
     * @param topicName     the fully qualified topic name
     * @return a future completing with the policies, if any
     */
    public static CompletableFuture<Optional<Policies>> getPoliciesAsync(BrokerService brokerService,
                                                                         String topicName) {
        NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
        return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace);
    }

    /**
     * Returns the namespace policies for the namespace that owns {@code topicName}, as
     * reported by {@code getPoliciesIfCached}.
     *
     * @param brokerService the broker service used to reach the namespace resources
     * @param topicName     the fully qualified topic name
     * @return the policies, if available
     * @deprecated use {@link #getPoliciesAsync(BrokerService, String)} instead
     */
    @Deprecated
    public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
        NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
        return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
    }

    /**
     * Replaces both limiters according to {@code dispatchRate}.
     *
     * <p>A dimension whose rate is not positive gets no limiter ({@code null}). When the
     * rate is relative to the publish rate, the limiter is built with a dynamic rate
     * function that adds the topic's last average publish rate to the configured rate;
     * otherwise a fixed-rate limiter is built. The rate period is at least one second.
     *
     * @param dispatchRate the rate to apply; must not be {@code null}
     */
    public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
        log.info("setting message-dispatch-rate {}", dispatchRate);
        long msgRate = dispatchRate.getDispatchThrottlingRateInMsg();
        long byteRate = dispatchRate.getDispatchThrottlingRateInByte();
        long ratePeriodNanos = TimeUnit.SECONDS.toNanos(Math.max(dispatchRate.getRatePeriodInSecond(), 1));
        boolean relativeToPublishRate = dispatchRate.isRelativeToPublishRate();

        AsyncTokenBucket msgBucket = null;
        if (msgRate > 0L) {
            if (relativeToPublishRate) {
                msgBucket = AsyncTokenBucket.builderForDynamicRate()
                        .rateFunction(() -> this.getRelativeDispatchRateInMsg(dispatchRate))
                        .ratePeriodNanosFunction(() -> ratePeriodNanos)
                        .build();
            } else {
                msgBucket = AsyncTokenBucket.builder()
                        .rate(msgRate)
                        .ratePeriodNanos(ratePeriodNanos)
                        .build();
            }
        }
        this.dispatchRateLimiterOnMessage = msgBucket;

        AsyncTokenBucket byteBucket = null;
        if (byteRate > 0L) {
            if (relativeToPublishRate) {
                byteBucket = AsyncTokenBucket.builderForDynamicRate()
                        .rateFunction(() -> this.getRelativeDispatchRateInByte(dispatchRate))
                        .ratePeriodNanosFunction(() -> ratePeriodNanos)
                        .build();
            } else {
                byteBucket = AsyncTokenBucket.builder()
                        .rate(byteRate)
                        .ratePeriodNanos(ratePeriodNanos)
                        .build();
            }
        }
        this.dispatchRateLimiterOnByte = byteBucket;
    }

    /**
     * Message rate relative to the publish rate: the topic's last average publish rate
     * (truncated to {@code long}) plus the configured rate; {@code 0} when the topic or
     * the rate is missing.
     */
    private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
        if (this.topic == null || dispatchRate == null) {
            return 0L;
        }
        return (long) this.topic.getLastUpdatedAvgPublishRateInMsg()
                + (long) dispatchRate.getDispatchThrottlingRateInMsg();
    }

    /**
     * Byte rate relative to the publish rate: the topic's last average publish byte rate
     * (truncated to {@code long}) plus the configured rate; {@code 0} when the topic or
     * the rate is missing.
     */
    private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) {
        if (this.topic == null || dispatchRate == null) {
            return 0L;
        }
        return (long) this.topic.getLastUpdatedAvgPublishRateInByte()
                + dispatchRate.getDispatchThrottlingRateInByte();
    }

    /**
     * @return the message limiter's rate, or {@code -1} when no message limiter is configured
     */
    public long getDispatchRateOnMsg() {
        AsyncTokenBucket msgBucket = this.dispatchRateLimiterOnMessage;
        return msgBucket != null ? msgBucket.getRate() : -1L;
    }

    /**
     * @return the byte limiter's rate, or {@code -1} when no byte limiter is configured
     */
    public long getDispatchRateOnByte() {
        AsyncTokenBucket byteBucket = this.dispatchRateLimiterOnByte;
        return byteBucket != null ? byteBucket.getRate() : -1L;
    }

    /**
     * Tells whether {@code dispatchRate} throttles at least one dimension.
     *
     * @param dispatchRate the rate to inspect, may be {@code null}
     * @return {@code true} when the rate is non-null and its message or byte rate is positive
     */
    public static boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
        return dispatchRate != null
                && (dispatchRate.getDispatchThrottlingRateInMsg() > 0
                || dispatchRate.getDispatchThrottlingRateInByte() > 0L);
    }

    /**
     * Drops both limiters; afterwards {@link #isDispatchRateLimitingEnabled()} returns
     * {@code false} until a new rate is applied.
     */
    public void close() {
        this.dispatchRateLimiterOnMessage = null;
        this.dispatchRateLimiterOnByte = null;
    }

    /** The scope a {@link DispatchRateLimiter} applies to. */
    public enum Type {
        TOPIC,
        SUBSCRIPTION,
        REPLICATOR,
        BROKER
    }
}

