/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the message-count and byte-count rate limiters used to throttle dispatch for a
 * persistent topic or, when a subscription name is supplied, for one subscription of it.
 *
 * <p>The effective rate is taken from the namespace policies for the local cluster when
 * those enable throttling, and otherwise from the broker configuration. A limiter exists
 * only while its rate is positive; a {@code null} limiter means "no limit" for that
 * dimension.
 *
 * <p>Mutations of the two limiter fields ({@link #updateDispatchRate(DispatchRate)} and
 * {@link #close()}) are serialized on this instance. Read-only accessors take a single
 * snapshot of the field they use so that a concurrent update which clears the field cannot
 * cause a {@link NullPointerException} between the null check and the call.
 */
public class DispatchRateLimiter {
    private static final Logger log = LoggerFactory.getLogger(DispatchRateLimiter.class);

    private final String topicName;
    /** {@code null} when this limiter applies to the topic rather than to a subscription. */
    private final String subscriptionName;
    private final BrokerService brokerService;
    /** Limits dispatched messages; {@code null} while message throttling is disabled. */
    private RateLimiter dispatchRateLimiterOnMessage;
    /** Limits dispatched bytes; {@code null} while byte throttling is disabled. */
    private RateLimiter dispatchRateLimiterOnByte;

    /**
     * Creates a limiter for one subscription of {@code topic} and immediately loads the
     * applicable dispatch rate.
     *
     * @param topic            topic whose name and broker service are used
     * @param subscriptionName subscription to throttle, or {@code null} for a topic-level limiter
     */
    public DispatchRateLimiter(PersistentTopic topic, String subscriptionName) {
        this.topicName = topic.getName();
        this.subscriptionName = subscriptionName;
        this.brokerService = topic.getBrokerService();
        updateDispatchRate();
    }

    /**
     * Creates a topic-level limiter (no subscription name).
     *
     * @param topic topic whose name and broker service are used
     */
    public DispatchRateLimiter(PersistentTopic topic) {
        this(topic, null);
    }

    /**
     * @return the permits reported by the message limiter, or {@code -1} when message
     *         throttling is disabled
     */
    public long getAvailableDispatchRateLimitOnMsg() {
        RateLimiter msgLimiter = this.dispatchRateLimiterOnMessage;
        return msgLimiter == null ? -1L : msgLimiter.getAvailablePermits();
    }

    /**
     * Tries to acquire message and byte permits.
     *
     * <p>A dimension is treated as acquired when its requested amount is not positive or
     * when no limiter is configured for it. Both dimensions are always attempted, message
     * first; nothing is released here if only one of the two attempts succeeds.
     *
     * @param msgPermits  number of message permits requested
     * @param bytePermits number of byte permits requested
     * @return {@code true} only if both dimensions were acquired
     */
    public boolean tryDispatchPermit(long msgPermits, long bytePermits) {
        RateLimiter msgLimiter = this.dispatchRateLimiterOnMessage;
        RateLimiter byteLimiter = this.dispatchRateLimiterOnByte;
        boolean acquiredMsgPermit = msgPermits <= 0L || msgLimiter == null || msgLimiter.tryAcquire(msgPermits);
        boolean acquiredBytePermit = bytePermits <= 0L || byteLimiter == null || byteLimiter.tryAcquire(bytePermits);
        return acquiredMsgPermit && acquiredBytePermit;
    }

    /**
     * @return {@code false} if any configured limiter reports zero or fewer available
     *         permits, {@code true} otherwise (including when no limiter is configured)
     */
    public boolean hasMessageDispatchPermit() {
        RateLimiter msgLimiter = this.dispatchRateLimiterOnMessage;
        if (msgLimiter != null && msgLimiter.getAvailablePermits() <= 0L) {
            return false;
        }
        RateLimiter byteLimiter = this.dispatchRateLimiterOnByte;
        return byteLimiter == null || byteLimiter.getAvailablePermits() > 0L;
    }

    /**
     * @return {@code true} if at least one of the message or byte limiters is configured
     */
    public boolean isDispatchRateLimitingEnabled() {
        return this.dispatchRateLimiterOnMessage != null || this.dispatchRateLimiterOnByte != null;
    }

    /**
     * Reloads the dispatch rate: the namespace-policy rate when it enables throttling,
     * otherwise the rate from the broker configuration, and applies it.
     */
    public void updateDispatchRate() {
        DispatchRate dispatchRate = getPoliciesDispatchRate(this.brokerService);
        if (dispatchRate == null) {
            dispatchRate = createBrokerDefaultDispatchRate();
        }
        updateDispatchRate(dispatchRate);
        log.info("[{}] [{}] configured message-dispatch rate at broker {}", this.topicName, this.subscriptionName, dispatchRate);
    }

    /**
     * Tells whether throttling applies, loading the namespace policies from the broker's
     * policies cache when {@code policies} is empty.
     *
     * @param brokerService    broker service used for configuration and policy lookup
     * @param policies         already-loaded policies, or empty to look them up
     * @param topicName        fully-qualified topic name
     * @param subscriptionName subscription name, or {@code null} for the topic-level rate
     * @return {@code true} if a policy or broker-level rate enables throttling
     */
    public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies, String topicName, String subscriptionName) {
        ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
        Optional<Policies> effectivePolicies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
        return isDispatchRateNeeded(serviceConfig, effectivePolicies, topicName, subscriptionName);
    }

    /**
     * Tells whether throttling applies given the broker configuration and policies.
     *
     * @param serviceConfig    broker configuration supplying the cluster name and default rates
     * @param policies         namespace policies, possibly empty
     * @param topicName        fully-qualified topic name
     * @param subscriptionName subscription name, or {@code null} for the topic-level rate
     * @return {@code true} if the policies enable a rate for the cluster, or else if the
     *         relevant broker-level message or byte rate is positive
     */
    public static boolean isDispatchRateNeeded(ServiceConfiguration serviceConfig, Optional<Policies> policies, String topicName, String subscriptionName) {
        DispatchRate policyRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, topicName, subscriptionName);
        if (policyRate != null) {
            return true;
        }
        if (subscriptionName == null) {
            return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0 || serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0L;
        }
        return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0 || serviceConfig.getDispatchThrottlingRatePerSubscribeInByte() > 0L;
    }

    /**
     * Applies a policies change. Nothing happens when the policies carry no rate entry for
     * the local cluster. When the entry exists but disables throttling while the broker
     * configuration enables it, the broker-level rate is applied instead.
     *
     * @param data updated namespace policies
     */
    public void onPoliciesUpdate(Policies data) {
        String cluster = this.brokerService.pulsar().getConfiguration().getClusterName();
        DispatchRate dispatchRate = selectPolicyRate(data, cluster, this.subscriptionName);
        if (dispatchRate == null) {
            return;
        }
        DispatchRate brokerDefault = createBrokerDefaultDispatchRate();
        if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(brokerDefault)) {
            dispatchRate = brokerDefault;
        }
        updateDispatchRate(dispatchRate);
    }

    /**
     * Looks up the policy-defined rate for this limiter's topic/subscription.
     *
     * @param brokerService broker service used for the cluster name and policy lookup
     * @return the policy rate if it enables throttling, otherwise {@code null}
     */
    public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {
        String cluster = brokerService.pulsar().getConfiguration().getClusterName();
        Optional<Policies> policies = getPolicies(brokerService, this.topicName);
        return getPoliciesDispatchRate(cluster, policies, this.topicName, this.subscriptionName);
    }

    /**
     * Reads the namespace policies of {@code topicName} from the broker's policies cache,
     * waiting at most the configured ZooKeeper operation timeout.
     *
     * @param brokerService broker service giving access to the cache and configuration
     * @param topicName     fully-qualified topic name
     * @return the cached policies, or empty if the lookup failed (the failure is logged)
     */
    public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
        NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
        String path = PulsarWebResource.path("policies", namespace.toString());
        Optional<Policies> policies = Optional.empty();
        try {
            policies = brokerService.pulsar().getConfigurationCache().policiesCache().getAsync(path)
                    .get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to callers; this lookup stays best-effort.
                Thread.currentThread().interrupt();
            }
            log.warn("Failed to get message-rate for {} ", topicName, e);
        }
        return policies;
    }

    /**
     * Extracts the rate configured in {@code policies} for {@code cluster}.
     *
     * @param cluster          cluster whose entry is read
     * @param policies         namespace policies, possibly empty
     * @param topicName        fully-qualified topic name (not used by the lookup itself)
     * @param subscriptionName subscription name, or {@code null} to read the topic-level map
     * @return the configured rate if it enables throttling, otherwise {@code null}
     */
    public static DispatchRate getPoliciesDispatchRate(String cluster, Optional<Policies> policies, String topicName, String subscriptionName) {
        return policies.map(p -> {
            DispatchRate policyRate = selectPolicyRate(p, cluster, subscriptionName);
            return isDispatchRateEnabled(policyRate) ? policyRate : null;
        }).orElse(null);
    }

    /**
     * Applies {@code dispatchRate}: each limiter is created, re-rated, or closed and cleared
     * depending on whether its rate is positive and whether it already exists.
     *
     * @param dispatchRate rate to apply; must not be {@code null}
     */
    public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
        log.info("[{}] [{}] setting message-dispatch-rate {}", this.topicName, this.subscriptionName, dispatchRate);
        long msgRate = dispatchRate.dispatchThrottlingRateInMsg;
        long byteRate = dispatchRate.dispatchThrottlingRateInByte;
        long ratePeriod = dispatchRate.ratePeriodInSecond;
        this.dispatchRateLimiterOnMessage = applyRate(this.dispatchRateLimiterOnMessage, msgRate, ratePeriod);
        this.dispatchRateLimiterOnByte = applyRate(this.dispatchRateLimiterOnByte, byteRate, ratePeriod);
    }

    /**
     * @return the rate reported by the message limiter, or {@code -1} when it is not configured
     */
    public long getDispatchRateOnMsg() {
        RateLimiter msgLimiter = this.dispatchRateLimiterOnMessage;
        return msgLimiter != null ? msgLimiter.getRate() : -1L;
    }

    /**
     * @return the rate reported by the byte limiter, or {@code -1} when it is not configured
     */
    public long getDispatchRateOnByte() {
        RateLimiter byteLimiter = this.dispatchRateLimiterOnByte;
        return byteLimiter != null ? byteLimiter.getRate() : -1L;
    }

    /**
     * Closes and clears both limiters. Holds the same lock as
     * {@link #updateDispatchRate(DispatchRate)} because both mutate the limiter fields.
     */
    public synchronized void close() {
        if (this.dispatchRateLimiterOnMessage != null) {
            this.dispatchRateLimiterOnMessage.close();
            this.dispatchRateLimiterOnMessage = null;
        }
        if (this.dispatchRateLimiterOnByte != null) {
            this.dispatchRateLimiterOnByte.close();
            this.dispatchRateLimiterOnByte = null;
        }
    }

    /** Returns whether {@code dispatchRate} is non-null and has a positive message or byte rate. */
    private static boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
        return dispatchRate != null && (dispatchRate.dispatchThrottlingRateInMsg > 0 || dispatchRate.dispatchThrottlingRateInByte > 0L);
    }

    /** Picks the cluster's entry from the topic-level or subscription-level policy map. */
    private static DispatchRate selectPolicyRate(Policies policies, String cluster, String subscriptionName) {
        return subscriptionName == null
                ? policies.clusterDispatchRate.get(cluster)
                : policies.subscriptionDispatchRate.get(cluster);
    }

    /** Builds the broker-configured topic or subscription rate with a one-second period. */
    private DispatchRate createBrokerDefaultDispatchRate() {
        ServiceConfiguration config = this.brokerService.pulsar().getConfiguration();
        if (this.subscriptionName == null) {
            return new DispatchRate(config.getDispatchThrottlingRatePerTopicInMsg(), config.getDispatchThrottlingRatePerTopicInByte(), 1);
        }
        return new DispatchRate(config.getDispatchThrottlingRatePerSubscriptionInMsg(), config.getDispatchThrottlingRatePerSubscribeInByte(), 1);
    }

    /**
     * Reconciles one limiter with its desired rate and returns the limiter to store:
     * a new or re-rated limiter when {@code rate} is positive, otherwise {@code null}
     * after closing {@code current} if it exists. Called only while holding this
     * instance's lock.
     */
    private RateLimiter applyRate(RateLimiter current, long rate, long ratePeriodInSecond) {
        if (rate > 0L) {
            if (current == null) {
                return new RateLimiter(this.brokerService.pulsar().getExecutor(), rate, ratePeriodInSecond, TimeUnit.SECONDS);
            }
            current.setRate(rate, ratePeriodInSecond, TimeUnit.SECONDS);
            return current;
        }
        if (current != null) {
            current.close();
        }
        return null;
    }
}

