/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadBalancerTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test for SLA-monitor namespace ownership across several brokers.
 *
 * <p>The class-level fixture starts a {@link LocalBookkeeperEnsemble} and {@code BROKER_COUNT}
 * {@link PulsarService} instances (load balancer disabled, one default bundle per namespace),
 * registers the {@code my-cluster} cluster and the {@code sla-monitor} tenant, and creates one
 * namespace per broker named {@code sla-monitor/my-cluster/<advertisedAddress>:<webServicePort>}.
 * The tests then check, through the broker and admin APIs, which broker serves each of those
 * namespaces.
 */
public class SLAMonitoringTest {
    private static final Logger log = LoggerFactory.getLogger(SLAMonitoringTest.class);

    private static final int BROKER_COUNT = 5;
    private static final String CLUSTER_NAME = "my-cluster";
    private static final String SLA_TENANT = "sla-monitor";
    private static final String TOPIC_NAME = "my-topic";

    LocalBookkeeperEnsemble bkEnsemble;
    ExecutorService executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
    private int[] brokerWebServicePorts = new int[BROKER_COUNT];
    private int[] brokerNativeBrokerPorts = new int[BROKER_COUNT];
    private URL[] brokerUrls = new URL[BROKER_COUNT];
    private PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
    private PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
    private ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];

    /**
     * Starts the bookkeeper ensemble and all brokers, then creates the cluster, the tenant and
     * one SLA namespace per broker.
     *
     * @throws Exception if any component fails to start or an admin call fails
     */
    @BeforeClass
    void setup() throws Exception {
        log.info("---- Initializing SLAMonitoringTest -----");
        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager::nextFreePort);
        bkEnsemble.start();

        for (int i = 0; i < BROKER_COUNT; i++) {
            brokerWebServicePorts[i] = PortManager.nextFreePort();
            brokerNativeBrokerPorts[i] = PortManager.nextFreePort();

            ServiceConfiguration config = new ServiceConfiguration();
            config.setBrokerServicePort(Optional.of(brokerNativeBrokerPorts[i]));
            config.setClusterName(CLUSTER_NAME);
            config.setAdvertisedAddress("localhost");
            config.setWebServicePort(Optional.of(brokerWebServicePorts[i]));
            config.setZookeeperServers("127.0.0.1:" + ZOOKEEPER_PORT);
            config.setDefaultNumberOfNamespaceBundles(1);
            config.setLoadBalancerEnabled(false);
            // Kept so that testUnloadIfBrokerCrashes can restart a broker with the same settings.
            configurations[i] = config;

            pulsarServices[i] = new PulsarService(config);
            pulsarServices[i].start();

            brokerUrls[i] = new URL("http://127.0.0.1:" + brokerWebServicePorts[i]);
            pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(brokerUrls[i].toString()).build();
        }

        Thread.sleep(100L);

        createTenant(pulsarAdmins[BROKER_COUNT - 1]);
        for (int i = 0; i < BROKER_COUNT; i++) {
            pulsarAdmins[0].namespaces().createNamespace(slaNamespace(i));
        }
    }

    /**
     * Registers the test cluster and creates the SLA tenant that is allowed on it.
     *
     * <p>The cluster is always created through the first admin client; the supplied client
     * provides the cluster service URL and is the one used to create the tenant.
     *
     * @param pulsarAdmin admin client supplying the service URL and used for tenant creation
     */
    private void createTenant(PulsarAdmin pulsarAdmin) throws PulsarClientException, MalformedURLException, PulsarAdminException {
        ClusterData clusterData = new ClusterData();
        clusterData.setServiceUrl(pulsarAdmin.getServiceUrl());
        pulsarAdmins[0].clusters().createCluster(CLUSTER_NAME, clusterData);

        HashSet<String> allowedClusters = new HashSet<>();
        allowedClusters.add(CLUSTER_NAME);
        HashSet<String> adminRoles = new HashSet<>();
        adminRoles.add("");

        TenantInfo adminConfig = new TenantInfo();
        adminConfig.setAllowedClusters(allowedClusters);
        adminConfig.setAdminRoles(adminRoles);
        pulsarAdmin.tenants().createTenant(SLA_TENANT, adminConfig);
    }

    /**
     * Shuts down the executor, every admin client and broker, and finally the bookkeeper ensemble.
     *
     * @throws Exception if closing any component fails
     */
    @AfterClass
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        executor.shutdown();
        for (int i = 0; i < BROKER_COUNT; i++) {
            pulsarAdmins[i].close();
            pulsarServices[i].close();
        }
        bkEnsemble.stop();
    }

    /** Verifies that registering the SLA namespace on the broker reports success. */
    @Test
    public void testOwnershipAfterSetup() {
        for (int i = 0; i < BROKER_COUNT; i++) {
            try {
                // NOTE(review): every iteration targets broker 0 rather than broker i; this looks
                // unintended, but it is kept as-is until confirmed against the broker API contract.
                Assert.assertTrue(pulsarServices[0].getNamespaceService().registerSLANamespace());
            } catch (PulsarServerException e) {
                log.error("Exception occured", e);
                Assert.fail("SLA Namespace should have been owned by the broker, Exception.", e);
            }
        }
    }

    /**
     * Verifies, through every admin client, that all brokers are listed as active and that the
     * first listed broker reports exactly two owned namespaces.
     */
    @Test
    public void testOwnedNamespaces() {
        // Run the lookups first so that each SLA namespace has been looked up before counting.
        testOwnershipViaAdminAfterSetup();
        try {
            for (int i = 0; i < BROKER_COUNT; i++) {
                List<String> activeBrokers = pulsarAdmins[i].brokers().getActiveBrokers(CLUSTER_NAME);
                Assert.assertNotNull(activeBrokers);
                Assert.assertEquals(activeBrokers.size(), BROKER_COUNT);

                Map<String, ?> ownedNamespaces =
                        pulsarAdmins[i].brokers().getOwnedNamespaces(CLUSTER_NAME, activeBrokers.get(0));
                // TestNG convention is (actual, expected).
                Assert.assertEquals(ownedNamespaces.size(), 2);
            }
        } catch (Exception e) {
            log.error("Failed to query active brokers or owned namespaces", e);
            Assert.fail("Hearbeat namespace and SLA namespace should be owned by the broker", e);
        }
    }

    /**
     * Verifies that a topic lookup in each broker's SLA namespace resolves to that same broker.
     */
    @Test
    public void testOwnershipViaAdminAfterSetup() {
        for (int i = 0; i < BROKER_COUNT; i++) {
            try {
                Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(slaTopic(i)), brokerServiceUrl(i));
            } catch (Exception e) {
                log.error("Lookup failed for the SLA namespace of broker index {}", i, e);
                Assert.fail("SLA Namespace should have been owned by the broker(" + brokerServiceUrl(i) + ")", e);
            }
        }
    }

    /**
     * Closes one broker, checks that its SLA namespace is then served by a different broker,
     * restarts it with the saved configuration and checks that the namespace is served by it again.
     */
    @Test
    public void testUnloadIfBrokerCrashes() {
        final int crashIndex = BROKER_COUNT / 2;
        log.info("Trying to close the broker at index = {}", crashIndex);
        try {
            pulsarServices[crashIndex].close();
        } catch (PulsarServerException e) {
            log.error("Failed to close the broker at index {}", crashIndex, e);
            Assert.fail("Should be a able to close the broker index " + crashIndex + " Exception: " + e, e);
        }

        String topic = slaTopic(crashIndex);
        log.info("Lookup for namespace {}", topic);
        String broker = null;
        try {
            broker = pulsarAdmins[BROKER_COUNT - 1].lookups().lookupTopic(topic);
            log.info("{} Namespace is owned by {}", topic, broker);
            Assert.assertNotEquals(broker, brokerServiceUrl(crashIndex));
        } catch (PulsarAdminException e) {
            log.error("Lookup of {} failed while the broker was down", topic, e);
            Assert.fail("The SLA Monitor namespace should be owned by some other broker", e);
        }

        try {
            pulsarServices[crashIndex] = new PulsarService(configurations[crashIndex]);
            pulsarServices[crashIndex].start();
            Assert.assertEquals(
                    pulsarServices[crashIndex].getConfiguration().getBrokerServicePort().get(),
                    Integer.valueOf(brokerNativeBrokerPorts[crashIndex]));
        } catch (PulsarServerException e) {
            log.error("Failed to restart the broker at index {}", crashIndex, e);
            Assert.fail("The broker should be able to start without exception", e);
        }

        try {
            broker = pulsarAdmins[0].lookups().lookupTopic(topic);
            log.info("{} Namespace is re-owned by {}", topic, broker);
            Assert.assertEquals(broker, brokerServiceUrl(crashIndex));
        } catch (PulsarAdminException e) {
            log.error("Lookup of {} failed after the broker restart", topic, e);
            Assert.fail("The SLA Monitor namespace should be reowned by the broker" + broker, e);
        }

        try {
            pulsarServices[crashIndex].close();
        } catch (PulsarServerException e) {
            log.error("Failed to stop the restarted broker at index {}", crashIndex, e);
            Assert.fail("The broker should be able to stop without exception", e);
        }
    }

    /** Builds the SLA namespace name {@code sla-monitor/my-cluster/<address>:<webPort>} for a broker. */
    private String slaNamespace(int index) {
        return String.format("%s/%s/%s:%s", SLA_TENANT, CLUSTER_NAME,
                pulsarServices[index].getAdvertisedAddress(), brokerWebServicePorts[index]);
    }

    /** Builds the persistent topic name inside the SLA namespace of the broker at {@code index}. */
    private String slaTopic(int index) {
        return String.format("persistent://%s/%s", slaNamespace(index), TOPIC_NAME);
    }

    /** Builds the {@code pulsar://<address>:<brokerPort>} URL expected from lookups for a broker. */
    private String brokerServiceUrl(int index) {
        return "pulsar://" + pulsarServices[index].getAdvertisedAddress() + ":" + brokerNativeBrokerPorts[index];
    }
}

