/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class BrokerServiceTest
extends BrokerTestBase {
    // Relative paths of the TLS certificate and key files kept under
    // src/test/resources/certificate (server pair first, then client pair).
    // The test methods visible in this part of the file spell these paths out as string
    // literals instead of referencing the constants.
    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";

    /**
     * Class-level fixture set-up (TestNG {@code @BeforeClass}); delegates entirely to the
     * superclass's {@code baseSetup()}.
     *
     * @throws Exception whatever {@code baseSetup()} throws
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Class-level fixture tear-down (TestNG {@code @AfterClass}); delegates entirely to the
     * superclass's {@code internalCleanup()}.
     *
     * @throws Exception whatever {@code internalCleanup()} throws
     */
    @Override
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Calls {@code getOrCreateTopic} on the broker service twice for the same topic: once
     * before and once after an admin lookup of that topic.
     *
     * <p>The failure messages indicate the first call is expected to be rejected because the
     * namespace is not owned yet, and the second call is expected to succeed (presumably the
     * lookup makes this broker take ownership - confirm against the broker implementation).
     *
     * <p>NOTE(review): the assertions below run inside {@link CompletableFuture} callbacks. An
     * {@link AssertionError} thrown there only completes the derived future exceptionally;
     * nothing in this method rethrows it on the test thread, so such a failure is not reported
     * to TestNG. What the test thread does verify is that each phase finishes within the
     * timeout instead of blocking forever.
     *
     * @throws Exception if the admin lookup fails or the thread is interrupted while waiting
     */
    @Test
    public void testOwnedNsCheck() throws Exception {
        final String topic = "persistent://prop/ns-abc/successTopic";
        final BrokerService service = this.pulsar.getBrokerService();

        // Phase 1: request the topic before any lookup has been performed.
        final CountDownLatch latch1 = new CountDownLatch(1);
        service.getOrCreateTopic(topic).thenAccept(t -> {
            latch1.countDown();
            Assert.fail("should fail as NS is not owned");
        }).exceptionally(exception -> {
            Assert.assertTrue(exception.getCause() instanceof IOException);
            latch1.countDown();
            return null;
        });
        // Bounded wait: if the assertion above throws before countDown(), an unbounded
        // await() would hang the whole suite.
        Assert.assertTrue(latch1.await(30L, TimeUnit.SECONDS),
                "first getOrCreateTopic call did not complete as expected within 30s");

        // Phase 2: same request after the admin lookup.
        this.admin.lookups().lookupTopic(topic);
        final CountDownLatch latch2 = new CountDownLatch(1);
        service.getOrCreateTopic(topic).thenAccept(t -> {
            try {
                Assert.assertNotNull(service.getTopicReference(topic));
            } catch (Exception e) {
                Assert.fail("should not fail");
            }
            latch2.countDown();
        }).exceptionally(exception -> {
            latch2.countDown();
            Assert.fail("should not fail");
            return null;
        });
        Assert.assertTrue(latch2.await(30L, TimeUnit.SECONDS),
                "second getOrCreateTopic call did not complete within 30s");
    }

    /**
     * Walks a persistent topic through subscribe, publish and consume, checking the stats the
     * topic reports at each step.
     *
     * <p>Expected backlog: 0 with only the subscriber attached, 10 after ten messages are
     * published, and 0 again after all ten are received and acknowledged. After publishing,
     * the topic-level rates are also compared with those of the single publisher and the
     * single consumer.
     *
     * @throws Exception on any client or broker error
     */
    @Test
    public void testBrokerServicePersistentTopicStats() throws Exception {
        final String topicName = "persistent://prop/ns-abc/successTopic";
        final String subName = "successSub";

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Thread.sleep(100L);

        PersistentTopic topicRef =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        // Step 1: subscriber only.
        rolloverPerIntervalStats();
        TopicStats stats = topicRef.getStats();
        SubscriptionStats subStats = stats.subscriptions.values().iterator().next();
        Assert.assertEquals(stats.subscriptions.keySet().size(), 1);
        Assert.assertEquals(subStats.msgBacklog, 0L);
        Assert.assertEquals(subStats.consumers.size(), 1);

        // Step 2: publish ten messages.
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int seq = 0; seq < 10; seq++) {
            producer.send(("my-message-" + seq).getBytes());
        }
        Thread.sleep(100L);

        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = stats.subscriptions.values().iterator().next();
        Assert.assertEquals(subStats.msgBacklog, 10L);
        Assert.assertEquals(stats.publishers.size(), 1);

        PublisherStats publisher = stats.publishers.get(0);
        Assert.assertTrue(publisher.msgRateIn > 0.0);
        Assert.assertTrue(publisher.msgThroughputIn > 0.0);
        Assert.assertTrue(publisher.averageMsgSize > 0.0);
        Assert.assertNotNull(publisher.getClientVersion());

        // With a single publisher the topic-level inbound figures must match it.
        // The (Object) casts keep the boxed, equals()-based comparison.
        Assert.assertEquals((Object) stats.msgRateIn, (Object) publisher.msgRateIn);
        Assert.assertEquals((Object) stats.msgThroughputIn, (Object) publisher.msgThroughputIn);
        double sizeDelta = stats.averageMsgSize - publisher.averageMsgSize;
        Assert.assertTrue(Math.abs(sizeDelta) < 1.0E-6);

        ConsumerStats consumerStats = subStats.consumers.get(0);
        Assert.assertTrue(consumerStats.msgRateOut > 0.0);
        Assert.assertTrue(consumerStats.msgThroughputOut > 0.0);

        // With a single consumer the subscription- and topic-level outbound figures must match it.
        Assert.assertEquals((Object) subStats.msgRateOut, (Object) consumerStats.msgRateOut);
        Assert.assertEquals((Object) subStats.msgThroughputOut, (Object) consumerStats.msgThroughputOut);
        Assert.assertEquals((Object) stats.msgRateOut, (Object) consumerStats.msgRateOut);
        Assert.assertEquals((Object) stats.msgThroughputOut, (Object) consumerStats.msgThroughputOut);
        Assert.assertNotNull(consumerStats.getClientVersion());

        // Step 3: drain and acknowledge everything, then the backlog must be empty.
        for (int seq = 0; seq < 10; seq++) {
            Message<byte[]> received = consumer.receive();
            consumer.acknowledge(received);
        }
        consumer.close();
        Thread.sleep(100L);

        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = stats.subscriptions.values().iterator().next();
        Assert.assertEquals(subStats.msgBacklog, 0L);
    }

    /**
     * Checks that the topic's reported {@code storageSize} is 0 right after a producer is
     * created and greater than 0 after ten 10-byte messages have been sent. No consumer or
     * subscription is created by this test.
     *
     * @throws Exception on any client or broker error
     */
    @Test
    public void testStatsOfStorageSizeWithSubscription() throws Exception {
        final String topicName = "persistent://prop/ns-abc/no-subscription";

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topicRef =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        long sizeBeforePublish = topicRef.getStats().storageSize;
        Assert.assertEquals(sizeBeforePublish, 0L);

        for (int sent = 0; sent < 10; sent++) {
            producer.send(new byte[10]);
        }

        long sizeAfterPublish = topicRef.getStats().storageSize;
        Assert.assertTrue(sizeAfterPublish > 0L);
    }

    /**
     * Same subscribe / publish / consume walk as the plain topic-stats test, but on a
     * {@code Shared} subscription and with redelivery statistics checked as well.
     *
     * <p>After ten messages are published and before any acknowledgement, the redeliver rate
     * is expected to be 0 and the consumer is expected to hold 10 unacked messages. After
     * {@code redeliverUnacknowledgedMessages()} the redeliver rate must become positive.
     * Finally all messages are acknowledged and the backlog must drop to 0.
     *
     * @throws Exception on any client or broker error
     */
    @Test
    public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
        final String topicName = "persistent://prop/ns-abc/successSharedTopic";
        final String subName = "successSharedSub";

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Thread.sleep(100L);

        PersistentTopic topicRef =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        // Step 1: subscriber only.
        rolloverPerIntervalStats();
        TopicStats stats = topicRef.getStats();
        SubscriptionStats subStats = stats.subscriptions.values().iterator().next();
        Assert.assertEquals(stats.subscriptions.keySet().size(), 1);
        Assert.assertEquals(subStats.msgBacklog, 0L);
        Assert.assertEquals(subStats.consumers.size(), 1);

        // Step 2: publish ten messages; none is acknowledged yet.
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int seq = 0; seq < 10; seq++) {
            producer.send(("my-message-" + seq).getBytes());
        }
        Thread.sleep(100L);

        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = stats.subscriptions.values().iterator().next();
        Assert.assertEquals(subStats.msgBacklog, 10L);
        Assert.assertEquals(stats.publishers.size(), 1);

        PublisherStats publisher = stats.publishers.get(0);
        Assert.assertTrue(publisher.msgRateIn > 0.0);
        Assert.assertTrue(publisher.msgThroughputIn > 0.0);
        Assert.assertTrue(publisher.averageMsgSize > 0.0);

        // The (Object) casts keep the boxed, equals()-based comparison.
        Assert.assertEquals((Object) stats.msgRateIn, (Object) publisher.msgRateIn);
        Assert.assertEquals((Object) stats.msgThroughputIn, (Object) publisher.msgThroughputIn);
        double sizeDelta = stats.averageMsgSize - publisher.averageMsgSize;
        Assert.assertTrue(Math.abs(sizeDelta) < 1.0E-6);

        ConsumerStats consumerStats = subStats.consumers.get(0);
        Assert.assertTrue(consumerStats.msgRateOut > 0.0);
        Assert.assertTrue(consumerStats.msgThroughputOut > 0.0);
        Assert.assertEquals((Object) subStats.msgRateRedeliver, (Object) 0.0);
        Assert.assertEquals(consumerStats.unackedMessages, 10);

        // Single consumer: subscription- and topic-level figures must match it.
        Assert.assertEquals((Object) subStats.msgRateOut, (Object) consumerStats.msgRateOut);
        Assert.assertEquals((Object) subStats.msgThroughputOut, (Object) consumerStats.msgThroughputOut);
        Assert.assertEquals((Object) subStats.msgRateRedeliver, (Object) consumerStats.msgRateRedeliver);
        Assert.assertEquals((Object) stats.msgRateOut, (Object) consumerStats.msgRateOut);
        Assert.assertEquals((Object) stats.msgThroughputOut, (Object) consumerStats.msgThroughputOut);
        Assert.assertEquals(subStats.unackedMessages, (long) consumerStats.unackedMessages);

        // Step 3: force redelivery and expect a positive redeliver rate.
        consumer.redeliverUnacknowledgedMessages();
        Thread.sleep(100L);

        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = stats.subscriptions.values().iterator().next();
        consumerStats = subStats.consumers.get(0);
        Assert.assertTrue(subStats.msgRateRedeliver > 0.0);
        Assert.assertEquals((Object) subStats.msgRateRedeliver, (Object) consumerStats.msgRateRedeliver);

        // Step 4: drain and acknowledge everything, then the backlog must be empty.
        for (int seq = 0; seq < 10; seq++) {
            Message<byte[]> received = consumer.receive();
            consumer.acknowledge(received);
        }
        consumer.close();
        Thread.sleep(100L);

        rolloverPerIntervalStats();
        stats = topicRef.getStats();
        subStats = stats.subscriptions.values().iterator().next();
        Assert.assertEquals(subStats.msgBacklog, 0L);
    }

    /**
     * Produces and consumes ten messages on {@code prop/ns-abc}, then fetches the broker
     * metrics through the admin client and checks that at least one metric entry has a
     * {@code dimensions} member mentioning that namespace.
     *
     * @throws Exception on any client, admin or broker error
     */
    @Test
    public void testBrokerStatsMetrics() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";
        BrokerStats brokerStatsClient = admin.brokerStats();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();
        Thread.sleep(100L);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);

        for (int seq = 0; seq < 10; seq++) {
            producer.send(("my-message-" + seq).getBytes());
        }
        Thread.sleep(100L);

        for (int seq = 0; seq < 10; seq++) {
            Message<byte[]> received = consumer.receive();
            consumer.acknowledge(received);
        }
        consumer.close();
        Thread.sleep(100L);

        JsonArray metrics = brokerStatsClient.getMetrics();
        boolean namespaceDimensionFound = false;
        boolean topicLoadTimesDimensionFound = false;
        for (int idx = 0; idx < metrics.size(); idx++) {
            String dimensions;
            try {
                dimensions = metrics.get(idx).getAsJsonObject().get("dimensions").toString();
            } catch (Exception ignored) {
                // Best effort: entries that are not objects or lack "dimensions" are skipped.
                continue;
            }
            // NOTE(review): both flags are driven by the same substring, so they are always
            // equal; the second one looks like it was meant to match a different dimension.
            boolean mentionsNamespace = dimensions.contains("prop/ns-abc");
            namespaceDimensionFound = namespaceDimensionFound || mentionsNamespace;
            topicLoadTimesDimensionFound = topicLoadTimesDimensionFound || mentionsNamespace;
        }

        Assert.assertTrue(namespaceDimensionFound && topicLoadTimesDimensionFound);
        Thread.sleep(100L);
    }

    /**
     * Creates two namespaces with four bundles each, opens producers on two topics per
     * namespace, and checks that the topic stats returned by the admin client contain an entry
     * for every topic at {@code namespace -> bundle range -> "persistent" -> topic}.
     *
     * <p>Afterwards the producers are closed and the topics and namespaces created here are
     * deleted again.
     *
     * @throws Exception on any client, admin or broker error
     */
    @Test
    public void testBrokerServiceNamespaceStats() throws Exception {
        final int numBundles = 4;
        final String ns1 = "prop/stats1";
        final String ns2 = "prop/stats2";
        final List<String> nsList = Lists.newArrayList(ns1, ns2);
        final List<Producer<byte[]>> producerList = Lists.newArrayList();
        final BrokerStats brokerStatsClient = this.admin.brokerStats();

        for (String ns : nsList) {
            this.admin.namespaces().createNamespace(ns, numBundles);
            this.admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test"));
            String topic1 = String.format("persistent://%s/topic1", ns);
            producerList.add(this.pulsarClient.newProducer().topic(topic1).create());
            String topic2 = String.format("persistent://%s/topic2", ns);
            producerList.add(this.pulsarClient.newProducer().topic(topic2).create());
        }

        this.rolloverPerIntervalStats();
        JsonObject topicStats = brokerStatsClient.getTopics();
        // One top-level member per namespace created above.
        Assert.assertEquals(topicStats.size(), nsList.size(), topicStats.toString());

        for (String ns : nsList) {
            JsonObject nsObject = topicStats.getAsJsonObject(ns);
            Assert.assertNotNull(nsObject, "no stats for namespace " + ns);
            List<String> topicList = this.admin.namespaces().getTopics(ns);
            for (String topic : topicList) {
                NamespaceBundle bundle = this.pulsar.getNamespaceService().getBundle(TopicName.get(topic));
                JsonObject bundleObject = nsObject.getAsJsonObject(bundle.getBundleRange());
                Assert.assertNotNull(bundleObject, "no stats for bundle " + bundle.getBundleRange());
                JsonObject topicObject = bundleObject.getAsJsonObject("persistent");
                Assert.assertNotNull(topicObject, "no persistent-topic stats in bundle " + bundle.getBundleRange());
                Assert.assertTrue(topicObject.has(topic), "no stats for topic " + topic);
            }
        }

        for (Producer<byte[]> producer : producerList) {
            producer.close();
        }
        for (String ns : nsList) {
            List<String> topics = this.admin.namespaces().getTopics(ns);
            for (String dest : topics) {
                this.admin.topics().delete(dest);
            }
            this.admin.namespaces().deleteNamespace(ns);
        }
    }

    /**
     * Restarts the broker with authentication disabled (no TLS settings are applied by this
     * test) and checks two client configurations:
     * <ol>
     *   <li>a plain-text client on {@code brokerUrl} must be able to subscribe;</li>
     *   <li>a TLS client on {@code brokerUrlTls} must fail to subscribe, with an error message
     *       containing {@code "ConnectException"}.</li>
     * </ol>
     *
     * <p>Each case uses its own client, which is closed by try-with-resources even when
     * building or subscribing fails.
     *
     * @throws Exception if the broker restart or closing a client fails
     */
    @Test
    public void testTlsDisabled() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        this.conf.setAuthenticationEnabled(false);
        this.restartBroker();

        // Case 1: plain-text connection must work.
        try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl(this.brokerUrl.toString())
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .build();
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
            Assert.assertNotNull(consumer, "subscribe returned no consumer");
        } catch (Exception e) {
            // Keep the cause so the report shows why the connection failed.
            Assert.fail("should not fail", e);
        }

        // Case 2: TLS connection must be refused.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe()) {
                // AssertionError is not an Exception, so it bypasses the catch below.
                Assert.fail("TLS connection should fail");
            } catch (Exception e) {
                String message = e.getMessage();
                Assert.assertTrue(message != null && message.contains("ConnectException"),
                        "unexpected failure: " + e);
            }
        }
    }

    /**
     * Restarts the broker with authentication disabled, TLS ports enabled and the server
     * certificate/key configured, then checks four client configurations:
     * <ol>
     *   <li>plain-text on {@code brokerUrl}: must subscribe;</li>
     *   <li>TLS with insecure connections allowed: must subscribe;</li>
     *   <li>TLS with insecure connections disallowed and no trust certificate: must fail with
     *       a message containing {@code "General OpenSslEngine problem"};</li>
     *   <li>TLS with insecure connections disallowed and the server certificate as trust
     *       certificate: must subscribe.</li>
     * </ol>
     *
     * <p>Each case uses its own local client (never the shared client of the base class),
     * closed by try-with-resources.
     *
     * @throws Exception if the broker restart or closing a client fails
     */
    @Test
    public void testTlsEnabled() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        this.conf.setAuthenticationEnabled(false);
        this.conf.setBrokerServicePortTls(Optional.of(this.BROKER_PORT_TLS));
        this.conf.setWebServicePortTls(Optional.of(this.BROKER_WEBSERVICE_PORT_TLS));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.restartBroker();

        // Case 1: plain-text connection still works when TLS is enabled.
        try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl(this.brokerUrl.toString())
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .build();
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
            Assert.assertNotNull(consumer, "subscribe returned no consumer");
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 2: TLS, insecure connections allowed on the client side.
        try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl(this.brokerUrlTls.toString())
                        .enableTls(true)
                        .allowTlsInsecureConnection(true)
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .build();
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
            Assert.assertNotNull(consumer, "subscribe returned no consumer");
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 3: TLS, insecure connections disallowed and nothing to verify the server with.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .allowTlsInsecureConnection(false)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe()) {
                // AssertionError is not an Exception, so it bypasses the catch below.
                Assert.fail("should fail");
            } catch (Exception e) {
                String message = e.getMessage();
                Assert.assertTrue(message != null && message.contains("General OpenSslEngine problem"),
                        "unexpected failure: " + e);
            }
        }

        // Case 4: TLS, insecure connections disallowed, server certificate trusted explicitly.
        try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl(this.brokerUrlTls.toString())
                        .enableTls(true)
                        .allowTlsInsecureConnection(false)
                        .tlsTrustCertsFilePath("./src/test/resources/certificate/server.crt")
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .build();
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
            Assert.assertNotNull(consumer, "subscribe returned no consumer");
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /**
     * Restarts the broker with the TLS authentication provider enabled and
     * {@code tlsAllowInsecureConnection=true}, then checks two client configurations:
     * <ol>
     *   <li>TLS client without authentication: subscribing must fail;</li>
     *   <li>TLS client authenticating with the client certificate/key: must subscribe.</li>
     * </ol>
     *
     * <p>Each case uses its own local client (never the shared client of the base class),
     * closed by try-with-resources.
     *
     * @throws Exception if the broker restart, auth configuration or closing a client fails
     */
    @Test
    public void testTlsAuthAllowInsecure() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        Set<String> providers = new HashSet<String>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Optional.of(this.BROKER_PORT_TLS));
        this.conf.setWebServicePortTls(Optional.of(this.BROKER_WEBSERVICE_PORT_TLS));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setTlsAllowInsecureConnection(true);
        this.restartBroker();

        HashMap<String, String> authParams = new HashMap<String, String>();
        authParams.put("tlsCertFile", "./src/test/resources/certificate/client.crt");
        authParams.put("tlsKeyFile", "./src/test/resources/certificate/client.key");

        // Case 1: no client certificate presented - the subscription must be rejected.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .allowTlsInsecureConnection(true)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe()) {
                // AssertionError is not an Exception, so it bypasses the catch below.
                Assert.fail("should fail");
            } catch (Exception expected) {
                // Expected path: subscribing without credentials is refused.
                // NOTE(review): the handler for this case was lost in decompilation, so the
                // expected error text is not known here; add a message assertion once
                // confirmed against the original test.
            }
        }

        // Case 2: client certificate presented - the subscription must succeed.
        AuthenticationTls auth = new AuthenticationTls();
        auth.configure(authParams);
        try (PulsarClient client = PulsarClient.builder()
                        .authentication(auth)
                        .serviceUrl(this.brokerUrlTls.toString())
                        .enableTls(true)
                        .allowTlsInsecureConnection(true)
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .build();
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
            Assert.assertNotNull(consumer, "subscribe returned no consumer");
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /**
     * Restarts the broker with the TLS authentication provider enabled and
     * {@code tlsAllowInsecureConnection=false} (no broker-side trust certificate is set by
     * this test), then checks two client configurations, both of which must be rejected:
     * <ol>
     *   <li>TLS client without authentication;</li>
     *   <li>TLS client authenticating with the client certificate/key - expected to fail with
     *       a message containing {@code "Authentication required"}.</li>
     * </ol>
     *
     * <p>Each case uses its own local client (never the shared client of the base class),
     * closed by try-with-resources.
     *
     * @throws Exception if the broker restart, auth configuration or closing a client fails
     */
    @Test
    public void testTlsAuthDisallowInsecure() throws Exception {
        final String topicName = "persistent://prop/my-ns/newTopic";
        final String subName = "newSub";

        Set<String> providers = new HashSet<String>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Optional.of(this.BROKER_PORT_TLS));
        this.conf.setWebServicePortTls(Optional.of(this.BROKER_WEBSERVICE_PORT_TLS));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setTlsAllowInsecureConnection(false);
        this.restartBroker();

        HashMap<String, String> authParams = new HashMap<String, String>();
        authParams.put("tlsCertFile", "./src/test/resources/certificate/client.crt");
        authParams.put("tlsKeyFile", "./src/test/resources/certificate/client.key");

        // Case 1: no client certificate presented - the subscription must be rejected.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .allowTlsInsecureConnection(true)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe()) {
                // AssertionError is not an Exception, so it bypasses the catch below.
                Assert.fail("should fail");
            } catch (Exception expected) {
                // Expected path: subscribing without credentials is refused.
                // NOTE(review): the handler for this case was lost in decompilation; it
                // presumably checked for "Authentication required" like case 2 - confirm
                // against the original test before asserting on the message.
            }
        }

        // Case 2: client certificate presented, but the broker must still reject it.
        AuthenticationTls auth = new AuthenticationTls();
        auth.configure(authParams);
        try (PulsarClient client = PulsarClient.builder()
                .authentication(auth)
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .allowTlsInsecureConnection(true)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe()) {
                Assert.fail("should fail");
            } catch (Exception e) {
                String message = e.getMessage();
                Assert.assertTrue(message != null && message.contains("Authentication required"),
                        "unexpected failure: " + e);
            }
        }
    }

    /**
     * Restarts the broker with the TLS authentication provider enabled,
     * {@code tlsAllowInsecureConnection=false} and the client certificate configured as the
     * broker's trust certificate, then checks two client configurations:
     * <ol>
     *   <li>TLS client without authentication: subscribing must fail;</li>
     *   <li>TLS client authenticating with the client certificate/key: must subscribe.</li>
     * </ol>
     *
     * <p>Each case uses its own local client (never the shared client of the base class),
     * closed by try-with-resources.
     *
     * @throws Exception if the broker restart, auth configuration or closing a client fails
     */
    @Test
    public void testTlsAuthUseTrustCert() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        Set<String> providers = new HashSet<String>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Optional.of(this.BROKER_PORT_TLS));
        this.conf.setWebServicePortTls(Optional.of(this.BROKER_WEBSERVICE_PORT_TLS));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setTlsAllowInsecureConnection(false);
        this.conf.setTlsTrustCertsFilePath("./src/test/resources/certificate/client.crt");
        this.restartBroker();

        HashMap<String, String> authParams = new HashMap<String, String>();
        authParams.put("tlsCertFile", "./src/test/resources/certificate/client.crt");
        authParams.put("tlsKeyFile", "./src/test/resources/certificate/client.key");

        // Case 1: no client certificate presented - the subscription must be rejected.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .allowTlsInsecureConnection(true)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe()) {
                // AssertionError is not an Exception, so it bypasses the catch below.
                Assert.fail("should fail");
            } catch (Exception expected) {
                // Expected path: subscribing without credentials is refused.
                // NOTE(review): the handler for this case was lost in decompilation, so the
                // expected error text is not known here; add a message assertion once
                // confirmed against the original test.
            }
        }

        // Case 2: trusted client certificate presented - the subscription must succeed.
        AuthenticationTls auth = new AuthenticationTls();
        auth.configure(authParams);
        try (PulsarClient client = PulsarClient.builder()
                        .authentication(auth)
                        .serviceUrl(this.brokerUrlTls.toString())
                        .enableTls(true)
                        .allowTlsInsecureConnection(true)
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .build();
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
            Assert.assertNotNull(consumer, "subscribe returned no consumer");
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /**
     * Checks client-side lookup throttling on a dedicated client built with
     * {@code maxConcurrentLookupRequests(1)} and {@code maxLookupRequests(2)}.
     *
     * <p>Two subscriptions issued back to back must both succeed. Three issued back to back
     * must fail, and the failure's cause must be a
     * {@link PulsarClientException.TooManyRequestsException}.
     *
     * <p>The dedicated client is closed when the test ends, which also releases any consumer
     * whose future was never reached because an earlier {@code get()} threw.
     *
     * @throws Exception if the client cannot be built or closed
     */
    @Test
    public void testLookupThrottlingForClientByClient() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String lookupUrl = new URI("pulsar://localhost:" + this.BROKER_PORT).toString();

        try (PulsarClient throttledClient = PulsarClient.builder()
                .serviceUrl(lookupUrl)
                .statsInterval(0L, TimeUnit.SECONDS)
                .maxConcurrentLookupRequests(1)
                .maxLookupRequests(2)
                .build()) {

            // Two requests stay within the configured limit.
            try {
                CompletableFuture<Consumer<byte[]>> consumer1 = throttledClient.newConsumer()
                        .topic(topicName).subscriptionName("mysub1").subscribeAsync();
                CompletableFuture<Consumer<byte[]>> consumer2 = throttledClient.newConsumer()
                        .topic(topicName).subscriptionName("mysub2").subscribeAsync();
                consumer1.get().close();
                consumer2.get().close();
            } catch (Exception e) {
                Assert.fail("Subscribe should success with 2 requests", e);
            }

            // A third request exceeds the limit.
            try {
                CompletableFuture<Consumer<byte[]>> consumer1 = throttledClient.newConsumer()
                        .topic(topicName).subscriptionName("mysub11").subscribeAsync();
                CompletableFuture<Consumer<byte[]>> consumer2 = throttledClient.newConsumer()
                        .topic(topicName).subscriptionName("mysub22").subscribeAsync();
                CompletableFuture<Consumer<byte[]>> consumer3 = throttledClient.newConsumer()
                        .topic(topicName).subscriptionName("mysub33").subscribeAsync();
                consumer1.get().close();
                consumer2.get().close();
                consumer3.get().close();
                // AssertionError is not an Exception, so it bypasses the catch below.
                Assert.fail("It should fail as throttling should only receive 2 requests");
            } catch (Exception e) {
                if (!(e.getCause() instanceof PulsarClientException.TooManyRequestsException)) {
                    Assert.fail("Subscribe should fail with TooManyRequestsException", e);
                }
            }
        }
    }

    /**
     * Checks that loading a persistent topic is refused after the state of its namespace bundle
     * has been switched off through {@code updateBundleState(bundle, false)}.
     *
     * <p>The topic is first created by opening and closing a producer. The subsequent
     * {@code loadOrCreatePersistentTopic} call must complete exceptionally with a
     * {@link BrokerServiceException.ServiceUnitNotReadyException} as its cause.
     *
     * @throws Exception if the namespace or the producer cannot be set up
     */
    @Test
    public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
        final String namespace = "prop/disableBundle";
        final String topicName = "persistent://prop/disableBundle/my-topic";

        admin.namespaces().createNamespace(namespace);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));

        TopicName topic = TopicName.get(topicName);

        // Opening and closing a producer is enough to have the topic created.
        pulsarClient.newProducer().topic(topicName).create().close();

        // Flip the bundle state to false before asking the broker to load the topic again.
        NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(topic);
        pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle, false);

        CompletableFuture<?> loadResult =
                pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
        try {
            loadResult.get();
            Assert.fail("Topic creation should fail due to disable bundle");
        } catch (Exception e) {
            boolean notReady =
                    e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException;
            if (!notReady) {
                Assert.fail("Topic creation should fail with ServiceUnitNotReadyException");
            }
        }
    }

    /**
     * Checks that a failure while obtaining the managed-ledger configuration is reported through
     * the topic-creation future instead of leaving the caller waiting.
     *
     * <p>A spied {@link BrokerService} hands back an already-failed future from
     * {@code getManagedLedgerConfig}. {@code getOrCreateTopic} is then invoked on a separate
     * single-thread executor and its outcome must be available within one second; a timeout is
     * reported as a dead-lock. When the creation fails, the cause must be the injected
     * {@link NullPointerException}.
     */
    @Test(timeOut = 3000L)
    public void testTopicFailureShouldNotHaveDeadLock() {
        final String deadLockTestTopic = "persistent://prop/ns-abc/deadLockTestTopic";
        final String successfulTopic = "persistent://prop/ns-abc/ownBundleTopic";

        // Create a regular topic in the same namespace first
        // (presumably so the broker already serves that namespace's bundle - confirm).
        try {
            pulsarClient.newProducer().topic(successfulTopic).create().close();
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }

        // Spy on the broker service so that resolving the ledger configuration always fails.
        BrokerService spiedService = Mockito.spy(pulsar.getBrokerService());
        CompletableFuture<ManagedLedgerConfig> failedConfig = new CompletableFuture<>();
        failedConfig.completeExceptionally(new NullPointerException("failed to peristent policy"));
        Mockito.doReturn(failedConfig).when(spiedService)
                .getManagedLedgerConfig(Mockito.<TopicName>any());

        // Bridge the asynchronous result into a future this thread can wait on with a timeout.
        CompletableFuture<Void> topicCreation = new CompletableFuture<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> spiedService.getOrCreateTopic(deadLockTestTopic)
                .thenAccept(topic -> topicCreation.complete(null))
                .exceptionally(e -> {
                    topicCreation.completeExceptionally(e.getCause());
                    return null;
                }));

        try {
            topicCreation.get(1L, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            Assert.fail("there is a dead-lock and it should have been prevented");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof NullPointerException);
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Checks that a failure while opening the managed ledger is reported through the
     * topic-creation future instead of leaving the caller waiting.
     *
     * <p>The managed-ledger configuration lookup is stubbed to succeed, and an already-failed
     * future is planted (via reflection) in the {@code ledgers} map of the broker's
     * {@link ManagedLedgerFactoryImpl} under the ledger name of the test topic.
     * {@code getOrCreateTopic} is then invoked on a separate single-thread executor and its
     * outcome must be available within one second; a timeout is reported as a dead-lock. When the
     * creation fails, the cause must be a {@link BrokerServiceException.PersistenceException}.
     *
     * <p>Cleanup removes only the entry planted by this test, so ledger entries that belong to
     * other topics of the shared broker stay registered in the factory.
     *
     * @throws Exception if the {@code ledgers} field cannot be accessed reflectively
     */
    @Test
    public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
        final String deadLockTestTopic = "persistent://prop/ns-abc/deadLockTestTopic";
        final String deadLockTestLedger = "prop/ns-abc/persistent/deadLockTestTopic";
        final String successfulTopic = "persistent://prop/ns-abc/ownBundleTopic";

        // Create a regular topic in the same namespace first
        // (presumably so the broker already serves that namespace's bundle - confirm).
        try {
            Producer producer = this.pulsarClient.newProducer().topic(successfulTopic).create();
            producer.close();
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }

        // The configuration lookup succeeds here; the failure is injected at ledger-open level.
        BrokerService service = Mockito.spy(this.pulsar.getBrokerService());
        CompletableFuture<ManagedLedgerConfig> managedLedgerConfig = new CompletableFuture<>();
        managedLedgerConfig.complete(new ManagedLedgerConfig());
        Mockito.doReturn(managedLedgerConfig).when(service)
                .getManagedLedgerConfig(Mockito.<TopicName>any());

        // Reach into the factory's private ledger map to plant a failed "open" result.
        Field ledgerField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        ledgerField.setAccessible(true);
        @SuppressWarnings("unchecked")
        ConcurrentHashMap<String, CompletableFuture<?>> ledgers =
                (ConcurrentHashMap<String, CompletableFuture<?>>) ledgerField.get(
                        this.pulsar.getManagedLedgerFactory());
        CompletableFuture<Object> failedLedgerOpen = new CompletableFuture<>();
        failedLedgerOpen.completeExceptionally(new ManagedLedgerException("ledger opening failed"));

        CompletableFuture<Void> topicCreation = new CompletableFuture<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ledgers.put(deadLockTestLedger, failedLedgerOpen);
        try {
            executor.submit(() -> service.getOrCreateTopic(deadLockTestTopic)
                    .thenAccept(topic -> topicCreation.complete(null))
                    .exceptionally(e -> {
                        topicCreation.completeExceptionally(e.getCause());
                        return null;
                    }));
            topicCreation.get(1L, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            Assert.fail("there is a dead-lock and it should have been prevented");
        } catch (ExecutionException e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.PersistenceException.class);
        } finally {
            executor.shutdownNow();
            // Undo only what this test injected; the map belongs to the live factory.
            ledgers.remove(deadLockTestLedger, failedLedgerOpen);
        }
    }

    /**
     * Creates a namespace with an explicit bundle count and checks that the local policies read
     * back from the local ZooKeeper cache service (after clearing its policies cache) are present
     * and report the same number of bundles.
     *
     * @throws Exception if the namespace cannot be created or the policies cannot be read
     */
    @Test
    public void testCreateNamespacePolicy() throws Exception {
        final String namespace = "prop/testPolicy";
        final int totalBundle = 3;
        System.err.println("----------------");

        admin.namespaces().createNamespace(namespace, new BundlesData(totalBundle));

        String localPoliciesPath = PulsarWebResource.joinPath("/admin/local-policies", namespace);

        // Empty the cache before the lookup so the entry is not served from a previously cached value.
        pulsar.getLocalZkCacheService().policiesCache().clear();
        Optional<LocalPolicies> policy =
                pulsar.getLocalZkCacheService().policiesCache().get(localPoliciesPath);

        Assert.assertTrue(policy.isPresent());
        Assert.assertEquals(policy.get().bundles.numBundles, totalBundle);
    }
}

