/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceDescription;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import org.apache.pulsar.broker.loadbalance.impl.ResourceAvailabilityRanker;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.NamespacesImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class LoadBalancerTest {
    LocalBookkeeperEnsemble bkEnsemble;
    ExecutorService executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    private static final Logger log = LoggerFactory.getLogger(LoadBalancerTest.class);
    private static final int MAX_RETRIES = 10;
    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
    private static final int BROKER_COUNT = 5;
    private int[] brokerWebServicePorts = new int[5];
    private int[] brokerNativeBrokerPorts = new int[5];
    private URL[] brokerUrls = new URL[5];
    private String[] lookupAddresses = new String[5];
    private PulsarService[] pulsarServices = new PulsarService[5];
    private PulsarAdmin[] pulsarAdmins = new PulsarAdmin[5];

    @BeforeMethod
    void setup() throws Exception {
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, this.ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
        this.bkEnsemble.start();
        ZkUtils.createFullPathOptimistic((ZooKeeper)this.bkEnsemble.getZkClient(), (String)"/loadbalance/settings/strategy", (byte[])"{\"loadBalancerStrategy\":\"leastLoadedServer\"}".getBytes(), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.PERSISTENT);
        String localhost = "localhost";
        for (int i = 0; i < 5; ++i) {
            this.brokerWebServicePorts[i] = PortManager.nextFreePort();
            this.brokerNativeBrokerPorts[i] = PortManager.nextFreePort();
            ServiceConfiguration config = new ServiceConfiguration();
            config.setBrokerServicePort(Optional.ofNullable(this.brokerNativeBrokerPorts[i]));
            config.setClusterName("use");
            config.setAdvertisedAddress("localhost");
            config.setAdvertisedAddress("localhost");
            config.setWebServicePort(Optional.ofNullable(this.brokerWebServicePorts[i]));
            config.setZookeeperServers("127.0.0.1:" + this.ZOOKEEPER_PORT);
            config.setBrokerServicePort(Optional.ofNullable(this.brokerNativeBrokerPorts[i]));
            config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
            config.setAdvertisedAddress("localhost" + i);
            config.setLoadBalancerEnabled(false);
            this.pulsarServices[i] = new PulsarService(config);
            this.pulsarServices[i].start();
            this.brokerUrls[i] = new URL("http://127.0.0.1:" + this.brokerWebServicePorts[i]);
            this.lookupAddresses[i] = this.pulsarServices[i].getAdvertisedAddress() + ":" + config.getWebServicePort().get();
            this.pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(this.brokerUrls[i].toString()).build();
        }
        this.createNamespacePolicies(this.pulsarServices[0]);
        Thread.sleep(100L);
    }

    @AfterMethod
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        this.executor.shutdown();
        for (int i = 0; i < 5; ++i) {
            this.pulsarAdmins[i].close();
            this.pulsarServices[i].close();
        }
        this.bkEnsemble.stop();
    }

    private LeaderBroker loopUntilLeaderChanges(LeaderElectionService les, LeaderBroker oldLeader, LeaderBroker newLeader) throws InterruptedException {
        int loopCount;
        for (loopCount = 0; loopCount < 10; ++loopCount) {
            Thread.sleep(1000L);
            newLeader = les.getCurrentLeader();
            if (!newLeader.equals((Object)oldLeader)) break;
        }
        Assert.assertNotEquals((Object)loopCount, (Object)10, (String)"Leader is not changed even after maximum retries.");
        return newLeader;
    }

    @Test
    public void testLoadReportsWrittenOnZK() throws Exception {
        ZooKeeper zkc = this.bkEnsemble.getZkClient();
        try {
            for (int i = 0; i < 5; ++i) {
                String znodePath = String.format("%s/%s", "/loadbalance/brokers", this.lookupAddresses[i]);
                byte[] loadReportData = zkc.getData(znodePath, false, null);
                assert (loadReportData.length > 0);
                log.info("LoadReport {}, {}", (Object)this.lookupAddresses[i], (Object)new String(loadReportData));
                LoadReport loadReport = (LoadReport)ObjectMapperFactory.getThreadLocal().readValue(loadReportData, LoadReport.class);
                assert (loadReport.getName().equals(this.lookupAddresses[i]));
                Field ranking = ((SimpleLoadManagerImpl)this.pulsarServices[i].getLoadManager().get()).getClass().getDeclaredField("sortedRankings");
                ranking.setAccessible(true);
                AtomicReference sortedRanking = (AtomicReference)ranking.get(this.pulsarServices[i].getLoadManager().get());
                this.printSortedRanking(sortedRanking);
                int brokerCount = 0;
                for (Map.Entry entry : ((Map)sortedRanking.get()).entrySet()) {
                    brokerCount += ((Set)entry.getValue()).size();
                }
                Assert.assertEquals((int)brokerCount, (int)5);
                TopicName topicName = TopicName.get((String)"persistent://pulsar/use/primary-ns/test-topic");
                ResourceUnit found = (ResourceUnit)((LoadManager)this.pulsarServices[i].getLoadManager().get()).getLeastLoaded((ServiceUnitId)this.pulsarServices[i].getNamespaceService().getBundle(topicName)).get();
                Assert.assertNotNull((Object)found);
            }
        }
        catch (InterruptedException | KeeperException e) {
            Assert.fail((String)"Unable to read the data from Zookeeper - [{}]", (Throwable)e);
        }
    }

    @Test
    public void testUpdateLoadReportAndCheckUpdatedRanking() throws Exception {
        for (int i = 0; i < 5; ++i) {
            LoadReport lr = new LoadReport();
            lr.setName(this.lookupAddresses[i]);
            SystemResourceUsage sru = new SystemResourceUsage();
            sru.setBandwidthIn(new ResourceUsage(256.0, 1024000.0));
            sru.setBandwidthOut(new ResourceUsage(250.0, 1024000.0));
            sru.setMemory(new ResourceUsage(1024.0, 8192.0));
            sru.setCpu(new ResourceUsage(5.0, 400.0));
            lr.setSystemResourceUsage(sru);
            String znodePath = String.format("%s/%s", "/loadbalance/brokers", this.lookupAddresses[i]);
            String loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)lr);
            this.bkEnsemble.getZkClient().setData(znodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
        }
        Thread.sleep(5000L);
        int totalNamespaces = 200;
        HashMap<String, Integer> namespaceOwner = new HashMap<String, Integer>();
        for (int i = 0; i < totalNamespaces; ++i) {
            TopicName topicName = TopicName.get((String)("persistent://pulsar/use/primary-ns-" + i + "/test-topic"));
            ResourceUnit found = (ResourceUnit)((LoadManager)this.pulsarServices[0].getLoadManager().get()).getLeastLoaded((ServiceUnitId)this.pulsarServices[0].getNamespaceService().getBundle(topicName)).get();
            if (namespaceOwner.containsKey(found.getResourceId())) {
                namespaceOwner.put(found.getResourceId(), (Integer)namespaceOwner.get(found.getResourceId()) + 1);
                continue;
            }
            namespaceOwner.put(found.getResourceId(), 1);
        }
        int averageNamespaces = totalNamespaces / 5;
        int tenPercentOfAverageNamespaces = averageNamespaces / 10;
        int lowerBound = averageNamespaces - tenPercentOfAverageNamespaces;
        int upperBound = averageNamespaces + tenPercentOfAverageNamespaces;
        for (Map.Entry broker : namespaceOwner.entrySet()) {
            log.info("Count of bundles assigned: {}, {}", broker.getKey(), broker.getValue());
            Assert.assertTrue(((Integer)broker.getValue() >= lowerBound && (Integer)broker.getValue() <= upperBound ? 1 : 0) != 0);
        }
    }

    private AtomicReference<Map<Long, Set<ResourceUnit>>> getSortedRanking(PulsarService pulsar) throws NoSuchFieldException, IllegalAccessException {
        Field ranking = ((SimpleLoadManagerImpl)pulsar.getLoadManager().get()).getClass().getDeclaredField("sortedRankings");
        ranking.setAccessible(true);
        AtomicReference sortedRanking = (AtomicReference)ranking.get(pulsar.getLoadManager().get());
        return sortedRanking;
    }

    private void printSortedRanking(AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking) {
        log.info("Sorted Ranking Result:");
        sortedRanking.get().forEach((score, rus) -> {
            for (ResourceUnit ru : rus) {
                log.info("  - {}, {}", (Object)ru.getResourceId(), score);
            }
        });
    }

    @Test
    public void testBrokerRanking() throws Exception {
        int i;
        for (i = 0; i < 5; ++i) {
            LoadReport lr = new LoadReport();
            lr.setName(this.lookupAddresses[i]);
            SystemResourceUsage sru = new SystemResourceUsage();
            sru.setBandwidthIn(new ResourceUsage(0.0, 1024000.0));
            sru.setBandwidthOut(new ResourceUsage(0.0, 1024000.0));
            sru.setMemory(new ResourceUsage(1024.0, (double)(2048 * (i + 1))));
            sru.setCpu(new ResourceUsage(60.0, 400.0));
            lr.setSystemResourceUsage(sru);
            String znodePath = String.format("%s/%s", "/loadbalance/brokers", this.lookupAddresses[i]);
            String loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)lr);
            this.bkEnsemble.getZkClient().setData(znodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
        }
        Thread.sleep(5000L);
        for (i = 0; i < 5; ++i) {
            AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking = this.getSortedRanking(this.pulsarServices[i]);
            this.printSortedRanking(sortedRanking);
            Assert.assertEquals((int)sortedRanking.get().size(), (int)4);
            Assert.assertEquals((int)sortedRanking.get().get(50L).size(), (int)1);
            Assert.assertEquals((int)sortedRanking.get().get(25L).size(), (int)1);
            Assert.assertEquals((int)sortedRanking.get().get(16L).size(), (int)1);
            Assert.assertEquals((int)sortedRanking.get().get(15L).size(), (int)2);
        }
    }

    @Test
    public void testTopicAssignmentWithExistingBundles() throws Exception {
        int i;
        for (i = 0; i < 5; ++i) {
            ResourceQuota defaultQuota = new ResourceQuota();
            defaultQuota.setMsgRateIn(20.0);
            defaultQuota.setMsgRateOut(60.0);
            defaultQuota.setBandwidthIn(20000.0);
            defaultQuota.setBandwidthOut(60000.0);
            defaultQuota.setMemory(87.0);
            this.pulsarServices[i].getLocalZkCacheService().getResourceQuotaCache().setDefaultQuota(defaultQuota);
            LoadReport lr = new LoadReport();
            lr.setName(this.lookupAddresses[i]);
            SystemResourceUsage sru = new SystemResourceUsage();
            sru.setBandwidthIn(new ResourceUsage(0.0, 1024000.0));
            sru.setBandwidthOut(new ResourceUsage(0.0, 1024000.0));
            sru.setMemory(new ResourceUsage(0.0, (double)(2048 * (i + 1))));
            sru.setCpu(new ResourceUsage(60.0, 400.0));
            lr.setSystemResourceUsage(sru);
            HashMap<String, NamespaceBundleStats> bundleStats = new HashMap<String, NamespaceBundleStats>();
            for (int j = 0; j < (i + 1) * 5; ++j) {
                String bundleName = String.format("pulsar/use/primary-ns-%d-%d/0x00000000_0xffffffff", i, j);
                NamespaceBundleStats stats = new NamespaceBundleStats();
                bundleStats.put(bundleName, stats);
            }
            lr.setBundleStats(bundleStats);
            String znodePath = String.format("%s/%s", "/loadbalance/brokers", this.lookupAddresses[i]);
            String loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)lr);
            this.bkEnsemble.getZkClient().setData(znodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
        }
        Thread.sleep(5000L);
        for (i = 0; i < 5; ++i) {
            AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking = this.getSortedRanking(this.pulsarServices[i]);
            this.printSortedRanking(sortedRanking);
        }
        int totalNamespaces = 250;
        int[] expectedAssignments = new int[]{17, 34, 51, 68, 85};
        HashMap<String, Integer> namespaceOwner = new HashMap<String, Integer>();
        for (int i2 = 0; i2 < totalNamespaces; ++i2) {
            TopicName topicName = TopicName.get((String)("persistent://pulsar/use/primary-ns-" + i2 + "/test-topic"));
            ResourceUnit found = (ResourceUnit)((LoadManager)this.pulsarServices[0].getLoadManager().get()).getLeastLoaded((ServiceUnitId)this.pulsarServices[0].getNamespaceService().getBundle(topicName)).get();
            if (namespaceOwner.containsKey(found.getResourceId())) {
                namespaceOwner.put(found.getResourceId(), (Integer)namespaceOwner.get(found.getResourceId()) + 1);
                continue;
            }
            namespaceOwner.put(found.getResourceId(), 1);
        }
        double expectedMaxVariation = 10.0;
        for (int i3 = 0; i3 < 5; ++i3) {
            long actualValue = 0L;
            String resourceId = "http://" + this.lookupAddresses[i3];
            if (namespaceOwner.containsKey(resourceId)) {
                actualValue = ((Integer)namespaceOwner.get(resourceId)).intValue();
            }
            long expectedValue = expectedAssignments[i3];
            double variation = (double)Math.abs(actualValue - expectedValue) * 100.0 / (double)expectedValue;
            log.info("Topic assignment - {}, actual: {}, expected baseline: {}, variation: {}/%", new Object[]{this.lookupAddresses[i3], actualValue, expectedValue, String.format("%.2f", variation)});
            Assert.assertTrue((variation < expectedMaxVariation ? 1 : 0) != 0);
        }
    }

    @Test
    public void testStop() throws Exception {
        SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl)this.pulsarServices[0].getLoadManager().get();
        loadManager.stop();
        Field loadReportCacheField = SimpleLoadManagerImpl.class.getDeclaredField("loadReportCacheZk");
        loadReportCacheField.setAccessible(true);
        ZooKeeperDataCache loadReportCache = (ZooKeeperDataCache)loadReportCacheField.get(loadManager);
        Field IS_SHUTDOWN_UPDATER = ZooKeeperDataCache.class.getDeclaredField("IS_SHUTDOWN_UPDATER");
        IS_SHUTDOWN_UPDATER.setAccessible(true);
        boolean TRUE = true;
        assert (((AtomicIntegerFieldUpdater)IS_SHUTDOWN_UPDATER.get(loadReportCache)).get(loadReportCache) == 1);
    }

    private AtomicReference<Map<String, ResourceQuota>> getRealtimeResourceQuota(PulsarService pulsar) throws NoSuchFieldException, IllegalAccessException {
        Field quotasField = ((SimpleLoadManagerImpl)pulsar.getLoadManager().get()).getClass().getDeclaredField("realtimeResourceQuotas");
        quotasField.setAccessible(true);
        AtomicReference realtimeResourceQuotas = (AtomicReference)quotasField.get(pulsar.getLoadManager().get());
        return realtimeResourceQuotas;
    }

    private void printResourceQuotas(Map<String, ResourceQuota> resourceQuotas) throws Exception {
        log.info("Realtime Resource Quota:");
        for (Map.Entry<String, ResourceQuota> entry : resourceQuotas.entrySet()) {
            String quotaStr = ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)entry.getValue());
            log.info(" {}, {}", (Object)entry.getKey(), (Object)quotaStr);
        }
    }

    private void writeLoadReportsForDynamicQuota(long timestamp) throws Exception {
        for (int i = 0; i < 5; ++i) {
            LoadReport lr = new LoadReport();
            lr.setName(this.lookupAddresses[i]);
            lr.setTimestamp(timestamp);
            SystemResourceUsage sru = new SystemResourceUsage();
            sru.setBandwidthIn(new ResourceUsage((double)(5000 * (10 + i * 5)), 1024000.0));
            sru.setBandwidthOut(new ResourceUsage((double)(15000 * (10 + i * 5)), 1024000.0));
            sru.setMemory(new ResourceUsage((double)(25 * (10 + i * 5)), (double)(2048 * (i + 1))));
            sru.setCpu(new ResourceUsage(200.0, 400.0));
            lr.setSystemResourceUsage(sru);
            HashMap<String, NamespaceBundleStats> bundleStats = new HashMap<String, NamespaceBundleStats>();
            for (int j = 0; j < 5; ++j) {
                String bundleName = String.format("pulsar/use/primary-ns-%d-%d/0x00000000_0xffffffff", i, j);
                NamespaceBundleStats stats = new NamespaceBundleStats();
                stats.msgRateIn = 5 * (i + j);
                stats.msgRateOut = 15 * (i + j);
                stats.msgThroughputIn = 5000 * (i + j);
                stats.msgThroughputOut = 15000 * (i + j);
                stats.topics = 25 * (i + j);
                stats.consumerCount = 50 * (i + j);
                stats.producerCount = 50 * (i + j);
                bundleStats.put(bundleName, stats);
            }
            lr.setBundleStats(bundleStats);
            String znodePath = String.format("%s/%s", "/loadbalance/brokers", this.lookupAddresses[i]);
            String loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)lr);
            this.bkEnsemble.getZkClient().setData(znodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
        }
    }

    private void verifyBundleResourceQuota(ResourceQuota quota, double expMsgRateIn, double expMsgRateOut, double expBandwidthIn, double expBandwidthOut, double expMemory) {
        Assert.assertTrue((Math.abs(quota.getMsgRateIn() - expMsgRateIn) < 1.0 ? 1 : 0) != 0);
        Assert.assertTrue((Math.abs(quota.getMsgRateOut() - expMsgRateOut) < 1.0 ? 1 : 0) != 0);
        Assert.assertTrue((Math.abs(quota.getBandwidthIn() - expBandwidthIn) < 1.0 ? 1 : 0) != 0);
        Assert.assertTrue((Math.abs(quota.getBandwidthOut() - expBandwidthOut) < 1.0 ? 1 : 0) != 0);
        Assert.assertTrue((Math.abs(quota.getMemory() - expMemory) < 1.0 ? 1 : 0) != 0);
    }

    @Test
    public void testDynamicNamespaceBundleQuota() throws Exception {
        Map<String, ResourceQuota> quotas;
        int i;
        long startTime = System.currentTimeMillis();
        for (i = 0; i < 5; ++i) {
            ResourceQuota defaultQuota = new ResourceQuota();
            defaultQuota.setMsgRateIn(20.0);
            defaultQuota.setMsgRateOut(60.0);
            defaultQuota.setBandwidthIn(20000.0);
            defaultQuota.setBandwidthOut(60000.0);
            defaultQuota.setMemory(75.0);
            this.pulsarServices[i].getLocalZkCacheService().getResourceQuotaCache().setDefaultQuota(defaultQuota);
        }
        this.writeLoadReportsForDynamicQuota(startTime);
        Thread.sleep(5000L);
        this.writeLoadReportsForDynamicQuota(startTime + SimpleLoadManagerImpl.RESOURCE_QUOTA_GO_UP_TIMEWINDOW);
        Thread.sleep(5000L);
        for (i = 0; i < 5; ++i) {
            quotas = this.getRealtimeResourceQuota(this.pulsarServices[i]).get();
            this.printResourceQuotas(quotas);
            this.verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-0-0/0x00000000_0xffffffff"), 19.0, 58.0, 19791.0, 58958.0, 74.0);
            this.verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-2-2/0x00000000_0xffffffff"), 20.0, 60.0, 20000.0, 60000.0, 100.0);
            this.verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-4-4/0x00000000_0xffffffff"), 40.0, 120.0, 40000.0, 120000.0, 150.0);
        }
        this.writeLoadReportsForDynamicQuota(startTime + SimpleLoadManagerImpl.RESOURCE_QUOTA_GO_DOWN_TIMEWINDOW);
        Thread.sleep(5000L);
        for (i = 0; i < 5; ++i) {
            quotas = this.getRealtimeResourceQuota(this.pulsarServices[i]).get();
            this.printResourceQuotas(quotas);
            this.verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-0-0/0x00000000_0xffffffff"), 5.0, 6.0, 10203.0, 11019.0, 50.0);
            this.verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-2-2/0x00000000_0xffffffff"), 20.0, 60.0, 20000.0, 60000.0, 100.0);
            this.verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-4-4/0x00000000_0xffffffff"), 40.0, 120.0, 40000.0, 120000.0, 150.0);
        }
    }

    private void setObjectField(Class objClass, Object objInstance, String fieldName, Object newValue) throws Exception {
        Field field = objClass.getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(objInstance, newValue);
    }

    private NamespaceBundleStats newBundleStats(long topics, int producers, int consumers, double msgRateIn, double msgRateOut, double throughputIn, double throughputOut) {
        NamespaceBundleStats stats = new NamespaceBundleStats();
        stats.topics = topics;
        stats.producerCount = producers;
        stats.consumerCount = consumers;
        stats.msgRateIn = msgRateIn;
        stats.msgRateOut = msgRateOut;
        stats.msgThroughputIn = throughputIn;
        stats.msgThroughputOut = throughputOut;
        return stats;
    }

    private BundlesData getBundles(int numBundles) {
        Long maxVal = 0x100000000L;
        Long segSize = maxVal / (long)numBundles;
        ArrayList partitions = Lists.newArrayList();
        partitions.add(String.format("0x%08x", 0L));
        Long curPartition = segSize;
        for (int i = 0; i < numBundles; ++i) {
            if (i != numBundles - 1) {
                partitions.add(String.format("0x%08x", curPartition));
            } else {
                partitions.add(String.format("0x%08x", maxVal - 1L));
            }
            curPartition = curPartition + segSize;
        }
        return new BundlesData((List)partitions);
    }

    private void createNamespace(PulsarService pulsar, String namespace, int numBundles) throws Exception {
        Policies policies = new Policies();
        policies.bundles = this.getBundles(numBundles);
        ObjectMapper jsonMapper = ObjectMapperFactory.create();
        ZooKeeper globalZk = pulsar.getGlobalZkCache().getZooKeeper();
        String zpath = AdminResource.path((String[])new String[]{"policies", namespace});
        ZkUtils.createFullPathOptimistic((ZooKeeper)globalZk, (String)zpath, (byte[])jsonMapper.writeValueAsBytes((Object)policies), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.PERSISTENT);
    }

    @Test
    public void testNamespaceBundleAutoSplit() throws Exception {
        int maxBundles = this.pulsarServices[0].getConfiguration().getLoadBalancerNamespaceMaximumBundles();
        long maxTopics = this.pulsarServices[0].getConfiguration().getLoadBalancerNamespaceBundleMaxTopics();
        int maxSessions = this.pulsarServices[0].getConfiguration().getLoadBalancerNamespaceBundleMaxSessions();
        long maxMsgRate = this.pulsarServices[0].getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate();
        long maxBandwidth = this.pulsarServices[0].getConfiguration().getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 0x100000;
        this.pulsarServices[0].getConfiguration().setLoadBalancerAutoBundleSplitEnabled(true);
        for (int i = 1; i <= 10; ++i) {
            int numBundles = i == 10 ? maxBundles : 2;
            this.createNamespace(this.pulsarServices[0], String.format("pulsar/use/primary-ns-%02d", i), numBundles);
        }
        NamespacesImpl namespaceAdmin = (NamespacesImpl)Mockito.mock(NamespacesImpl.class);
        this.setObjectField(PulsarAdmin.class, this.pulsarServices[0].getAdminClient(), "namespaces", namespaceAdmin);
        LoadReport lr = new LoadReport();
        lr.setName(this.lookupAddresses[0]);
        lr.setSystemResourceUsage(new SystemResourceUsage());
        HashMap<String, NamespaceBundleStats> bundleStats = new HashMap<String, NamespaceBundleStats>();
        bundleStats.put("pulsar/use/primary-ns-01/0x00000000_0x80000000", this.newBundleStats(maxTopics + 1L, 0, 0, 0.0, 0.0, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-02/0x00000000_0x80000000", this.newBundleStats(2L, maxSessions + 1, 0, 0.0, 0.0, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-03/0x00000000_0x80000000", this.newBundleStats(2L, 0, maxSessions + 1, 0.0, 0.0, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-04/0x00000000_0x80000000", this.newBundleStats(2L, 0, 0, maxMsgRate + 1L, 0.0, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-05/0x00000000_0x80000000", this.newBundleStats(2L, 0, 0, 0.0, maxMsgRate + 1L, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-06/0x00000000_0x80000000", this.newBundleStats(2L, 0, 0, 0.0, 0.0, maxBandwidth + 1L, 0.0));
        bundleStats.put("pulsar/use/primary-ns-07/0x00000000_0x80000000", this.newBundleStats(2L, 0, 0, 0.0, 0.0, 0.0, maxBandwidth + 1L));
        bundleStats.put("pulsar/use/primary-ns-08/0x00000000_0x80000000", this.newBundleStats(maxTopics - 1L, maxSessions - 1, 1, maxMsgRate - 1L, 1.0, maxBandwidth - 1L, 1.0));
        bundleStats.put("pulsar/use/primary-ns-09/0x00000000_0x80000000", this.newBundleStats(1L, 0, 0, 0.0, 0.0, 0.0, maxBandwidth + 1L));
        bundleStats.put("pulsar/use/primary-ns-10/0x00000000_0x02000000", this.newBundleStats(maxTopics + 1L, 0, 0, 0.0, 0.0, 0.0, 0.0));
        lr.setBundleStats(bundleStats);
        this.setObjectField(SimpleLoadManagerImpl.class, this.pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr);
        String znodePath = String.format("%s/%s", "/loadbalance/brokers", this.lookupAddresses[0]);
        String loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)lr);
        this.bkEnsemble.getZkClient().setData(znodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
        Thread.sleep(5000L);
        ((LoadManager)this.pulsarServices[0].getLoadManager().get()).doNamespaceBundleSplit();
        boolean isAutoUnooadSplitBundleEnabled = this.pulsarServices[0].getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.times((int)1))).splitNamespaceBundle("pulsar/use/primary-ns-01", "0x00000000_0x80000000", isAutoUnooadSplitBundleEnabled);
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.times((int)1))).splitNamespaceBundle("pulsar/use/primary-ns-02", "0x00000000_0x80000000", isAutoUnooadSplitBundleEnabled);
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.times((int)1))).splitNamespaceBundle("pulsar/use/primary-ns-03", "0x00000000_0x80000000", isAutoUnooadSplitBundleEnabled);
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.times((int)1))).splitNamespaceBundle("pulsar/use/primary-ns-04", "0x00000000_0x80000000", isAutoUnooadSplitBundleEnabled);
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.times((int)1))).splitNamespaceBundle("pulsar/use/primary-ns-05", "0x00000000_0x80000000", isAutoUnooadSplitBundleEnabled);
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.times((int)1))).splitNamespaceBundle("pulsar/use/primary-ns-06", "0x00000000_0x80000000", isAutoUnooadSplitBundleEnabled);
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.times((int)1))).splitNamespaceBundle("pulsar/use/primary-ns-07", "0x00000000_0x80000000", isAutoUnooadSplitBundleEnabled);
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.never())).splitNamespaceBundle("pulsar/use/primary-ns-08", "0x00000000_0x80000000", isAutoUnooadSplitBundleEnabled);
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.never())).splitNamespaceBundle("pulsar/use/primary-ns-09", "0x00000000_0x80000000", isAutoUnooadSplitBundleEnabled);
        ((NamespacesImpl)Mockito.verify((Object)namespaceAdmin, (VerificationMode)Mockito.never())).splitNamespaceBundle("pulsar/use/primary-ns-10", "0x00000000_0x02000000", isAutoUnooadSplitBundleEnabled);
    }

    @Test
    public void testLeaderElection() throws Exception {
        for (int i = 0; i < 4; ++i) {
            HashSet<PulsarService> activePulsar = new HashSet<PulsarService>();
            LeaderBroker oldLeader = null;
            PulsarService leaderPulsar = null;
            PulsarService followerPulsar = null;
            for (int j = 0; j < 5; ++j) {
                if (this.pulsarServices[j].getState() == PulsarService.State.Closed) continue;
                activePulsar.add(this.pulsarServices[j]);
                LeaderElectionService les = this.pulsarServices[j].getLeaderElectionService();
                if (les.isLeader()) {
                    oldLeader = les.getCurrentLeader();
                    leaderPulsar = this.pulsarServices[j];
                    continue;
                }
                followerPulsar = this.pulsarServices[j];
            }
            log.info("Old leader is : {}", (Object)oldLeader.getServiceUrl());
            for (PulsarService pulsar : activePulsar) {
                Assert.assertEquals((Object)pulsar.getLeaderElectionService().getCurrentLeader(), oldLeader);
            }
            leaderPulsar.close();
            LeaderBroker newLeader = oldLeader;
            newLeader = this.loopUntilLeaderChanges(followerPulsar.getLeaderElectionService(), oldLeader, newLeader);
            log.info("New leader is : {}", (Object)newLeader.getServiceUrl());
            Assert.assertNotEquals((Object)newLeader, (Object)oldLeader);
        }
    }

    private void createNamespacePolicies(PulsarService pulsar) throws Exception {
        int i;
        NamespaceIsolationPolicies policies = new NamespaceIsolationPolicies();
        NamespaceIsolationData policyData = new NamespaceIsolationData();
        policyData.namespaces = new ArrayList();
        policyData.namespaces.add("pulsar/use/primary-ns.*");
        policyData.primary = new ArrayList();
        for (i = 0; i < 5; ++i) {
            policyData.primary.add(this.pulsarServices[i].getAdvertisedAddress());
        }
        policyData.secondary = new ArrayList();
        policyData.auto_failover_policy = new AutoFailoverPolicyData();
        policyData.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
        policyData.auto_failover_policy.parameters = new HashMap();
        policyData.auto_failover_policy.parameters.put("min_limit", "1");
        policyData.auto_failover_policy.parameters.put("usage_threshold", "100");
        policies.setPolicy("primaryBrokerPolicy", policyData);
        policyData = new NamespaceIsolationData();
        policyData.namespaces = new ArrayList();
        policyData.namespaces.add("pulsar/use/secondary-ns.*");
        policyData.primary = new ArrayList();
        policyData.primary.add(this.pulsarServices[0].getAdvertisedAddress());
        policyData.secondary = new ArrayList();
        for (i = 1; i < 5; ++i) {
            policyData.secondary.add(this.pulsarServices[i].getAdvertisedAddress());
        }
        policyData.auto_failover_policy = new AutoFailoverPolicyData();
        policyData.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
        policyData.auto_failover_policy.parameters = new HashMap();
        policyData.auto_failover_policy.parameters.put("min_limit", "1");
        policyData.auto_failover_policy.parameters.put("usage_threshold", "100");
        policies.setPolicy("secondaryBrokerPolicy", policyData);
        policyData = new NamespaceIsolationData();
        policyData.namespaces = new ArrayList();
        policyData.namespaces.add("pulsar/use/shared-ns.*");
        policyData.primary = new ArrayList();
        policyData.primary.add(this.pulsarServices[0].getAdvertisedAddress());
        policyData.secondary = new ArrayList();
        for (i = 1; i < 5; ++i) {
            policyData.secondary.add(this.pulsarServices[i].getAdvertisedAddress());
        }
        policyData.auto_failover_policy = new AutoFailoverPolicyData();
        policyData.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
        policyData.auto_failover_policy.parameters = new HashMap();
        policyData.auto_failover_policy.parameters.put("min_limit", "1");
        policyData.auto_failover_policy.parameters.put("usage_threshold", "100");
        policies.setPolicy("otherBrokerPolicy", policyData);
        ObjectMapper jsonMapper = ObjectMapperFactory.create();
        ZooKeeper globalZk = pulsar.getGlobalZkCache().getZooKeeper();
        ZkUtils.createFullPathOptimistic((ZooKeeper)globalZk, (String)AdminResource.path((String[])new String[]{"clusters", "use", "namespaceIsolationPolicies"}), (byte[])new byte[0], (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.PERSISTENT);
        byte[] content = jsonMapper.writeValueAsBytes((Object)policies.getPolicies());
        globalZk.setData(AdminResource.path((String[])new String[]{"clusters", "use", "namespaceIsolationPolicies"}), content, -1);
    }

    @Test(enabled=false)
    public void testGetLeastLoadedBasic() throws Exception {
        LocalZooKeeperCache mockCache = (LocalZooKeeperCache)Mockito.mock(LocalZooKeeperCache.class);
        HashSet activeBrokers = Sets.newHashSet((Object[])new String[]{"prod1-broker1.messaging.use.example.com:8080", "prod1-broker2.messaging.use.example.com:8080", "prod1-broker3.messaging.use.example.com:8080"});
        Mockito.when((Object)mockCache.getChildren("/loadbalance/brokers")).thenReturn((Object)activeBrokers);
        Field zkCacheField = PulsarService.class.getDeclaredField("localZkCache");
        zkCacheField.setAccessible(true);
        LocalZooKeeperCache originalLZK1 = (LocalZooKeeperCache)zkCacheField.get(this.pulsarServices[0]);
        LocalZooKeeperCache originalLZK2 = (LocalZooKeeperCache)zkCacheField.get(this.pulsarServices[1]);
        zkCacheField.set(this.pulsarServices[0], mockCache);
        zkCacheField.set(this.pulsarServices[1], mockCache);
        SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(this.pulsarServices[0]);
        PulsarResourceDescription rd = new PulsarResourceDescription();
        rd.put("memory", new ResourceUsage(1024.0, 4096.0));
        rd.put("cpu", new ResourceUsage(10.0, 100.0));
        rd.put("bandwidthIn", new ResourceUsage(256000.0, 1048576.0));
        rd.put("bandwidthOut", new ResourceUsage(563200.0, 1048576.0));
        SimpleResourceUnit ru1 = new SimpleResourceUnit("http://prod1-broker1.messaging.use.example.com:8080", (ResourceDescription)rd);
        HashSet<SimpleResourceUnit> rus = new HashSet<SimpleResourceUnit>();
        rus.add(ru1);
        ResourceAvailabilityRanker lr = new ResourceAvailabilityRanker();
        AtomicReference<TreeMap> sortedRankingsInstance = new AtomicReference<TreeMap>(Maps.newTreeMap());
        ((Map)sortedRankingsInstance.get()).put(lr.getRank((ResourceDescription)rd), rus);
        Field sortedRankings = SimpleLoadManagerImpl.class.getDeclaredField("sortedRankings");
        sortedRankings.setAccessible(true);
        sortedRankings.set(loadManager, sortedRankingsInstance);
        ResourceUnit found = (ResourceUnit)loadManager.getLeastLoaded((ServiceUnitId)NamespaceName.get((String)"pulsar/use/primary-ns.10")).get();
        Assert.assertEquals((String)"http://prod1-broker1.messaging.use.example.com:8080", (String)found.getResourceId());
        zkCacheField.set(this.pulsarServices[0], originalLZK1);
        zkCacheField.set(this.pulsarServices[1], originalLZK2);
    }

    private PulsarResourceDescription createResourceDescription(long memoryInMB, long cpuPercentage, long bandwidthInMbps, long bandwidthOutInMbps, long threads) {
        long KB = 1024L;
        long MB = 1024L * KB;
        long GB = 1024L * MB;
        PulsarResourceDescription rd = new PulsarResourceDescription();
        rd.put("memory", new ResourceUsage((double)memoryInMB, (double)(4L * GB)));
        rd.put("cpu", new ResourceUsage((double)cpuPercentage, 100.0));
        rd.put("bandwidthIn", new ResourceUsage((double)(bandwidthInMbps * MB), (double)GB));
        rd.put("bandwidthOut", new ResourceUsage((double)(bandwidthOutInMbps * MB), (double)GB));
        return rd;
    }

    @Test(enabled=false)
    public void testLoadbalanceDistributionAmongEquallyLoaded() throws Exception {
        SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(this.pulsarServices[0]);
        ZooKeeperCache mockCache = (ZooKeeperCache)Mockito.mock(ZooKeeperCache.class);
        HashSet activeBrokers = Sets.newHashSet((Object[])new String[]{"prod1-broker1.messaging.use.example.com:8080", "prod1-broker2.messaging.use.example.com:8080", "prod1-broker3.messaging.use.example.com:8080"});
        Mockito.when((Object)mockCache.getChildren("/loadbalance/brokers")).thenReturn((Object)activeBrokers);
        Mockito.when((Object)mockCache.getChildren("/loadbalance/brokers")).thenReturn((Object)activeBrokers);
        Field zkCacheField = PulsarService.class.getDeclaredField("localZkCache");
        zkCacheField.setAccessible(true);
        zkCacheField.set(this.pulsarServices[0], mockCache);
        long memoryMB = 4096L;
        long cpuPercent = 45L;
        long bInMbps = 350L;
        long bOutMbps = 180L;
        long threads = 10L;
        PulsarResourceDescription rd = this.createResourceDescription(memoryMB, cpuPercent, bInMbps, bOutMbps, threads);
        HashSet<SimpleResourceUnit> rus = new HashSet<SimpleResourceUnit>();
        for (String broker : activeBrokers) {
            SimpleResourceUnit ru = new SimpleResourceUnit(broker, (ResourceDescription)rd);
            rus.add(ru);
        }
        TreeMap<Long, HashSet<SimpleResourceUnit>> sortedRankingsInstance = new TreeMap<Long, HashSet<SimpleResourceUnit>>();
        ResourceAvailabilityRanker ranker = new ResourceAvailabilityRanker();
        sortedRankingsInstance.put(ranker.getRank((ResourceDescription)rd), rus);
        Field sortedRankings = SimpleLoadManagerImpl.class.getDeclaredField("sortedRankings");
        sortedRankings.setAccessible(true);
        AtomicReference<TreeMap<Long, HashSet<SimpleResourceUnit>>> ar = new AtomicReference<TreeMap<Long, HashSet<SimpleResourceUnit>>>();
        ar.set(sortedRankingsInstance);
        sortedRankings.set(loadManager, ar);
    }

    @Test(enabled=false)
    void testLoadBalanceDiscardingInactiveBrokersInSelection() throws Exception {
        long memoryMB = 2096L;
        long cpuPercent = 12L;
        long bInMbps = 100L;
        long bOutMbps = 100L;
        long threads = 3L;
        SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(this.pulsarServices[0]);
        ZooKeeperCache mockCache = (ZooKeeperCache)Mockito.mock(ZooKeeperCache.class);
        HashSet activeBrokers = Sets.newHashSet((Object[])new String[]{"prod1-broker1.messaging.use.example.com:8080", "prod1-broker2.messaging.use.example.com:8080"});
        Mockito.when((Object)mockCache.getChildren("/loadbalance/brokers")).thenReturn((Object)activeBrokers);
        Field zkCacheField = PulsarService.class.getDeclaredField("localZkCache");
        zkCacheField.setAccessible(true);
        zkCacheField.set(this.pulsarServices[0], mockCache);
        TreeMap sortedRankingsInstance = new TreeMap();
        for (int i = 1; i <= 3; ++i) {
            PulsarResourceDescription rd = this.createResourceDescription(memoryMB * (long)i, cpuPercent * (long)i, bInMbps * (long)i, bOutMbps * 2L, threads * (long)i);
            SimpleResourceUnit ru1 = new SimpleResourceUnit(String.format("http://prod1-broker%d.messaging.use.example.com:8080", i), (ResourceDescription)rd);
            ResourceAvailabilityRanker ranker = new ResourceAvailabilityRanker();
            long rank = ranker.getRank((ResourceDescription)rd);
            if (sortedRankingsInstance.containsKey(rank)) {
                ((Set)sortedRankingsInstance.get(rank)).add(ru1);
                continue;
            }
            HashSet<SimpleResourceUnit> rus = new HashSet<SimpleResourceUnit>();
            rus.add(ru1);
            sortedRankingsInstance.put(rank, rus);
        }
        Field sortedRankings = SimpleLoadManagerImpl.class.getDeclaredField("sortedRankings");
        sortedRankings.setAccessible(true);
        AtomicReference ar = new AtomicReference();
        ar.set(sortedRankingsInstance);
        sortedRankings.set(loadManager, ar);
        int totalNamespaces = 10;
        HashMap<String, Integer> namespaceOwner = new HashMap<String, Integer>();
        for (int i = 0; i < totalNamespaces; ++i) {
            ResourceUnit found = (ResourceUnit)loadManager.getLeastLoaded((ServiceUnitId)TopicName.get((String)("persistent://pulsar/use/primary-ns/topic" + i))).get();
            if (namespaceOwner.containsKey(found.getResourceId())) {
                namespaceOwner.put(found.getResourceId(), (Integer)namespaceOwner.get(found.getResourceId()) + 1);
                continue;
            }
            namespaceOwner.put(found.getResourceId(), 0);
        }
        String inactiveBroker = "prod1-broker3.messaging.use.example.com:8080";
        Assert.assertEquals((int)namespaceOwner.size(), (int)2);
        Assert.assertFalse((boolean)namespaceOwner.containsKey(inactiveBroker));
    }

    @Test(enabled=false)
    void testLoadBalanceDistributionAmongUnequallyLoaded() throws Exception {
        long memoryMB = 4096L;
        long cpuPercent = 25L;
        long bInMbps = 256L;
        long bOutMbps = 256L;
        long threads = 25L;
        SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(this.pulsarServices[0]);
        ZooKeeperCache mockCache = (ZooKeeperCache)Mockito.mock(ZooKeeperCache.class);
        HashSet activeBrokers = Sets.newHashSet((Object[])new String[]{"prod1-broker1.messaging.use.example.com:8080", "prod1-broker2.messaging.use.example.com:8080", "prod1-broker3.messaging.use.example.com:8080"});
        Mockito.when((Object)mockCache.getChildren("/loadbalance/brokers")).thenReturn((Object)activeBrokers);
        Mockito.when((Object)mockCache.getChildren("/loadbalance/brokers")).thenReturn((Object)activeBrokers);
        Field zkCacheField = PulsarService.class.getDeclaredField("localZkCache");
        zkCacheField.setAccessible(true);
        zkCacheField.set(this.pulsarServices[0], mockCache);
        int totalAvailabilityWeight = 0;
        TreeMap sortedRankingsInstance = new TreeMap();
        for (int i = 1; i <= 3; ++i) {
            PulsarResourceDescription rd = this.createResourceDescription(memoryMB * (long)i, cpuPercent * (long)i, bInMbps * (long)i, bOutMbps * 2L, threads * (long)i);
            SimpleResourceUnit ru1 = new SimpleResourceUnit(String.format("http://prod1-broker%d.messaging.use.example.com:8080", i), (ResourceDescription)rd);
            ResourceAvailabilityRanker ranker = new ResourceAvailabilityRanker();
            long rank = ranker.getRank((ResourceDescription)rd);
            if (sortedRankingsInstance.containsKey(rank)) {
                ((Set)sortedRankingsInstance.get(rank)).add(ru1);
            } else {
                HashSet<SimpleResourceUnit> rus = new HashSet<SimpleResourceUnit>();
                rus.add(ru1);
                sortedRankingsInstance.put(rank, rus);
            }
            totalAvailabilityWeight = (int)((long)totalAvailabilityWeight + rank);
        }
        Field sortedRankings = SimpleLoadManagerImpl.class.getDeclaredField("sortedRankings");
        sortedRankings.setAccessible(true);
        AtomicReference ar = new AtomicReference();
        ar.set(sortedRankingsInstance);
        sortedRankings.set(loadManager, ar);
        int totalNamespaces = 1000;
        HashMap<String, Integer> namespaceOwner = new HashMap<String, Integer>();
        for (int i = 0; i < totalNamespaces; ++i) {
            ResourceUnit found = (ResourceUnit)loadManager.getLeastLoaded((ServiceUnitId)TopicName.get((String)("persistent://pulsar/use/primary-ns/topic-" + i))).get();
            if (namespaceOwner.containsKey(found.getResourceId())) {
                namespaceOwner.put(found.getResourceId(), (Integer)namespaceOwner.get(found.getResourceId()) + 1);
                continue;
            }
            namespaceOwner.put(found.getResourceId(), 0);
        }
        for (Map.Entry entry : sortedRankingsInstance.entrySet()) {
            int selectionProbability = (int)((double)((Long)entry.getKey()).longValue() / (double)totalAvailabilityWeight * 100.0);
            int idealExpectedOwned = selectionProbability * (int)((double)totalNamespaces / 100.0);
            int expectedOwnedLowerBound = idealExpectedOwned - idealExpectedOwned / 10;
            int expectedOwnedUpperBound = idealExpectedOwned + idealExpectedOwned / 10;
            for (ResourceUnit ru : (Set)entry.getValue()) {
                Assert.assertTrue((boolean)namespaceOwner.containsKey(ru.getResourceId()));
                int ownedNamespaces = (Integer)namespaceOwner.get(ru.getResourceId());
                Assert.assertTrue((ownedNamespaces > expectedOwnedLowerBound || ownedNamespaces < expectedOwnedUpperBound ? 1 : 0) != 0);
            }
        }
    }
}

