package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for the broker-side throttling of concurrent topic-lookup requests, which is driven by
 * the {@code maxConcurrentLookupRequest} configuration value.
 */
public class BrokerServiceThrottlingTest extends BrokerTestBase {
    private static final String TOPIC_NAME = "persistent://prop/ns-abc/newTopic";
    private static final String SUBSCRIPTION_NAME = "mysub";
    private static final String LOOKUP_PERMITS_CONFIG = "maxConcurrentLookupRequest";

    /** Number of times the broker configuration is polled after a dynamic update. */
    private static final int CONFIG_WAIT_ATTEMPTS = 5;
    /** Upper bound on polls (100 ms apart) while waiting for the lookup semaphore to be replaced. */
    private static final int SEMAPHORE_WAIT_ATTEMPTS = 50;
    private static final long SEMAPHORE_POLL_INTERVAL_MS = 100L;

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Verifies that dynamically setting {@code maxConcurrentLookupRequest} to 0 leaves the broker's
     * lookup semaphore with no available permits.
     */
    @Test
    public void testThrottlingLookupRequestSemaphore() throws Exception {
        BrokerService brokerService = pulsar.getBrokerService();
        Assert.assertNotEquals(availableLookupPermits(brokerService), 0);

        admin.brokers().updateDynamicConfiguration(LOOKUP_PERMITS_CONFIG, Integer.toString(0));

        // Poll instead of sleeping a fixed second: returns as soon as the new value is visible,
        // and tolerates a slow machine without failing spuriously.
        for (int attempt = 0; attempt < SEMAPHORE_WAIT_ATTEMPTS
                && availableLookupPermits(brokerService) != 0; attempt++) {
            Thread.sleep(SEMAPHORE_POLL_INTERVAL_MS);
        }
        Assert.assertEquals(availableLookupPermits(brokerService), 0);
    }

    /**
     * Verifies that once the lookup permits are set to 0, a new subscription is rejected with
     * {@link PulsarClientException.TooManyRequestsException}.
     */
    @Test
    public void testLookupThrottlingForClientByBroker0Permit() throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(brokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build();
        try {
            // Sanity check: subscribing works while lookups are still permitted.
            client.newConsumer().topic(TOPIC_NAME).subscriptionName(SUBSCRIPTION_NAME).subscribe().close();

            admin.brokers().updateDynamicConfiguration(LOOKUP_PERMITS_CONFIG, Integer.toString(0));
            awaitMaxConcurrentLookupRequest(0);

            try {
                client.newConsumer().topic(TOPIC_NAME).subscriptionName(SUBSCRIPTION_NAME).subscribe().close();
                Assert.fail("It should fail as throttling should not receive any request");
            } catch (PulsarClientException.TooManyRequestsException expected) {
                // Expected: with zero permits the lookup must be rejected.
            }
        } finally {
            // The client was previously leaked; always release it.
            client.close();
        }
    }

    /**
     * Verifies that with a single lookup permit, 20 concurrent subscription attempts do not all
     * succeed.
     */
    @Test
    public void testLookupThrottlingForClientByBroker() throws Exception {
        final int totalConsumers = 20;
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(brokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .ioThreads(20)
                .connectionsPerBroker(20)
                .build();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        List<Consumer<byte[]>> successfulConsumers = Collections.synchronizedList(Lists.newArrayList());
        try {
            admin.brokers().updateDynamicConfiguration(LOOKUP_PERMITS_CONFIG, Integer.toString(1));
            awaitMaxConcurrentLookupRequest(1);

            List<Throwable> unexpectedFailures =
                    subscribeConcurrently(client, executor, totalConsumers, successfulConsumers);

            Assert.assertTrue(unexpectedFailures.isEmpty(), "it shouldn't failed: " + unexpectedFailures);
            Assert.assertNotEquals(successfulConsumers.size(), totalConsumers);
        } finally {
            executor.shutdown();
            closeAll(successfulConsumers, client);
        }
    }

    /**
     * Subscribes 8 consumers while 100 lookup permits are configured, restarts the broker with a
     * single permit, and verifies that all 8 consumers end up connected again.
     */
    @Test
    public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exception {
        final int totalConsumers = 8;
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(brokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .ioThreads(20)
                .connectionsPerBroker(20)
                .build();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        List<Consumer<byte[]>> consumers = Collections.synchronizedList(Lists.newArrayList());
        try {
            upsertLookupPermits(100);

            List<Throwable> unexpectedFailures =
                    subscribeConcurrently(client, executor, totalConsumers, consumers);
            Assert.assertTrue(unexpectedFailures.isEmpty(), "it shouldn't failed: " + unexpectedFailures);

            // Restart the broker with only one lookup permit available.
            stopBroker();
            conf.setMaxConcurrentLookupRequest(1);
            startBroker();

            // Give the consumers a bounded amount of time to reconnect.
            retryStrategically(ignored -> areAllConsumersConnected(consumers), 5, 500L);

            int connectedConsumers = 0;
            for (Consumer<byte[]> consumer : consumers) {
                if (consumer.isConnected()) {
                    connectedConsumers++;
                }
            }
            Assert.assertEquals(connectedConsumers, totalConsumers);
        } finally {
            executor.shutdown();
            closeAll(consumers, client);
        }
    }

    /**
     * Starts {@code count} subscription attempts on {@code executor} and blocks until every attempt
     * has finished.
     *
     * <p>Successful consumers are appended to {@code successfulConsumers}. A
     * {@link PulsarClientException.TooManyRequestsException} is tolerated (the attempt was
     * throttled). Any other exception is collected and returned so the caller can assert on the
     * test thread; failing inside the worker would skip the latch count-down and hang the test.
     *
     * @param client client used to create the consumers
     * @param executor pool on which the attempts run
     * @param count number of subscription attempts
     * @param successfulConsumers thread-safe sink for consumers that subscribed successfully
     * @return the unexpected failures raised by the attempts; empty if there were none
     * @throws InterruptedException if interrupted while waiting for the attempts to finish
     */
    private List<Throwable> subscribeConcurrently(PulsarClient client, ExecutorService executor, int count,
            List<Consumer<byte[]>> successfulConsumers) throws InterruptedException {
        List<Throwable> unexpectedFailures = Collections.synchronizedList(Lists.newArrayList());
        CountDownLatch finished = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            executor.execute(() -> {
                try {
                    successfulConsumers.add(client.newConsumer()
                            .topic(TOPIC_NAME)
                            .subscriptionName(SUBSCRIPTION_NAME)
                            .subscriptionType(SubscriptionType.Shared)
                            .subscribe());
                } catch (PulsarClientException.TooManyRequestsException throttled) {
                    // Acceptable outcome: the lookup was rejected by throttling.
                } catch (Exception e) {
                    unexpectedFailures.add(e);
                } finally {
                    // Always count down so the waiting test thread can never block forever.
                    finished.countDown();
                }
            });
        }
        finished.await();
        return unexpectedFailures;
    }

    /**
     * Closes every consumer and then the client; the client is closed even if closing a consumer
     * throws.
     */
    private static void closeAll(List<Consumer<byte[]>> consumers, PulsarClient client)
            throws PulsarClientException {
        try {
            for (Consumer<byte[]> consumer : consumers) {
                consumer.close();
            }
        } finally {
            client.close();
        }
    }

    /**
     * Polls the broker configuration a bounded number of times until
     * {@code maxConcurrentLookupRequest} equals {@code expectedPermits}. Returns silently if the
     * value never matches; callers rely on their own assertions afterwards.
     */
    private void awaitMaxConcurrentLookupRequest(int expectedPermits) throws InterruptedException {
        for (int attempt = 0; attempt < CONFIG_WAIT_ATTEMPTS
                && pulsar.getConfiguration().getMaxConcurrentLookupRequest() != expectedPermits; attempt++) {
            Thread.sleep(100 + (attempt * 10));
        }
    }

    /** Returns the number of permits currently available in the broker's lookup semaphore. */
    private static int availableLookupPermits(BrokerService brokerService) {
        return ((Semaphore) brokerService.lookupRequestSemaphore.get()).availablePermits();
    }

    /** Returns the binary service URL of the broker under test. */
    private String brokerServiceUrl() {
        // Round-tripping through URI validates the assembled URL.
        return URI.create("pulsar://localhost:" + BROKER_PORT).toString();
    }

    /**
     * Returns {@code true} only if every consumer in {@code list} reports itself as connected
     * (vacuously {@code true} for an empty list).
     */
    private boolean areAllConsumersConnected(List<Consumer<byte[]>> list) {
        for (Consumer<byte[]> consumer : list) {
            if (!consumer.isConnected()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Writes {@code maxConcurrentLookupRequest} as a JSON map to the {@code /admin/configuration}
     * node of the mock ZooKeeper, creating the node (and its parents) if it does not exist yet.
     *
     * @param permits value to store for {@code maxConcurrentLookupRequest}
     */
    private void upsertLookupPermits(int permits) throws Exception {
        final String configPath = "/admin/configuration";
        HashMap<String, String> dynamicConfig = Maps.newHashMap();
        dynamicConfig.put(LOOKUP_PERMITS_CONFIG, Integer.toString(permits));
        byte[] payload = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(dynamicConfig);
        if (mockZookKeeper.exists(configPath, false) != null) {
            // -1 matches any node version.
            mockZookKeeper.setData(configPath, payload, -1);
        } else {
            ZkUtils.createFullPathOptimistic(mockZookKeeper, configPath, payload,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
}
