/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.net.URL;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class BacklogQuotaManagerTest {
    // Broker service port: a free port chosen once, when the class is initialised, and handed to
    // setBrokerServicePort in setup().
    protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
    // Broker under test; created and started in setup(), closed in shutdown().
    PulsarService pulsar;
    // Broker configuration built in setup(); tests compare the namespace quota map against it.
    ServiceConfiguration config;
    // HTTP URL of the broker web service; used for the admin client and as the client service URL.
    URL adminUrl;
    // Admin client used to provision clusters/tenants/namespaces and to read topic stats.
    PulsarAdmin admin;
    // Local ZooKeeper/BookKeeper ensemble started in setup() and stopped in shutdown().
    LocalBookkeeperEnsemble bkEnsemble;
    // Port the local ensemble's ZooKeeper listens on; chosen per test-class instance.
    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
    // Port of the broker web service (admin REST endpoint); chosen per test-class instance.
    protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
    // Backlog quota check interval in seconds; the same value setup() configures on the broker.
    // The tests sleep for 6000 ms, i.e. one second longer than this interval.
    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
    private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class);

    /**
     * Starts a local bookkeeper ensemble and a broker, then provisions the cluster, tenant and
     * namespaces used by the tests.
     *
     * <p>The broker is configured with a backlog quota check every
     * {@link #TIME_TO_CHECK_BACKLOG_QUOTA} seconds and at most 5 entries per managed ledger with
     * no minimum rollover time.
     *
     * <p>Any failure is logged and turned into a test failure that carries the original
     * throwable as its cause.
     *
     * @throws Exception declared for the test framework; failures surface as assertion errors
     */
    @BeforeMethod
    void setup() throws Exception {
        try {
            final String cluster = "usc";

            bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager::nextFreePort);
            bkEnsemble.start();

            config = new ServiceConfiguration();
            config.setZookeeperServers("127.0.0.1:" + ZOOKEEPER_PORT);
            config.setAdvertisedAddress("localhost");
            config.setWebServicePort(Optional.of(BROKER_WEBSERVICE_PORT));
            config.setClusterName(cluster);
            config.setBrokerServicePort(Optional.of(BROKER_SERVICE_PORT));
            config.setAuthorizationEnabled(false);
            config.setAuthenticationEnabled(false);
            // Keep the check interval tied to the constant the tests' wait time is derived from.
            config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
            config.setManagedLedgerMaxEntriesPerLedger(5);
            config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
            config.setAllowAutoTopicCreationType("non-partitioned");

            pulsar = new PulsarService(config);
            pulsar.start();

            adminUrl = new URL("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT);
            admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.toString()).build();

            admin.clusters().createCluster(cluster, new ClusterData(adminUrl.toString()));
            admin.tenants().createTenant("prop",
                    new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet(cluster)));

            // Every namespace is created and then pinned to the single local cluster.
            for (String namespace : new String[] {"prop/ns-quota", "prop/quotahold", "prop/quotaholdasync"}) {
                admin.namespaces().createNamespace(namespace);
                admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster));
            }
        } catch (Throwable t) {
            LOG.error("Error setting up broker test", t);
            // Attach the cause so the test report shows why setup failed, not only that it did.
            Assert.fail("Broker test setup failed", t);
        }
    }

    /**
     * Releases the admin client, the broker and the bookkeeper ensemble, in that order.
     *
     * <p>Each resource is released even when releasing an earlier one throws, and resources that
     * were never created (for example after a partial setup) are skipped. A failure is logged and
     * reported as a test failure with the throwable attached as cause.
     *
     * @throws Exception declared for the test framework; failures surface as assertion errors
     */
    @AfterMethod
    void shutdown() throws Exception {
        try {
            try {
                if (admin != null) {
                    admin.close();
                }
            } finally {
                try {
                    if (pulsar != null) {
                        pulsar.close();
                    }
                } finally {
                    if (bkEnsemble != null) {
                        bkEnsemble.stop();
                    }
                }
            }
        } catch (Throwable t) {
            LOG.error("Error cleaning up broker test setup state", t);
            Assert.fail("Broker test cleanup failed", t);
        }
    }

    /** Asks the broker service of the broker under test to update its rates. */
    private void rolloverStats() {
        pulsar.getBrokerService().updateRates();
    }

    /**
     * Checks the {@code consumer_backlog_eviction} policy when neither subscription acknowledges.
     *
     * <p>Sets a 10 KiB quota on {@code prop/ns-quota}, publishes 20 messages of 1 KiB that two
     * subscriptions receive without acknowledging, waits one second longer than the quota check
     * interval and asserts the topic's reported backlog size is below the quota.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testConsumerBacklogEviction() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic1 = "persistent://prop/ns-quota/topic1";
        final String subName1 = "c1";
        final String subName2 = "c2";
        final int numMsgs = 20;
        final long quotaLimit = 10 * 1024;

        Assert.assertEquals(admin.namespaces().getBacklogQuotaMap(namespace),
                ConfigHelper.backlogQuotaMap(config));
        admin.namespaces().setBacklogQuota(namespace,
                new BacklogQuota(quotaLimit, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
            Producer<byte[]> producer = client.newProducer().topic(topic1).create();

            byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                producer.send(content);
                consumer1.receive();
                consumer2.receive();
            }

            // Wait for at least one backlog quota check to have run.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = admin.topics().getStats(topic1);
            Assert.assertTrue(stats.backlogSize < quotaLimit,
                    "Backlog size is [" + stats.backlogSize + "], storage size is [" + stats.storageSize + "]");
        }
    }

    /**
     * Checks the {@code consumer_backlog_eviction} policy when only one subscription acknowledges.
     *
     * <p>Sets a 10 KiB quota on {@code prop/ns-quota} and publishes 20 messages of 1 KiB; the
     * first subscription acknowledges every message, the second only receives. After waiting one
     * second longer than the quota check interval the reported backlog size must not exceed the
     * quota.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testConsumerBacklogEvictionWithAck() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic1 = "persistent://prop/ns-quota/topic11";
        final String subName1 = "c11";
        final String subName2 = "c21";
        final int numMsgs = 20;
        final long quotaLimit = 10 * 1024;

        Assert.assertEquals(admin.namespaces().getBacklogQuotaMap(namespace),
                ConfigHelper.backlogQuotaMap(config));
        admin.namespaces().setBacklogQuota(namespace,
                new BacklogQuota(quotaLimit, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build()) {
            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
            Producer<byte[]> producer = client.newProducer().topic(topic1).create();

            byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                producer.send(content);
                // Only the first subscription acknowledges; the second keeps its backlog.
                consumer1.acknowledge(consumer1.receive());
                consumer2.receive();
            }

            // Wait for at least one backlog quota check to have run.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = admin.topics().getStats(topic1);
            Assert.assertTrue(stats.backlogSize <= quotaLimit,
                    "Backlog size is [" + stats.backlogSize + "], storage size is [" + stats.storageSize + "]");
        }
    }

    /**
     * Runs a producer and an acknowledging/non-acknowledging consumer pair concurrently under the
     * {@code consumer_backlog_eviction} policy.
     *
     * <p>A producer thread publishes 20 messages of 1 KiB while a consumer thread receives them on
     * two subscriptions, acknowledging only on the first. Both threads start together through a
     * barrier. The test waits up to 20 seconds for them, asserts neither thread hit an exception,
     * waits one second longer than the quota check interval and asserts the reported backlog size
     * does not exceed the 10 KiB quota.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testConcurrentAckAndEviction() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic1 = "persistent://prop/ns-quota/topic12";
        final String subName1 = "c12";
        final String subName2 = "c22";
        final int numMsgs = 20;
        final long quotaLimit = 10 * 1024;

        Assert.assertEquals(admin.namespaces().getBacklogQuotaMap(namespace),
                ConfigHelper.backlogQuotaMap(config));
        admin.namespaces().setBacklogQuota(namespace,
                new BacklogQuota(quotaLimit, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);

        // Both clients are closed even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> consumer1 = client2.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
            Consumer<byte[]> consumer2 = client2.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();

            Thread producerThread = new Thread(() -> {
                try {
                    barrier.await();
                    Producer<byte[]> producer = client.newProducer().topic(topic1).create();
                    byte[] content = new byte[1024];
                    for (int i = 0; i < numMsgs; i++) {
                        producer.send(content);
                    }
                    producer.close();
                } catch (Exception e) {
                    // Log before flagging, otherwise the reason for the failure is lost.
                    LOG.error("Producer thread failed on {}", topic1, e);
                    gotException.set(true);
                } finally {
                    counter.countDown();
                }
            });

            Thread consumerThread = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < numMsgs; i++) {
                        consumer1.acknowledge(consumer1.receive());
                        consumer2.receive();
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread failed on {}", topic1, e);
                    gotException.set(true);
                } finally {
                    counter.countDown();
                }
            });

            producerThread.start();
            consumerThread.start();

            // Bounded wait: the test deliberately continues even if the workers are still running.
            counter.await(20L, TimeUnit.SECONDS);
            Assert.assertFalse(gotException.get());

            // Wait for at least one backlog quota check to have run.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = admin.topics().getStats(topic1);
            Assert.assertTrue(stats.backlogSize <= quotaLimit,
                    "Backlog size is [" + stats.backlogSize + "], storage size is [" + stats.storageSize + "]");
        }
    }

    /**
     * Runs a producer concurrently with two subscriptions that acknowledge everything.
     *
     * <p>With a 10 KiB {@code consumer_backlog_eviction} quota in place, a producer thread
     * publishes 10 messages of 1 KiB while a consumer thread receives and acknowledges each one on
     * both subscriptions. The test waits for both threads and asserts neither hit an exception.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testNoEviction() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic1 = "persistent://prop/ns-quota/topic13";
        final String subName1 = "c13";
        final String subName2 = "c23";
        final int numMsgs = 10;

        Assert.assertEquals(admin.namespaces().getBacklogQuotaMap(namespace),
                ConfigHelper.backlogQuotaMap(config));
        admin.namespaces().setBacklogQuota(namespace,
                new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);

        // Both clients are closed even when the assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();

            Thread producerThread = new Thread(() -> {
                try {
                    barrier.await();
                    Producer<byte[]> producer = client2.newProducer().topic(topic1).create();
                    byte[] content = new byte[1024];
                    for (int i = 0; i < numMsgs; i++) {
                        producer.send(content);
                    }
                    producer.close();
                } catch (Exception e) {
                    // Log before flagging, otherwise the reason for the failure is lost.
                    LOG.error("Producer thread failed on {}", topic1, e);
                    gotException.set(true);
                } finally {
                    counter.countDown();
                }
            });

            Thread consumerThread = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < numMsgs; i++) {
                        consumer1.acknowledge(consumer1.receive());
                        consumer2.acknowledge(consumer2.receive());
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread failed on {}", topic1, e);
                    gotException.set(true);
                } finally {
                    counter.countDown();
                }
            });

            producerThread.start();
            consumerThread.start();
            counter.await();
            Assert.assertFalse(gotException.get());
        }
    }

    /**
     * Runs two producers and two acknowledging consumers concurrently under the
     * {@code consumer_backlog_eviction} policy with a 15 KiB quota.
     *
     * <p>Each producer thread publishes 10 messages of 1 KiB; each consumer thread receives and
     * acknowledges 20 messages on its own subscription. All four threads start together through a
     * barrier. The test waits up to 20 seconds for them, asserts no thread hit an exception, waits
     * one second longer than the quota check interval and asserts the reported backlog size does
     * not exceed the quota.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testEvictionMulti() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic1 = "persistent://prop/ns-quota/topic14";
        final String subName1 = "c14";
        final String subName2 = "c24";
        final int numMsgs = 10;
        final long quotaLimit = 15 * 1024;

        Assert.assertEquals(admin.namespaces().getBacklogQuotaMap(namespace),
                ConfigHelper.backlogQuotaMap(config));
        admin.namespaces().setBacklogQuota(namespace,
                new BacklogQuota(quotaLimit, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        final CyclicBarrier barrier = new CyclicBarrier(4);
        final CountDownLatch counter = new CountDownLatch(4);
        final AtomicBoolean gotException = new AtomicBoolean(false);

        // All three clients are closed even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient client3 = PulsarClient.builder().serviceUrl(adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();

            Thread producerThread1 = new Thread(() -> {
                try {
                    barrier.await();
                    Producer<byte[]> producer = client2.newProducer().topic(topic1).create();
                    byte[] content = new byte[1024];
                    for (int i = 0; i < numMsgs; i++) {
                        producer.send(content);
                    }
                    producer.close();
                } catch (Exception e) {
                    // Log before flagging, otherwise the reason for the failure is lost.
                    LOG.error("Producer thread 1 failed on {}", topic1, e);
                    gotException.set(true);
                } finally {
                    counter.countDown();
                }
            });

            Thread producerThread2 = new Thread(() -> {
                try {
                    barrier.await();
                    Producer<byte[]> producer = client3.newProducer().topic(topic1).create();
                    byte[] content = new byte[1024];
                    for (int i = 0; i < numMsgs; i++) {
                        producer.send(content);
                    }
                    producer.close();
                } catch (Exception e) {
                    LOG.error("Producer thread 2 failed on {}", topic1, e);
                    gotException.set(true);
                } finally {
                    counter.countDown();
                }
            });

            // Each consumer expects the combined output of both producers.
            Thread consumerThread1 = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < numMsgs * 2; i++) {
                        consumer1.acknowledge(consumer1.receive());
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread 1 failed on {}", topic1, e);
                    gotException.set(true);
                } finally {
                    counter.countDown();
                }
            });

            Thread consumerThread2 = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < numMsgs * 2; i++) {
                        consumer2.acknowledge(consumer2.receive());
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread 2 failed on {}", topic1, e);
                    gotException.set(true);
                } finally {
                    counter.countDown();
                }
            });

            producerThread1.start();
            producerThread2.start();
            consumerThread1.start();
            consumerThread2.start();

            // Bounded wait: the test deliberately continues even if the workers are still running.
            counter.await(20L, TimeUnit.SECONDS);
            Assert.assertFalse(gotException.get());

            // Wait for at least one backlog quota check to have run.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = admin.topics().getStats(topic1);
            Assert.assertTrue(stats.backlogSize <= quotaLimit,
                    "Backlog size is [" + stats.backlogSize + "], storage size is [" + stats.storageSize + "]");
        }
    }

    /**
     * Checks that a producer is no longer listed on the topic after the backlog quota is exceeded
     * under the {@code producer_request_hold} policy.
     *
     * <p>With a 10 KiB quota on {@code prop/quotahold}, a producer with a 2 second send timeout
     * attempts 11 sends of 1 KiB, tolerating send timeouts. The consumer then receives 10 messages
     * without acknowledging. After waiting one second longer than the quota check interval the
     * topic stats must report zero publishers.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testAheadProducerOnHold() throws Exception {
        final String namespace = "prop/quotahold";
        final String topic1 = "persistent://prop/quotahold/hold";
        final String subName1 = "c1hold";
        final int numMsgs = 10;

        Assert.assertEquals(admin.namespaces().getBacklogQuotaMap(namespace),
                ConfigHelper.backlogQuotaMap(config));
        admin.namespaces().setBacklogQuota(namespace,
                new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
            byte[] content = new byte[1024];
            Producer<byte[]> producer = client.newProducer().topic(topic1)
                    .sendTimeout(2, TimeUnit.SECONDS).create();

            // One send more than numMsgs; a timed-out send is expected here and only logged.
            for (int i = 0; i <= numMsgs; i++) {
                try {
                    producer.send(content);
                    LOG.info("sent [{}]", i);
                } catch (PulsarClientException.TimeoutException cte) {
                    LOG.info("timeout on [{}]", i);
                }
            }

            for (int i = 0; i < numMsgs; i++) {
                consumer.receive();
                LOG.info("received [{}]", i);
            }

            // Wait for at least one backlog quota check to have run.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = admin.topics().getStats(topic1);
            Assert.assertEquals(stats.publishers.size(), 0,
                    "Number of producers on topic " + topic1 + " are [" + stats.publishers.size() + "]");
        }
    }

    /**
     * Checks that sends time out once the backlog quota is exceeded under the
     * {@code producer_request_hold} policy.
     *
     * <p>With a 10 KiB quota on {@code prop/quotahold} and a subscription that never consumes, a
     * producer with a 2 second send timeout publishes 10 messages of 1 KiB. After waiting one
     * second longer than the quota check interval, one of the next two sends must fail with a
     * {@link PulsarClientException.TimeoutException}.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testAheadProducerOnHoldTimeout() throws Exception {
        final String namespace = "prop/quotahold";
        final String topic1 = "persistent://prop/quotahold/holdtimeout";
        final String subName1 = "c1holdtimeout";
        final int numMsgs = 10;

        Assert.assertEquals(admin.namespaces().getBacklogQuotaMap(namespace),
                ConfigHelper.backlogQuotaMap(config));
        admin.namespaces().setBacklogQuota(namespace,
                new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            boolean gotException = false;

            // The subscription exists only so that a backlog accumulates; nothing is consumed.
            client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();

            byte[] content = new byte[1024];
            Producer<byte[]> producer = client.newProducer().topic(topic1)
                    .sendTimeout(2, TimeUnit.SECONDS).create();
            for (int i = 0; i < numMsgs; i++) {
                producer.send(content);
            }

            // Wait for at least one backlog quota check to have run.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

            try {
                producer.send(content);
                producer.send(content);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException.TimeoutException te) {
                gotException = true;
            }
            Assert.assertTrue(gotException, "timeout did not occur");
        }
    }

    /**
     * Checks that sends fail once the backlog quota is exceeded under the
     * {@code producer_exception} policy.
     *
     * <p>With a 10 KiB quota on {@code prop/quotahold} and a subscription that never consumes, a
     * producer with a 2 second send timeout publishes 10 messages of 1 KiB. After waiting one
     * second longer than the quota check interval, one of the next two sends must fail with either
     * a {@link PulsarClientException.ProducerBlockedQuotaExceededException} or a
     * {@link PulsarClientException.TimeoutException}.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testProducerException() throws Exception {
        final String namespace = "prop/quotahold";
        final String topic1 = "persistent://prop/quotahold/except";
        final String subName1 = "c1except";
        final int numMsgs = 10;

        Assert.assertEquals(admin.namespaces().getBacklogQuotaMap(namespace),
                ConfigHelper.backlogQuotaMap(config));
        admin.namespaces().setBacklogQuota(namespace,
                new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            boolean gotException = false;

            // The subscription exists only so that a backlog accumulates; nothing is consumed.
            client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();

            byte[] content = new byte[1024];
            Producer<byte[]> producer = client.newProducer().topic(topic1)
                    .sendTimeout(2, TimeUnit.SECONDS).create();
            for (int i = 0; i < numMsgs; i++) {
                producer.send(content);
            }

            // Wait for at least one backlog quota check to have run.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

            try {
                producer.send(content);
                producer.send(content);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException ce) {
                boolean expectedType = ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException
                        || ce instanceof PulsarClientException.TimeoutException;
                Assert.assertTrue(expectedType, ce.getMessage());
                gotException = true;
            }
            Assert.assertTrue(gotException, "backlog exceeded exception did not occur");
        }
    }

    /**
     * Checks that a producer rejected under the {@code producer_exception} policy can publish
     * again once the backlog has been drained.
     *
     * <p>With a 10 KiB quota on {@code prop/quotahold}, a producer with a 2 second send timeout
     * publishes 10 messages of 1 KiB. After one quota check interval one of the next two sends
     * must fail with a {@link PulsarClientException.ProducerBlockedQuotaExceededException} or a
     * {@link PulsarClientException.TimeoutException}. The consumer then receives and acknowledges
     * the reported message backlog, the test waits another interval, and five further sends must
     * succeed.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testProducerExceptionAndThenUnblock() throws Exception {
        final String namespace = "prop/quotahold";
        final String topic1 = "persistent://prop/quotahold/exceptandunblock";
        final String subName1 = "c1except";
        final int numMsgs = 10;

        Assert.assertEquals(admin.namespaces().getBacklogQuotaMap(namespace),
                ConfigHelper.backlogQuotaMap(config));
        admin.namespaces().setBacklogQuota(namespace,
                new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            boolean gotException = false;

            Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
            byte[] content = new byte[1024];
            Producer<byte[]> producer = client.newProducer().topic(topic1)
                    .sendTimeout(2, TimeUnit.SECONDS).create();
            for (int i = 0; i < numMsgs; i++) {
                producer.send(content);
            }

            // Wait for at least one backlog quota check to have run.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

            try {
                producer.send(content);
                producer.send(content);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException ce) {
                boolean expectedType = ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException
                        || ce instanceof PulsarClientException.TimeoutException;
                Assert.assertTrue(expectedType, ce.getMessage());
                gotException = true;
            }
            Assert.assertTrue(gotException, "backlog exceeded exception did not occur");

            // Drain exactly the backlog the broker reports for the subscription.
            TopicStats stats = admin.topics().getStats(topic1);
            int backlog = (int) stats.subscriptions.get(subName1).msgBacklog;
            for (int i = 0; i < backlog; i++) {
                Message<byte[]> msg = consumer.receive();
                consumer.acknowledge(msg);
            }

            // Give the next quota check time to observe the drained backlog.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

            try {
                for (int i = 0; i < 5; i++) {
                    producer.send(content);
                }
            } catch (Exception e) {
                // Keep the original message but attach the cause for the test report.
                Assert.fail("unable to publish due to " + e, e);
            }
        }
    }
}

