/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class MessageDispatchThrottlingTest
extends ProducerConsumerBase {
    // SLF4J logger used by the tests in this class for start/exit banners and listener debug output.
    private static final Logger log = LoggerFactory.getLogger(MessageDispatchThrottlingTest.class);

    /**
     * Per-test initialization: runs the inherited {@code internalSetup()} and
     * {@code producerBaseSetup()} steps, then sets the cluster name in {@code conf} to {@code "test"}.
     *
     * @throws Exception if one of the inherited setup steps fails
     */
    @BeforeMethod
    @Override
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        conf.setClusterName("test");
    }

    /**
     * Per-test teardown: runs the inherited {@code internalCleanup()} followed by
     * {@code resetConfig()}.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @AfterMethod
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
        super.resetConfig();
    }

    /**
     * Data provider "subscriptions": one row per subscription type exercised by the tests
     * ({@code Shared}, then {@code Exclusive}).
     *
     * @return two single-column rows, each holding a {@link SubscriptionType}
     */
    @DataProvider(name = "subscriptions")
    public Object[][] subscriptionsProvider() {
        Object[][] rows = new Object[2][];
        rows[0] = new Object[]{SubscriptionType.Shared};
        rows[1] = new Object[]{SubscriptionType.Exclusive};
        return rows;
    }

    /**
     * Data provider "dispatchRateType": one row per kind of dispatch limit
     * ({@code messageRate}, then {@code byteRate}).
     *
     * @return two single-column rows, each holding a {@code DispatchRateType}
     */
    @DataProvider(name = "dispatchRateType")
    public Object[][] dispatchRateProvider() {
        Object[][] rows = new Object[2][];
        rows[0] = new Object[]{DispatchRateType.messageRate};
        rows[1] = new Object[]{DispatchRateType.byteRate};
        return rows;
    }

    /**
     * Data provider "subscriptionAndDispatchRateType": the cross product of
     * {@link #subscriptionsProvider()} and {@link #dispatchRateProvider()}.
     *
     * <p>Rows are ordered subscription-major: every dispatch-rate row is appended to the first
     * subscription row, then to the second, and so on.
     *
     * @return one row per (subscription, dispatch-rate) combination, built with
     *         {@link #merge(Object[], Object[])}
     */
    @DataProvider(name = "subscriptionAndDispatchRateType")
    public Object[][] subDisTypeProvider() {
        Object[][] subscriptions = subscriptionsProvider();
        Object[][] dispatchRates = dispatchRateProvider();

        // The result size is known up front, so fill the array directly instead of going
        // through an intermediate list and toArray().
        Object[][] combined = new Object[subscriptions.length * dispatchRates.length][];
        int row = 0;
        for (Object[] sub : subscriptions) {
            for (Object[] dispatch : dispatchRates) {
                combined[row++] = merge(sub, dispatch);
            }
        }
        return combined;
    }

    /**
     * Concatenates two arrays into a new array.
     *
     * <p>The result has the same runtime component type as {@code first} (it is created with
     * {@link Arrays#copyOf(Object[], int)}), so {@link System#arraycopy} throws
     * {@link ArrayStoreException} if {@code last} holds an element that cannot be stored in it.
     *
     * @param first elements placed at the start of the result
     * @param last  elements appended after those of {@code first}
     * @param <T>   element type
     * @return a new array of length {@code first.length + last.length}
     * @throws NullPointerException if either argument is {@code null}
     */
    public static <T> T[] merge(T[] first, T[] last) {
        T[] result = Arrays.copyOf(first, first.length + last.length);
        // Copy all of `last`; its length, not first.length, is the number of elements to append.
        System.arraycopy(last, 0, result, first.length, last.length);
        return result;
    }

    /**
     * Checks that dispatch-rate policies set on a namespace through the admin API are picked up
     * by a topic that already exists: first a message-rate policy (the topic must gain a
     * dispatch rate limiter), then a byte-rate policy (the limiter must report the new byte rate).
     *
     * <p>Each change is polled up to 5 times, 100 ms apart.
     */
    @Test
    public void testMessageRateDynamicallyChange() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingBlock";
        final int retry = 5;

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // No policy has been set yet, so the topic must not have a limiter.
        Assert.assertFalse(topic.getDispatchRateLimiter().isPresent());

        // (1) message-rate policy: wait for the limiter to appear
        int messageRate = 100;
        DispatchRate dispatchRate = new DispatchRate(messageRate, -1L, 360);
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        boolean isDispatchRateUpdate = false;
        for (int attempt = 0; attempt < retry; attempt++) {
            if (topic.getDispatchRateLimiter().isPresent()) {
                isDispatchRateUpdate = true;
                break;
            }
            // no point sleeping after the final check
            if (attempt != retry - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(isDispatchRateUpdate);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        // (2) byte-rate policy: wait for the limiter to report the new byte rate
        messageRate = 500;
        dispatchRate = new DispatchRate(-1, (long) messageRate, 360);
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        isDispatchRateUpdate = false;
        for (int attempt = 0; attempt < retry; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() == (long) messageRate) {
                isDispatchRateUpdate = true;
                break;
            }
            if (attempt != retry - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(isDispatchRateUpdate);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        producer.close();
    }

    /**
     * With a namespace dispatch limit of 100 (messages or bytes, depending on
     * {@code dispatchRateType}), publishes 500 messages of 80 bytes and asserts that the
     * listener has seen fewer than 200 of them by the time publishing finishes.
     *
     * @param subscription     subscription type used by the consumer
     * @param dispatchRateType whether the limit is expressed in messages or in bytes
     */
    @Test(dataProvider = "subscriptionAndDispatchRateType", timeOut = 5000L)
    public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscription,
            DispatchRateType dispatchRateType) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingBlock";
        final int messageRate = 100;

        final DispatchRate dispatchRate;
        if (DispatchRateType.messageRate.equals(dispatchRateType)) {
            dispatchRate = new DispatchRate(messageRate, -1L, 360);
        } else {
            dispatchRate = new DispatchRate(-1, (long) messageRate, 360);
        }

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the limiter reports either a message rate or a byte rate.
        final int retry = 5;
        boolean isMessageRateUpdate = false;
        for (int attempt = 0; attempt < retry; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0L
                    || topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (attempt != retry - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(isMessageRateUpdate);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        final int numMessages = 500;
        final AtomicInteger totalReceived = new AtomicInteger(0);

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscription)
                .messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(new byte[80]);
        }

        // Fewer than twice the configured rate must have been dispatched.
        Assert.assertTrue(totalReceived.get() < messageRate * 2);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Sets the broker-level dynamic settings {@code dispatchThrottlingRatePerTopicInMsg} (5) and
     * {@code dispatchThrottlingRatePerTopicInByte} (1 MiB), publishes 500 messages to a Shared
     * subscription and asserts that not all of them have been received 500 ms later.
     *
     * <p>Both broker settings are restored to their initial values in a {@code finally} block so
     * that a failing assertion does not leave the modified rates on the shared configuration.
     */
    @Test
    public void testClusterMsgByteRateLimitingClusterConfig() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingBlock";
        final int messageRate = 5;
        final long byteRate = 1024L * 1024L;

        final int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();
        final long initByteValue = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();

        try {
            admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
                    Integer.toString(messageRate));
            admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInByte",
                    Long.toString(byteRate));

            // Give the dynamic update up to 5 chances (with growing pauses) to become visible.
            for (int i = 0; i < 5; i++) {
                if (pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg() == initValue) {
                    Thread.sleep(50 + i * 10);
                }
            }
            Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(), initValue);

            admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
            Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
            PersistentTopic topic =
                    (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

            final int numMessages = 500;
            final AtomicInteger totalReceived = new AtomicInteger(0);

            Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("my-subscriber-name")
                    .subscriptionType(SubscriptionType.Shared)
                    .messageListener((c1, msg) -> {
                        Assert.assertNotNull(msg, "Message cannot be null");
                        String receivedMessage = new String(msg.getData());
                        log.debug("Received message [{}] in the listener", receivedMessage);
                        totalReceived.incrementAndGet();
                    })
                    .subscribe();
            deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

            for (int i = 0; i < numMessages; i++) {
                String message = "my-message-" + i;
                producer.send(message.getBytes());
            }

            // The consumer must not have received everything within this window.
            Thread.sleep(500L);
            Assert.assertNotEquals(totalReceived.get(), numMessages);

            consumer.close();
            producer.close();
        } finally {
            // Restore both settings this test changed, even when an assertion above failed.
            pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue);
            pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInByte(initByteValue);
        }
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * With a namespace message-rate policy of {@code DispatchRate(10, -1, 1)}, publishes 20
     * messages and waits on a latch until the listener has received all 20, i.e. throttling
     * delays delivery but does not drop messages. The wait is bounded only by the test timeout.
     *
     * @param subscription subscription type used by the consumer
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000L)
    public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription)
            throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingAll";
        final int messageRate = 10;
        final DispatchRate dispatchRate = new DispatchRate(messageRate, -1L, 1);

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the limiter reports a positive message rate.
        final int retry = 5;
        boolean isMessageRateUpdate = false;
        for (int attempt = 0; attempt < retry; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (attempt != retry - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(isMessageRateUpdate);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        final int numProducedMessages = 20;
        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
        final AtomicInteger totalReceived = new AtomicInteger(0);

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscription)
                .messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                    latch.countDown();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int i = 0; i < numProducedMessages; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }

        latch.await();
        Assert.assertEquals(totalReceived.get(), numProducedMessages);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * With a namespace byte-rate policy of {@code DispatchRate(-1, 100, 1)}, publishes 20
     * messages of 10 bytes and waits on a latch until the listener has received all 20.
     * The wait is bounded only by the test timeout.
     *
     * @param subscription subscription type used by the consumer
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000L)
    public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription)
            throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingAll";
        final int byteRate = 100;
        final DispatchRate dispatchRate = new DispatchRate(-1, (long) byteRate, 1);

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the limiter reports a positive byte rate.
        final int retry = 5;
        boolean isMessageRateUpdate = false;
        for (int attempt = 0; attempt < retry; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (attempt != retry - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(isMessageRateUpdate);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        final int numProducedMessages = 20;
        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
        final AtomicInteger totalReceived = new AtomicInteger(0);

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscription)
                .messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                    latch.countDown();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int i = 0; i < numProducedMessages; i++) {
            producer.send(new byte[10]);
        }

        latch.await();
        Assert.assertEquals(totalReceived.get(), numProducedMessages);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * With a namespace message-rate policy of 5, attaches five consumers to one Shared
     * subscription, publishes 500 messages and asserts that the consumers together have not
     * received all of them 500 ms later.
     */
    @Test(timeOut = 5000L)
    public void testRateLimitingMultipleConsumers() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingMultipleConsumers";
        final int messageRate = 5;
        final DispatchRate dispatchRate = new DispatchRate(messageRate, -1L, 360);

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the limiter reports a positive message rate.
        final int retry = 5;
        boolean isMessageRateUpdate = false;
        for (int attempt = 0; attempt < retry; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (attempt != retry - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(isMessageRateUpdate);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        final int numProducedMessages = 500;
        final AtomicInteger totalReceived = new AtomicInteger(0);

        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                });

        // Five consumers on the same subscription, all counting into totalReceived.
        Consumer<?>[] consumers = new Consumer<?>[5];
        for (int c = 0; c < consumers.length; c++) {
            consumers[c] = consumerBuilder.subscribe();
        }
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int i = 0; i < numProducedMessages; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }

        // The consumers must not have received everything within this window.
        Thread.sleep(500L);
        Assert.assertNotEquals(totalReceived.get(), numProducedMessages);

        for (Consumer<?> consumer : consumers) {
            consumer.close();
        }
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Sets the broker-level dynamic setting {@code dispatchThrottlingRatePerTopicInMsg} to 5,
     * publishes 500 messages and asserts that the consumer has not received all of them 500 ms
     * later.
     *
     * <p>The broker setting is restored to its initial value in a {@code finally} block so that
     * a failing assertion does not leave the modified rate on the shared configuration.
     *
     * @param subscription subscription type used by the consumer
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000L)
    public void testClusterRateLimitingConfiguration(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingBlock";
        final int messageRate = 5;

        final int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();

        try {
            admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
                    Integer.toString(messageRate));

            // Give the dynamic update up to 5 chances (with growing pauses) to become visible.
            for (int i = 0; i < 5; i++) {
                if (pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg() == initValue) {
                    Thread.sleep(50 + i * 10);
                }
            }
            Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(), initValue);

            admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
            Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
            PersistentTopic topic =
                    (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

            final int numMessages = 500;
            final AtomicInteger totalReceived = new AtomicInteger(0);

            Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("my-subscriber-name")
                    .subscriptionType(subscription)
                    .messageListener((c1, msg) -> {
                        Assert.assertNotNull(msg, "Message cannot be null");
                        String receivedMessage = new String(msg.getData());
                        log.debug("Received message [{}] in the listener", receivedMessage);
                        totalReceived.incrementAndGet();
                    })
                    .subscribe();
            deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

            for (int i = 0; i < numMessages; i++) {
                String message = "my-message-" + i;
                producer.send(message.getBytes());
            }

            // The consumer must not have received everything within this window.
            Thread.sleep(500L);
            Assert.assertNotEquals(totalReceived.get(), numMessages);

            consumer.close();
            producer.close();
        } finally {
            // Restore the broker setting even when an assertion above failed.
            pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue);
        }
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Applies a namespace policy that limits both messages (5) and bytes (10) at once, creates
     * and closes a subscription, publishes 200 messages of 50 bytes as backlog, re-subscribes
     * and asserts that the number of bytes received differs from twice the byte rate.
     *
     * <p>The final comparison is done on two {@code long} values. Comparing a boxed
     * {@code Integer} with a boxed {@code Long} would never be equal and so could never fail.
     *
     * @param subscription subscription type used by the consumer
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000L)
    public void testMessageByteRateThrottlingCombined(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingAll";
        final int messageRate = 5;
        final long byteRate = 10L;
        final DispatchRate dispatchRate = new DispatchRate(messageRate, byteRate, 360);

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the limiter reports both a positive message rate and a positive byte rate.
        final int retry = 5;
        boolean isMessageRateUpdate = false;
        for (int attempt = 0; attempt < retry; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0L
                    && topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (attempt != retry - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(isMessageRateUpdate);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        final int numProducedMessages = 200;
        final AtomicInteger totalReceived = new AtomicInteger(0);

        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscription)
                .messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                });

        // Subscribe once and close again so the messages below are published with no
        // consumer attached.
        Consumer<byte[]> consumer = consumerBuilder.subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());
        consumer.close();

        final int dataSize = 50;
        final byte[] data = new byte[dataSize];
        for (int i = 0; i < numProducedMessages; i++) {
            producer.send(data);
        }

        consumer = consumerBuilder.subscribe();

        // NOTE(review): the count is read right after re-subscribing, with no wait, and with
        // 50-byte payloads the byte total is always a multiple of 50, so this check is weak.
        // Confirm the intended expectation.
        final long totalReceivedBytes = (long) dataSize * totalReceived.get();
        Assert.assertNotEquals(Long.valueOf(totalReceivedBytes), Long.valueOf(byteRate * 2));

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Creates a cluster named "global", creates the namespace without an explicit cluster set,
     * assigns the "test" cluster as its replication cluster, and applies a message-rate policy
     * of 5. Then publishes 500 messages of 80 bytes to a Shared subscription and asserts that
     * not all of them have been received 500 ms later.
     */
    @Test
    public void testGlobalNamespaceThrottling() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingBlock";
        final int messageRate = 5;
        final DispatchRate dispatchRate = new DispatchRate(messageRate, -1L, 360);

        admin.clusters().createCluster("global", new ClusterData("http://global:8080"));
        admin.namespaces().createNamespace(namespace);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the limiter reports either a message rate or a byte rate.
        final int retry = 5;
        boolean isMessageRateUpdate = false;
        for (int attempt = 0; attempt < retry; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0L
                    || topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (attempt != retry - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(isMessageRateUpdate);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        final int numMessages = 500;
        final AtomicInteger totalReceived = new AtomicInteger(0);

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(new byte[80]);
        }

        // The consumer must not have received everything within this window.
        Thread.sleep(500L);
        Assert.assertNotEquals(totalReceived.get(), numMessages);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * With a namespace message-rate policy of 10 and
     * {@code dispatchThrottlingOnNonBacklogConsumerEnabled} switched on, publishes 500 messages
     * of 80 bytes to an attached consumer and asserts that fewer than 20 have been received by
     * the time publishing finishes. Unlike the other tests, this one does not call
     * {@code deactiveCursors}.
     *
     * <p>The {@code conf} flag is reset to {@code false} in a {@code finally} block so that a
     * failing assertion does not leave it enabled.
     *
     * @param subscription subscription type used by the consumer
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000L)
    public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://my-property/throttling_ns/throttlingBlock";
        final int messageRate = 10;
        final DispatchRate dispatchRate = new DispatchRate(messageRate, -1L, 360);

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        try {
            admin.brokers().updateDynamicConfiguration("dispatchThrottlingOnNonBacklogConsumerEnabled",
                    Boolean.TRUE.toString());

            Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
            PersistentTopic topic =
                    (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

            // Poll until the limiter reports a positive message rate.
            final int retry = 5;
            boolean isUpdated = false;
            for (int attempt = 0; attempt < retry; attempt++) {
                if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0L) {
                    isUpdated = true;
                    break;
                }
                if (attempt != retry - 1) {
                    Thread.sleep(100L);
                }
            }
            Assert.assertTrue(isUpdated);
            Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

            // Also set the flag directly on conf, in addition to the dynamic update above.
            conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

            final int numMessages = 500;
            final AtomicInteger totalReceived = new AtomicInteger(0);

            Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("my-subscriber-name")
                    .subscriptionType(subscription)
                    .messageListener((c1, msg) -> {
                        Assert.assertNotNull(msg, "Message cannot be null");
                        String receivedMessage = new String(msg.getData());
                        log.debug("Received message [{}] in the listener", receivedMessage);
                        totalReceived.incrementAndGet();
                    })
                    .subscribe();

            for (int sent = 0; sent < numMessages; sent++) {
                producer.send(new byte[80]);
            }

            // Fewer than twice the configured rate must have been dispatched.
            Assert.assertTrue(totalReceived.get() < messageRate * 2);

            consumer.close();
            producer.close();
        } finally {
            // Switch the flag back off even when an assertion above failed.
            conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false);
        }
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Checks the precedence between the broker-level message rate and a namespace policy:
     * <ol>
     *   <li>with only the broker-level rate (100) set, a new topic's limiter reports 100;</li>
     *   <li>after a namespace policy of 500 is applied, the limiter reports 500;</li>
     *   <li>after the namespace policy is set to {@code DispatchRate(0, 0, 1)}, the limiter
     *       reports 100 again;</li>
     *   <li>a second topic created afterwards also reports 100.</li>
     * </ol>
     *
     * <p>The broker-level rate is restored to its initial value in a {@code finally} block, as
     * the other tests in this class that modify it do.
     */
    @Test
    public void testClusterPolicyOverrideConfiguration() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName1 = "persistent://my-property/throttling_ns/throttlingOverride1";
        final String topicName2 = "persistent://my-property/throttling_ns/throttlingOverride2";
        final int clusterMessageRate = 100;

        final int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();

        try {
            // (1) broker-level rate only
            admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
                    Integer.toString(clusterMessageRate));
            for (int i = 0; i < 5; i++) {
                if (pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg() == initValue) {
                    Thread.sleep(50 + i * 10);
                }
            }
            Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(), initValue);

            admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
            Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName1).create();
            PersistentTopic topic =
                    (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName1).get();
            Assert.assertEquals((long) clusterMessageRate,
                    topic.getDispatchRateLimiter().get().getDispatchRateOnMsg());

            // (2) the namespace policy takes over
            final int nsMessageRate = 500;
            DispatchRate dispatchRate = new DispatchRate(nsMessageRate, 0L, 1);
            admin.namespaces().setDispatchRate(namespace, dispatchRate);
            for (int i = 0; i < 5; i++) {
                if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() != (long) nsMessageRate) {
                    Thread.sleep(50 + i * 10);
                }
            }
            Assert.assertEquals((long) nsMessageRate,
                    topic.getDispatchRateLimiter().get().getDispatchRateOnMsg());

            // (3) a zeroed namespace policy falls back to the broker-level rate
            dispatchRate = new DispatchRate(0, 0L, 1);
            admin.namespaces().setDispatchRate(namespace, dispatchRate);
            for (int i = 0; i < 5; i++) {
                if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() == (long) nsMessageRate) {
                    Thread.sleep(50 + i * 10);
                }
            }
            Assert.assertEquals((long) clusterMessageRate,
                    topic.getDispatchRateLimiter().get().getDispatchRateOnMsg());

            // (4) a topic created afterwards starts with the broker-level rate
            Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create();
            PersistentTopic topic2 =
                    (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName2).get();
            Assert.assertEquals((long) clusterMessageRate,
                    topic2.getDispatchRateLimiter().get().getDispatchRateOnMsg());

            producer.close();
            producer2.close();
        } finally {
            // Restore the broker setting changed in step (1), even when an assertion failed.
            pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue);
        }
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Verifies that closing a topic resets its dispatch-rate limiter: after
     * {@code topic.close()} completes, the limiter captured beforehand must report {@code -1}
     * for both the message rate and the byte rate.
     *
     * <p>The test sets a namespace dispatch rate, publishes ten messages, receives and
     * acknowledges all of them, then closes the producer, unsubscribes and closes the consumer,
     * and finally closes the topic.
     *
     * @param subscription subscription type supplied by the {@code subscriptions} data provider
     * @throws Exception if any admin, client or broker call fails
     */
    @Test(dataProvider="subscriptions", timeOut=10000L)
    public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/closingRateLimiter" + subscription.name();
        final String subName = "mySubscription" + subscription.name();
        final int numProducedMessages = 10;

        DispatchRate dispatchRate = new DispatchRate(10, 1024L, 1);
        this.admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        this.admin.namespaces().setDispatchRate(namespace, dispatchRate);

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(subscription)
                .subscribe();
        PersistentTopic topic =
                (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Publish everything first, then drain; the two phases are intentionally separate.
        for (int sent = 0; sent < numProducedMessages; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }
        for (int received = 0; received < numProducedMessages; received++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
        }

        // Capture the limiter before the topic is closed so its state can be inspected afterwards.
        Assert.assertTrue(topic.getDispatchRateLimiter().isPresent());
        DispatchRateLimiter dispatchRateLimiter = topic.getDispatchRateLimiter().get();

        producer.close();
        consumer.unsubscribe();
        consumer.close();
        topic.close().get();

        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1L);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1L);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Shuts down the broker service's {@code statsUpdater} executor and then deactivates every
     * cursor of the given managed ledger.
     *
     * <p>The executor is a private field of {@link BrokerService}, so it is reached through
     * reflection. NOTE(review): it is presumably stopped so that a periodic task cannot
     * re-activate the cursors afterwards; confirm against {@code BrokerService}.
     *
     * @param ledger managed ledger whose cursors are deactivated
     * @throws Exception if the {@code statsUpdater} field cannot be found or read
     */
    protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
        Field updaterField = BrokerService.class.getDeclaredField("statsUpdater");
        updaterField.setAccessible(true);

        BrokerService brokerService = this.pulsar.getBrokerService();
        ScheduledExecutorService updater = (ScheduledExecutorService) updaterField.get(brokerService);
        updater.shutdownNow();

        ledger.getCursors().forEach(ledger::deactivateCursor);
    }

    /**
     * Kind of dispatch rate: one expressed in messages, one expressed in bytes.
     *
     * <p>NOTE(review): presumably used as a parameter by tests outside this view; the constant
     * names are kept as-is because callers may reference them.
     */
    enum DispatchRateType {
        messageRate,
        byteRate
    }
}

