/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageBrokerData;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTask;
import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.OverloadShedder;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.JSONWritable;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ModularLoadManagerImpl
implements ModularLoadManager,
ZooKeeperCacheListener<LocalBrokerData> {
    private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);
    // ZooKeeper path prefix under which per-bundle data znodes are looked up.
    public static final String BUNDLE_DATA_ZPATH = "/loadbalance/bundle-data";
    // Default message rate; the same value (50.0) is assigned to the default bundle stats in initialize().
    public static final double DEFAULT_MESSAGE_RATE = 50.0;
    // Default message throughput; the same value (50000.0) is assigned to the default bundle stats in initialize().
    public static final double DEFAULT_MESSAGE_THROUGHPUT = 50000.0;
    // Sample count passed for the long-term window when BundleData objects are created.
    public static final int NUM_LONG_SAMPLES = 1000;
    // Sample count passed for the short-term window when BundleData objects are created.
    public static final int NUM_SHORT_SAMPLES = 10;
    // ZooKeeper path prefix consulted for a namespace bundle's resource quota when no bundle data znode exists.
    public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";
    // NOTE(review): not referenced in the visible part of this class; presumably where time-averaged broker data is stored.
    public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";
    // Children cache over "/loadbalance/brokers"; the source of the active broker set.
    private ZooKeeperChildrenCache availableActiveBrokers;
    // Scratch set of candidate brokers; code that uses it synchronizes on this object.
    private final Set<String> brokerCandidateCache;
    // Cache used to read each broker's LocalBrokerData load report.
    private ZooKeeperDataCache<LocalBrokerData> brokerDataCache;
    // Host usage probe; Linux-specific or generic implementation chosen in initialize().
    private BrokerHostUsage brokerHostUsage;
    // broker -> namespace -> bundle ranges, rebuilt per broker in updateBundleData().
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
    // Path of this broker's znode; deleted by disableBroker() when non-empty.
    private String brokerZnodePath;
    // Strategy that picks the bundles to split; also used as the lock in checkNamespaceBundleSplit().
    private BundleSplitStrategy bundleSplitStrategy;
    private ServiceConfiguration conf;
    // Stats applied to bundles for which neither bundle data nor a resource quota could be read.
    private final NamespaceBundleStats defaultStats;
    // Broker filters applied to the candidate set during broker selection.
    private final List<BrokerFilter> filterPipeline;
    private long lastBundleDataUpdate;
    // Previous local report; compared against localData in needBrokerDataUpdate().
    private LocalBrokerData lastData;
    // Shedding strategies run in order by doLoadShedding().
    private final List<LoadSheddingStrategy> loadSheddingPipeline;
    // Current local report of this broker.
    private LocalBrokerData localData;
    // In-memory broker data, bundle data and recently unloaded bundles.
    private final LoadData loadData;
    // bundle -> broker for bundles that have been assigned but are not yet confirmed as loaded.
    private final Map<String, String> preallocatedBundleToBroker;
    private ModularLoadManagerStrategy placementStrategy;
    private SimpleResourceAllocationPolicies policies;
    private PulsarService pulsar;
    // Single-threaded executor on which updateAll() and failure-domain refreshes are run.
    private final ScheduledExecutorService scheduler;
    private ZooKeeper zkClient;
    // Answers whether a broker has persistent / non-persistent topics enabled, based on loadData.
    private final LoadManagerShared.BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
    private Map<String, String> brokerToFailureDomainMap;
    // Deserializes a LocalBrokerData load report with the AdminResource JSON mapper.
    private static final ZooKeeperCache.Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> (LocalBrokerData)AdminResource.jsonMapper().readValue(content, LocalBrokerData.class);
    // Latest metric snapshots; each setter replaces the whole list atomically.
    private AtomicReference<List<Metrics>> loadBalancingMetrics = new AtomicReference();
    private AtomicReference<List<Metrics>> bundleUnloadMetrics = new AtomicReference();
    private AtomicReference<List<Metrics>> bundleSplitMetrics = new AtomicReference();
    // Running totals reported through the metric snapshots above.
    private long bundleSplitCount = 0L;
    private long unloadBrokerCount = 0L;
    private long unloadBundleCount = 0L;
    // Latency summary observed by selectBrokerForAssignment(). The metric name's spelling ("assigment")
    // is part of the published name and must not be changed casually.
    private static final Summary selectBrokerForAssignment = (Summary)Summary.build("pulsar_broker_load_manager_bundle_assigment", "-").quantile(0.5).quantile(0.99).quantile(0.999).quantile(1.0).register();

    /**
     * Creates a load manager with empty in-memory state and its own single-threaded scheduler.
     * {@link #initialize(PulsarService)} must be called before the instance is usable.
     */
    public ModularLoadManagerImpl() {
        this.brokerCandidateCache = new HashSet<>();
        this.brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
        this.defaultStats = new NamespaceBundleStats();
        this.filterPipeline = new ArrayList<>();
        this.loadData = new LoadData();
        this.loadSheddingPipeline = new ArrayList<>();
        this.preallocatedBundleToBroker = new ConcurrentHashMap<>();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(
                new DefaultThreadFactory("pulsar-modular-load-manager"));
        this.brokerToFailureDomainMap = Maps.newHashMap();
        // Both predicates look the broker up in loadData by its URL without the "http://" scheme
        // and answer false when the broker or its local report is unknown.
        this.brokerTopicLoadingPredicate = new LoadManagerShared.BrokerTopicLoadingPredicate() {

            @Override
            public boolean isEnablePersistentTopics(String brokerUrl) {
                String brokerKey = brokerUrl.replace("http://", "");
                BrokerData known = ModularLoadManagerImpl.this.loadData.getBrokerData().get(brokerKey);
                if (known == null || known.getLocalData() == null) {
                    return false;
                }
                return known.getLocalData().isPersistentTopicsEnabled();
            }

            @Override
            public boolean isEnableNonPersistentTopics(String brokerUrl) {
                String brokerKey = brokerUrl.replace("http://", "");
                BrokerData known = ModularLoadManagerImpl.this.loadData.getBrokerData().get(brokerKey);
                if (known == null || known.getLocalData() == null) {
                    return false;
                }
                return known.getLocalData().isNonPersistentTopicsEnabled();
            }
        };
    }

    /**
     * Binds this load manager to the given broker service: creates the ZooKeeper caches and their
     * listeners, picks the host-usage implementation, sets the default bundle stats, creates the
     * placement strategy and resource policies, and fills the filter and load-shedding pipelines.
     *
     * @param pulsar the broker service supplying configuration, ZooKeeper clients and caches
     */
    @Override
    public void initialize(PulsarService pulsar) {
        this.pulsar = pulsar;
        // Track the children of /loadbalance/brokers; each change reaps preallocations of brokers
        // that are no longer listed and schedules a full data refresh.
        this.availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(), "/loadbalance/brokers");
        this.availableActiveBrokers.registerListener((ZooKeeperCacheListener)new ZooKeeperCacheListener<Set<String>>(){

            public void onUpdate(String path, Set<String> data, Stat stat) {
                if (log.isDebugEnabled()) {
                    log.debug("Update Received for path {}", (Object)path);
                }
                ModularLoadManagerImpl.this.reapDeadBrokerPreallocations(data);
                ModularLoadManagerImpl.this.scheduler.submit(ModularLoadManagerImpl.this::updateAll);
            }
        });
        // Cache of per-broker load reports, parsed from JSON; this instance is notified of updates.
        this.brokerDataCache = new ZooKeeperDataCache<LocalBrokerData>(pulsar.getLocalZkCache()){

            public LocalBrokerData deserialize(String key, byte[] content) throws Exception {
                return (LocalBrokerData)ObjectMapperFactory.getThreadLocal().readValue(content, LocalBrokerData.class);
            }
        };
        this.brokerDataCache.registerListener((ZooKeeperCacheListener)this);
        this.brokerHostUsage = SystemUtils.IS_OS_LINUX ? new LinuxBrokerHostUsageImpl(pulsar) : new GenericBrokerHostUsageImpl(pulsar);
        this.bundleSplitStrategy = new BundleSplitterTask(pulsar);
        // conf must be assigned before the placement strategy and shedding strategy are created below.
        this.conf = pulsar.getConfiguration();
        // Same values as DEFAULT_MESSAGE_THROUGHPUT / DEFAULT_MESSAGE_RATE (inlined by the decompiler).
        this.defaultStats.msgThroughputIn = 50000.0;
        this.defaultStats.msgThroughputOut = 50000.0;
        this.defaultStats.msgRateIn = 50.0;
        this.defaultStats.msgRateOut = 50.0;
        this.placementStrategy = ModularLoadManagerStrategy.create(this.conf);
        this.policies = new SimpleResourceAllocationPolicies(pulsar);
        this.zkClient = pulsar.getZkClient();
        this.filterPipeline.add(new BrokerVersionFilter());
        // Build the failure-domain map once now, then rebuild it on the scheduler thread whenever
        // the failure-domain list or an individual failure domain changes.
        this.refreshBrokerToFailureDomainMap();
        pulsar.getConfigurationCache().failureDomainListCache().registerListener((path, data, stat) -> this.scheduler.execute(() -> this.refreshBrokerToFailureDomainMap()));
        pulsar.getConfigurationCache().failureDomainCache().registerListener((path, data, stat) -> this.scheduler.execute(() -> this.refreshBrokerToFailureDomainMap()));
        this.loadSheddingPipeline.add(this.createLoadSheddingStrategy());
    }

    /**
     * Instantiates the load shedding strategy named in the configuration.
     *
     * <p>The class is created through its no-argument constructor. If the class cannot be loaded or
     * instantiated, or if it does not implement {@link LoadSheddingStrategy}, the failure is logged
     * and an {@link OverloadShedder} is used instead, so this method never returns {@code null}.
     *
     * @return the configured strategy, or an {@link OverloadShedder} as fallback
     */
    private LoadSheddingStrategy createLoadSheddingStrategy() {
        try {
            Class<?> loadSheddingClass = Class.forName(this.conf.getLoadBalancerLoadSheddingStrategy());
            // Class#newInstance() is deprecated since Java 9 and rethrows constructor exceptions
            // unwrapped; going through the Constructor wraps them in InvocationTargetException.
            Object loadSheddingInstance = loadSheddingClass.getDeclaredConstructor().newInstance();
            if (loadSheddingInstance instanceof LoadSheddingStrategy) {
                return (LoadSheddingStrategy) loadSheddingInstance;
            }
            log.error("create load shedding strategy failed. using OverloadShedder instead.");
        } catch (Exception e) {
            log.error("Error when trying to create load shedding strategy: ", (Throwable) e);
        }
        return new OverloadShedder();
    }

    /**
     * Creates a load manager and immediately initializes it against the given broker service.
     *
     * @param pulsar the broker service handed to {@link #initialize(PulsarService)}
     */
    public ModularLoadManagerImpl(PulsarService pulsar) {
        this();
        initialize(pulsar);
    }

    /**
     * Creates the given persistent znode (with an empty payload and open ACL) unless it already exists.
     *
     * @param zkClient the ZooKeeper client used for the existence check and the creation
     * @param path the znode path to ensure
     * @throws Exception if the existence check or the creation fails for any reason other than
     *         the node already existing
     */
    private static void createZPathIfNotExists(ZooKeeper zkClient, String path) throws Exception {
        if (zkClient.exists(path, false) != null) {
            return;
        }
        try {
            ZkUtils.createFullPathOptimistic(zkClient, path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException ignored) {
            // The node appeared between the existence check and the create call; that is the
            // desired end state, so there is nothing to do.
        }
    }

    /**
     * Drops every bundle preallocation that points at a broker which is known to the load data
     * but is missing from the given set of alive brokers.
     *
     * @param aliveBrokers the brokers currently reported as alive
     */
    private void reapDeadBrokerPreallocations(Set<String> aliveBrokers) {
        for (String knownBroker : this.loadData.getBrokerData().keySet()) {
            if (aliveBrokers.contains(knownBroker)) {
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug("Broker {} appears to have stopped; now reclaiming any preallocations", (Object) knownBroker);
            }
            this.preallocatedBundleToBroker.entrySet().removeIf(preallocation -> {
                if (!knownBroker.equals(preallocation.getValue())) {
                    return false;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Removing old preallocation on dead broker {} for bundle {}",
                            (Object) preallocation.getValue(), (Object) preallocation.getKey());
                }
                return true;
            });
        }
    }

    /**
     * Returns the brokers currently listed by the active-broker cache. If the cache cannot be read,
     * a warning is logged and the keys of the in-memory broker data are returned instead.
     *
     * @return the set of available broker names
     */
    @Override
    public Set<String> getAvailableBrokers() {
        Set<String> brokers;
        try {
            brokers = this.availableActiveBrokers.get();
        } catch (Exception e) {
            log.warn("Error when trying to get active brokers", (Throwable) e);
            brokers = this.loadData.getBrokerData().keySet();
        }
        return brokers;
    }

    /**
     * Resolves the load data for a bundle, in order of preference:
     * the bundle's own data znode, data seeded from its resource quota znode, or default stats.
     * Any error while reading from ZooKeeper is logged and does not propagate.
     *
     * @param bundle the bundle name
     * @return bundle data for the bundle; never {@code null}
     */
    private BundleData getBundleDataOrDefault(String bundle) {
        BundleData resolved = null;
        try {
            final String dataPath = ModularLoadManagerImpl.getBundleDataZooKeeperPath(bundle);
            final String quotaPath = String.format("%s/%s", RESOURCE_QUOTA_ZPATH, bundle);
            if (this.zkClient.exists(dataPath, null) != null) {
                // Stored bundle data wins over everything else.
                resolved = ModularLoadManagerImpl.readJson(this.zkClient.getData(dataPath, null, null), BundleData.class);
            } else if (this.zkClient.exists(quotaPath, null) != null) {
                // No stored data: seed both time windows from the configured resource quota.
                ResourceQuota quota = ModularLoadManagerImpl.readJson(this.zkClient.getData(quotaPath, null, null), ResourceQuota.class);
                resolved = new BundleData(10, 1000);
                TimeAverageMessageData shortWindow = resolved.getShortTermData();
                TimeAverageMessageData longWindow = resolved.getLongTermData();

                shortWindow.setMsgRateIn(quota.getMsgRateIn());
                shortWindow.setMsgRateOut(quota.getMsgRateOut());
                shortWindow.setMsgThroughputIn(quota.getBandwidthIn());
                shortWindow.setMsgThroughputOut(quota.getBandwidthOut());

                longWindow.setMsgRateIn(quota.getMsgRateIn());
                longWindow.setMsgRateOut(quota.getMsgRateOut());
                longWindow.setMsgThroughputIn(quota.getBandwidthIn());
                longWindow.setMsgThroughputOut(quota.getBandwidthOut());

                // Mark both windows as fully populated.
                shortWindow.setNumSamples(10);
                longWindow.setNumSamples(1000);
            }
        } catch (Exception e) {
            log.warn("Error when trying to find bundle {} on zookeeper: {}", (Object) bundle, (Object) e);
        }
        if (resolved != null) {
            return resolved;
        }
        return new BundleData(10, 1000, this.defaultStats);
    }

    /**
     * Builds the ZooKeeper path of the data znode for the given bundle.
     *
     * @param bundle the bundle name, appended verbatim after the bundle-data prefix
     * @return {@code /loadbalance/bundle-data/<bundle>}
     */
    private static String getBundleDataZooKeeperPath(String bundle) {
        return String.format("%s/%s", "/loadbalance/bundle-data", bundle);
    }

    /**
     * Fetches the current per-bundle statistics from the broker service.
     *
     * @return the map returned by the broker service's {@code getBundleStats()}
     */
    private Map<String, NamespaceBundleStats> getBundleStats() {
        return pulsar.getBrokerService().getBundleStats();
    }

    /**
     * Parses JSON bytes into an instance of the given class using the thread-local object mapper.
     *
     * @param data the JSON payload
     * @param clazz the target type
     * @return the parsed object
     * @throws IOException if the payload cannot be parsed
     */
    private static <T> T readJson(byte[] data, Class<T> clazz) throws IOException {
        T parsed = ObjectMapperFactory.getThreadLocal().readValue(data, clazz);
        return parsed;
    }

    /**
     * Computes the absolute change from {@code oldValue} to {@code newValue} as a percentage of
     * {@code oldValue}.
     *
     * @param oldValue the reference value
     * @param newValue the value being compared against the reference
     * @return the non-negative percentage change; {@code 0} when both values are zero and
     *         positive infinity when only {@code oldValue} is zero
     */
    private double percentChange(double oldValue, double newValue) {
        if (oldValue != 0.0) {
            return 100.0 * Math.abs((oldValue - newValue) / oldValue);
        }
        // A zero baseline: no change if the new value is zero too, otherwise an unbounded change.
        return newValue == 0.0 ? 0.0 : Double.POSITIVE_INFINITY;
    }

    /**
     * Decides whether the local broker report should be written to ZooKeeper again.
     *
     * <p>A write is due when the last report is older than the configured maximum interval, or
     * when the largest change between the last and the current report (resource usage, message
     * rate, message throughput, bundle count) exceeds the configured threshold percentage.
     *
     * @return {@code true} if the report should be written
     */
    private boolean needBrokerDataUpdate() {
        final long maxIntervalMillis =
                TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
        final long millisSinceLastReport = System.currentTimeMillis() - this.localData.getLastUpdate();
        if (millisSinceLastReport > maxIntervalMillis) {
            log.info("Writing local data to ZooKeeper because time since last update exceeded threshold of {} minutes", (Object) this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
            return true;
        }

        // Resource usage is compared as an absolute difference scaled by 100, the rest as relative change.
        final double resourceUsageChange =
                100.0 * Math.abs(this.lastData.getMaxResourceUsage() - this.localData.getMaxResourceUsage());
        final double msgRateChange = this.percentChange(
                this.lastData.getMsgRateIn() + this.lastData.getMsgRateOut(),
                this.localData.getMsgRateIn() + this.localData.getMsgRateOut());
        final double throughputChange = this.percentChange(
                this.lastData.getMsgThroughputIn() + this.lastData.getMsgThroughputOut(),
                this.localData.getMsgThroughputIn() + this.localData.getMsgThroughputOut());
        final double bundleCountChange =
                this.percentChange(this.lastData.getNumBundles(), this.localData.getNumBundles());

        final double maxChange = Math.max(resourceUsageChange,
                Math.max(msgRateChange, Math.max(throughputChange, bundleCountChange)));
        if (maxChange <= (double) this.conf.getLoadBalancerReportUpdateThresholdPercentage()) {
            return false;
        }
        log.info("Writing local data to ZooKeeper because maximum change {}% exceeded threshold {}%; time since last report written is {} seconds", new Object[]{maxChange, this.conf.getLoadBalancerReportUpdateThresholdPercentage(), (double) millisSinceLastReport / 1000.0});
        return true;
    }

    /**
     * Refreshes all load data: broker data first, then bundle data, and finally checks whether
     * any namespace bundle should be split.
     */
    public void updateAll() {
        if (log.isDebugEnabled()) {
            log.debug("Updating broker and bundle data for loadreport");
        }
        // Bundle data is derived from the broker reports, so the broker refresh has to run first.
        updateAllBrokerData();
        updateBundleData();
        checkNamespaceBundleSplit();
    }

    /**
     * Synchronizes the in-memory broker data with the load reports of the active brokers.
     *
     * <p>For every active broker the latest report is read from the broker data cache and stored
     * in the existing {@link BrokerData} entry, or in a new one. A broker whose report znode is
     * missing is removed from the map; any other read error is logged and the broker's previous
     * entry is left untouched. Finally, entries of brokers that are no longer active are dropped.
     */
    private void updateAllBrokerData() {
        Set<String> activeBrokers = this.getAvailableBrokers();
        Map<String, BrokerData> brokerDataMap = this.loadData.getBrokerData();
        for (String broker : activeBrokers) {
            try {
                String key = String.format("%s/%s", "/loadbalance/brokers", broker);
                LocalBrokerData reported =
                        this.brokerDataCache.get(key).orElseThrow(KeeperException.NoNodeException::new);
                BrokerData known = brokerDataMap.get(broker);
                if (known != null) {
                    known.setLocalData(reported);
                } else {
                    brokerDataMap.put(broker, new BrokerData(reported));
                }
            } catch (KeeperException.NoNodeException ne) {
                brokerDataMap.remove(broker);
                log.warn("[{}] broker load-report znode not present", (Object) broker, (Object) ne);
            } catch (Exception e) {
                log.warn("Error reading broker data from cache for broker - [{}], [{}]", (Object) broker, (Object) e.getMessage());
            }
        }
        // Drop obsolete brokers through the key-set view: calling Map.remove() from inside a
        // for-each over keySet() throws ConcurrentModificationException on non-concurrent maps.
        brokerDataMap.keySet().removeIf(broker -> !activeBrokers.contains(broker));
    }

    /**
     * Rebuilds the per-bundle load data from the latest broker reports.
     *
     * <p>For every broker in the load data this method: folds the broker's last bundle stats into
     * the shared bundle data (creating entries on demand), cleans up the broker's preallocations,
     * resets the broker's time-averaged data, and rebuilds the broker's namespace-to-bundle-range map.
     *
     * <p>Decompiler (CFR) note retained from the original header: "Removed try catching itself -
     * possible behaviour change", i.e. the lock handling below may not match the original source exactly.
     */
    private void updateBundleData() {
        Map<String, BundleData> bundleData = this.loadData.getBundleData();
        for (Map.Entry<String, BrokerData> brokerEntry : this.loadData.getBrokerData().entrySet()) {
            ConcurrentOpenHashMap concurrentOpenHashMap;
            Map<String, BundleData> preallocatedBundleData;
            String broker = brokerEntry.getKey();
            BrokerData brokerData = brokerEntry.getValue();
            Map statsMap = brokerData.getLocalData().getLastStats();
            // Update existing bundle data with the reported stats, or create it from
            // ZooKeeper / defaults the first time a bundle is seen.
            for (Map.Entry entry : statsMap.entrySet()) {
                String bundle = (String)entry.getKey();
                NamespaceBundleStats stats = (NamespaceBundleStats)entry.getValue();
                if (bundleData.containsKey(bundle)) {
                    bundleData.get(bundle).update(stats);
                    continue;
                }
                BundleData currentBundleData = this.getBundleDataOrDefault(bundle);
                currentBundleData.update(stats);
                bundleData.put(bundle, currentBundleData);
            }
            Map<String, BundleData> map = preallocatedBundleData = brokerData.getPreallocatedBundleData();
            synchronized (map) {
                // NOTE(review): the inner iterator removes entries from the same map the outer loop
                // iterates; this is only safe if the map tolerates concurrent structural changes
                // (e.g. a ConcurrentHashMap) - confirm against BrokerData.
                for (String preallocatedBundleName : brokerData.getPreallocatedBundleData().keySet()) {
                    if (brokerData.getLocalData().getBundles().contains(preallocatedBundleName)) {
                        // The broker now reports this bundle: drop every preallocation of this
                        // broker whose bundle already has bundle data.
                        Iterator<Map.Entry<String, BundleData>> preallocatedIterator = preallocatedBundleData.entrySet().iterator();
                        while (preallocatedIterator.hasNext()) {
                            String bundle = preallocatedIterator.next().getKey();
                            if (!bundleData.containsKey(bundle)) continue;
                            preallocatedIterator.remove();
                            this.preallocatedBundleToBroker.remove(bundle);
                        }
                    }
                    // Executed for every preallocated bundle of this broker, whether or not the
                    // broker reports it as loaded.
                    this.preallocatedBundleToBroker.remove(preallocatedBundleName);
                }
            }
            brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, this.defaultStats);
            ConcurrentOpenHashMap concurrentOpenHashMap2 = concurrentOpenHashMap = (ConcurrentOpenHashMap)this.brokerToNamespaceToBundleRange.computeIfAbsent((Object)broker, k -> new ConcurrentOpenHashMap());
            synchronized (concurrentOpenHashMap2) {
                // Rebuild the namespace -> bundle-range view from both the loaded and the
                // still-preallocated bundles of this broker.
                concurrentOpenHashMap.clear();
                LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), (ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>)concurrentOpenHashMap);
                LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), (ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>)concurrentOpenHashMap);
            }
        }
    }

    /**
     * Removes this broker's znode from ZooKeeper, if a znode path has been set.
     *
     * @throws PulsarServerException.NotFoundException if the znode does not exist
     * @throws PulsarServerException if the deletion fails for any other reason
     */
    @Override
    public void disableBroker() throws PulsarServerException {
        if (StringUtils.isEmpty((CharSequence) this.brokerZnodePath)) {
            return;
        }
        try {
            // Version -1: delete regardless of the znode's current version.
            this.pulsar.getZkClient().delete(this.brokerZnodePath, -1);
        } catch (KeeperException.NoNodeException missing) {
            throw new PulsarServerException.NotFoundException((Throwable) missing);
        } catch (Exception failure) {
            throw new PulsarServerException((Throwable) failure);
        }
    }

    /**
     * Runs every load shedding strategy and unloads the bundles each one selects.
     *
     * <p>Nothing happens when load shedding is disabled or at most one broker is available.
     * Bundles unloaded longer ago than the configured grace period are first forgotten, so the
     * strategies may pick them again. A bundle is skipped when the anti-affinity check rejects
     * the unload; failures to unload a bundle are logged and do not stop the run.
     */
    @Override
    public synchronized void doLoadShedding() {
        if (!LoadManagerShared.isLoadSheddingEnabled(this.pulsar)) {
            return;
        }
        if (this.getAvailableBrokers().size() <= 1) {
            log.info("Only 1 broker available: no load shedding will be performed");
            return;
        }

        // Forget unloads that are older than the grace period.
        final long gracePeriodMillis = TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerSheddingGracePeriodMinutes());
        final long timeout = System.currentTimeMillis() - gracePeriodMillis;
        Map<String, Long> recentlyUnloadedBundles = this.loadData.getRecentlyUnloadedBundles();
        recentlyUnloadedBundles.entrySet().removeIf(unloaded -> unloaded.getValue() < timeout);

        for (LoadSheddingStrategy strategy : this.loadSheddingPipeline) {
            Multimap<String, String> bundlesToUnload = strategy.findBundlesForUnloading(this.loadData, this.conf);
            for (Map.Entry<String, Collection<String>> selection : bundlesToUnload.asMap().entrySet()) {
                String broker = selection.getKey();
                for (String bundle : selection.getValue()) {
                    String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                    String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
                    if (!this.shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                        continue;
                    }
                    log.info("[Overload shedder] Unloading bundle: {} from broker {}", bundle, broker);
                    try {
                        this.pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
                        this.loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
                    } catch (PulsarServerException | PulsarAdminException e) {
                        log.warn("Error when trying to perform load shedding on {} for broker {}", new Object[]{bundle, broker, e});
                    }
                }
            }
            this.updateBundleUnloadingMetrics(bundlesToUnload);
        }
    }

    /**
     * Adds the given selection to the running unload counters and publishes a fresh
     * "bundleUnloading" metrics snapshot.
     *
     * @param bundlesToUnload broker-to-bundles selection produced by a load shedding strategy
     */
    private void updateBundleUnloadingMetrics(Multimap<String, String> bundlesToUnload) {
        this.unloadBrokerCount += bundlesToUnload.keySet().size();
        this.unloadBundleCount += bundlesToUnload.values().size();

        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("metric", "bundleUnloading");
        Metrics snapshot = Metrics.create(dimensions);
        snapshot.put("brk_lb_unload_broker_count", Long.valueOf(this.unloadBrokerCount));
        snapshot.put("brk_lb_unload_bundle_count", Long.valueOf(this.unloadBundleCount));

        List<Metrics> published = new ArrayList<>();
        published.add(snapshot);
        this.bundleUnloadMetrics.set(published);
    }

    /**
     * Checks whether a bundle may be unloaded from its current broker with respect to the
     * namespace's anti-affinity group.
     *
     * <p>Namespaces without local policies or without an anti-affinity group may always be
     * unloaded. Otherwise the candidate brokers are recomputed from the namespace policies and
     * the decision is delegated to {@code LoadManagerShared}. Any failure is logged and treated
     * as "may unload".
     *
     * @param namespace the namespace name
     * @param bundle the bundle range within the namespace
     * @param currentBroker the broker currently serving the bundle
     * @return {@code true} if the bundle may be unloaded
     */
    public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
        try {
            Optional<?> localPolicies = this.pulsar.getLocalZkCacheService().policiesCache()
                    .get(PulsarWebResource.joinPath("/admin/local-policies", namespace));
            if (!localPolicies.isPresent()) {
                return true;
            }
            if (StringUtils.isBlank((CharSequence) ((LocalPolicies) localPolicies.get()).namespaceAntiAffinityGroup)) {
                return true;
            }
            // The candidate cache is shared scratch state, so it is only touched under its own lock.
            synchronized (this.brokerCandidateCache) {
                this.brokerCandidateCache.clear();
                NamespaceBundle serviceUnit =
                        this.pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundle);
                LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache,
                        this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
                return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker,
                        this.pulsar, this.brokerToNamespaceToBundleRange, this.brokerCandidateCache);
            }
        } catch (Exception e) {
            log.warn("Failed to check anti-affinity namespace ownership for {}/{}/{}, {}", new Object[]{namespace, bundle, currentBroker, e.getMessage()});
            return true;
        }
    }

    /**
     * Splits the namespace bundles selected by the bundle split strategy.
     *
     * <p>Runs only when automatic bundle splitting is enabled and this broker is the leader.
     * For each selected bundle that can be split, its cached load data is discarded before the
     * split is requested through the admin client. A failure on one bundle is logged and does
     * not prevent the remaining bundles from being processed. The split metrics are updated
     * with the full set of selected bundles.
     */
    @Override
    public void checkNamespaceBundleSplit() {
        if (!this.conf.isLoadBalancerAutoBundleSplitEnabled()) {
            return;
        }
        if (this.pulsar.getLeaderElectionService() == null || !this.pulsar.getLeaderElectionService().isLeader()) {
            return;
        }
        final boolean unloadSplitBundles = this.pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
        synchronized (this.bundleSplitStrategy) {
            Set<String> bundlesToBeSplit = this.bundleSplitStrategy.findBundlesToSplit(this.loadData, this.pulsar);
            NamespaceBundleFactory bundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory();
            for (String bundleName : bundlesToBeSplit) {
                try {
                    String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
                    String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
                    NamespaceBundle candidate = bundleFactory.getBundle(namespaceName, bundleRange);
                    if (!bundleFactory.canSplitBundle(candidate)) {
                        continue;
                    }

                    // Discard everything known about the bundle so it is not selected again.
                    this.loadData.getBundleData().remove(bundleName);
                    this.localData.getLastStats().remove(bundleName);
                    this.pulsar.getNamespaceService().getNamespaceBundleFactory()
                            .invalidateBundleCache(NamespaceName.get((String) namespaceName));
                    this.deleteBundleDataFromZookeeper(bundleName);

                    log.info("Load-manager splitting bundle {} and unloading {}", (Object) bundleName, (Object) unloadSplitBundles);
                    this.pulsar.getAdminClient().namespaces()
                            .splitNamespaceBundle(namespaceName, bundleRange, unloadSplitBundles, null);
                    log.info("Successfully split namespace bundle {}", (Object) bundleName);
                } catch (Exception e) {
                    log.error("Failed to split namespace bundle {}", (Object) bundleName, (Object) e);
                }
            }
            this.updateBundleSplitMetrics(bundlesToBeSplit);
        }
    }

    /**
     * Adds the number of given bundles to the running split counter and publishes a fresh
     * "bundlesSplit" metrics snapshot.
     *
     * @param bundlesToBeSplit the bundles selected for splitting in this round
     */
    private void updateBundleSplitMetrics(Set<String> bundlesToBeSplit) {
        this.bundleSplitCount += bundlesToBeSplit.size();

        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("metric", "bundlesSplit");
        Metrics snapshot = Metrics.create(dimensions);
        snapshot.put("brk_lb_bundles_split_count", Long.valueOf(this.bundleSplitCount));

        List<Metrics> published = new ArrayList<>();
        published.add(snapshot);
        this.bundleSplitMetrics.set(published);
    }

    /**
     * Cache listener callback: schedules a full refresh of the load data on the scheduler thread.
     * The callback arguments themselves are not inspected.
     */
    public void onUpdate(String path, LocalBrokerData data, Stat stat) {
        Runnable refresh = this::updateAll;
        this.scheduler.submit(refresh);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public Optional<String> selectBrokerForAssignment(ServiceUnitId serviceUnit) {
        Object object;
        BundleData data;
        String bundle;
        long startTime = System.nanoTime();
        try {
            Set<String> set = this.brokerCandidateCache;
            synchronized (set) {
                bundle = serviceUnit.toString();
                if (this.preallocatedBundleToBroker.containsKey(bundle)) {
                    Optional<String> optional = Optional.of(this.preallocatedBundleToBroker.get(bundle));
                    // MONITOREXIT @DISABLED, blocks:[0, 16, 7] lbl8 : MonitorExitStatement: MONITOREXIT : var4_3
                    selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
                    return optional;
                }
                data = this.loadData.getBundleData().computeIfAbsent(bundle, key -> this.getBundleDataOrDefault(bundle));
                this.brokerCandidateCache.clear();
            }
        }
        catch (Throwable throwable) {
            selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
            throw throwable;
        }
        {
            ConcurrentOpenHashMap namespaceToBundleRange;
            LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            LoadManagerShared.filterBrokersWithLargeTopicCount(this.brokerCandidateCache, this.loadData, this.conf.getLoadBalancerBrokerMaxTopics());
            LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar, serviceUnit.toString(), this.brokerCandidateCache, this.brokerToNamespaceToBundleRange, this.brokerToFailureDomainMap);
            LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), this.brokerCandidateCache, this.brokerToNamespaceToBundleRange);
            log.info("{} brokers being considered for assignment of {}", (Object)this.brokerCandidateCache.size(), (Object)bundle);
            try {
                for (BrokerFilter brokerFilter : this.filterPipeline) {
                    brokerFilter.filter(this.brokerCandidateCache, data, this.loadData, this.conf);
                }
            }
            catch (BrokerFilterException x) {
                LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            }
            if (this.brokerCandidateCache.isEmpty()) {
                LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            }
            Optional<String> broker = this.placementStrategy.selectBroker(this.brokerCandidateCache, data, this.loadData, this.conf);
            if (log.isDebugEnabled()) {
                log.debug("Selected broker {} from candidate brokers {}", broker, this.brokerCandidateCache);
            }
            if (!broker.isPresent()) {
                Optional<String> optional = broker;
                // MONITOREXIT @DISABLED, blocks:[7, 13] lbl38 : MonitorExitStatement: MONITOREXIT : var4_3
                selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
                return optional;
            }
            double d = (double)this.conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
            double maxUsage = this.loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
            if (maxUsage > d) {
                LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
                broker = this.placementStrategy.selectBroker(this.brokerCandidateCache, data, this.loadData, this.conf);
            }
            this.loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
            this.preallocatedBundleToBroker.put(bundle, broker.get());
            String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
            String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
            object = namespaceToBundleRange = (ConcurrentOpenHashMap)this.brokerToNamespaceToBundleRange.computeIfAbsent((Object)broker.get(), k -> new ConcurrentOpenHashMap());
            synchronized (object) {
                ((ConcurrentOpenHashSet)namespaceToBundleRange.computeIfAbsent((Object)namespaceName, k -> new ConcurrentOpenHashSet())).add((Object)bundleRange);
            }
            object = broker;
        }
        selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
        return object;
    }

    /**
     * Registers this broker with the load balancer in ZooKeeper and triggers the initial data update.
     *
     * <p>The method builds {@code lastData} and {@code localData} from the addresses advertised by the
     * {@code PulsarService}, makes sure {@code /loadbalance/brokers} exists, publishes {@code localData} on
     * this broker's znode, writes a fresh {@code TimeAverageBrokerData} under
     * {@code /loadbalance/broker-time-average/<address>} and finally calls {@code updateAll()}.
     *
     * @throws PulsarServerException if creating the broker znode fails because the node already exists
     *         (reported as being owned by a different ZooKeeper session), or wrapping any other failure
     *         raised while starting
     */
    @Override
    public void start() throws PulsarServerException {
        try {
            final Map<String, String> protocolData = this.pulsar.getProtocolDataToAdvertise();

            this.lastData = new LocalBrokerData(this.pulsar.getSafeWebServiceAddress(),
                    this.pulsar.getWebServiceAddressTls(), this.pulsar.getSafeBrokerServiceUrl(),
                    this.pulsar.getBrokerServiceUrlTls(), this.pulsar.getAdvertisedListeners());
            this.lastData.setProtocols(protocolData);
            this.lastData.setPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnablePersistentTopics());
            this.lastData.setNonPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnableNonPersistentTopics());

            this.localData = new LocalBrokerData(this.pulsar.getSafeWebServiceAddress(),
                    this.pulsar.getWebServiceAddressTls(), this.pulsar.getSafeBrokerServiceUrl(),
                    this.pulsar.getBrokerServiceUrlTls(), this.pulsar.getAdvertisedListeners());
            this.localData.setProtocols(protocolData);
            this.localData.setBrokerVersionString(this.pulsar.getBrokerVersion());
            this.localData.setPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnablePersistentTopics());
            this.localData.setNonPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnableNonPersistentTopics());

            // Make sure the parent of all broker znodes exists before registering this broker.
            ModularLoadManagerImpl.createZPathIfNotExists(this.zkClient, "/loadbalance/brokers");

            // The plain web-service port is preferred; the TLS port is the fallback.
            final String lookupServiceAddress = this.pulsar.getAdvertisedAddress() + ":" + (this.conf.getWebServicePort().isPresent() ? (Integer)this.conf.getWebServicePort().get() : (Integer)this.conf.getWebServicePortTls().get());
            this.brokerZnodePath = "/loadbalance/brokers/" + lookupServiceAddress;
            final String timeAverageZPath = "/loadbalance/broker-time-average/" + lookupServiceAddress;
            this.updateLocalBrokerData();
            try {
                if (!org.apache.pulsar.zookeeper.ZkUtils.checkNodeAndWaitExpired(this.zkClient, this.brokerZnodePath, this.pulsar.getConfig().getZooKeeperSessionTimeoutMillis())) {
                    ZkUtils.createFullPathOptimistic(this.zkClient, this.brokerZnodePath, this.localData.getJsonBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } else {
                    // The check returned true: overwrite the data of the existing node instead of creating it.
                    this.zkClient.setData(this.brokerZnodePath, this.localData.getJsonBytes(), -1);
                }
            } catch (KeeperException.NodeExistsException e) {
                log.error("Broker znode - [{}] is own by different zookeeper-session", this.brokerZnodePath);
                throw new PulsarServerException("Broker znode - [" + this.brokerZnodePath + "] is owned by different zk-session");
            } catch (InterruptedException ie) {
                // Only adds the specific log line; the interrupt flag is restored by the outer handler,
                // which sees every InterruptedException raised in this method.
                log.error("Interrupted at creating znode - [{}] for load balance on zookeeper ", this.brokerZnodePath, ie);
                throw ie;
            }
            // Any other failure propagates directly to the outer handler, which logs it exactly once.

            ModularLoadManagerImpl.createZPathIfNotExists(this.zkClient, timeAverageZPath);
            this.zkClient.setData(timeAverageZPath, new TimeAverageBrokerData().getJsonBytes(), -1);
            this.updateAll();
            this.lastBundleDataUpdate = System.currentTimeMillis();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Covers interrupts from every ZooKeeper call above, not just the znode registration.
                Thread.currentThread().interrupt();
            }
            log.error("Unable to create znode - [{}] for load balance on zookeeper ", this.brokerZnodePath, e);
            if (e instanceof PulsarServerException) {
                // Already the exception type callers expect; do not bury it under a second wrapper.
                throw (PulsarServerException) e;
            }
            throw new PulsarServerException((Throwable) e);
        }
    }

    /**
     * Releases the resources held by this load manager.
     *
     * <p>Closes {@code availableActiveBrokers} and {@code brokerDataCache} when they are non-null
     * (the latter is also cleared after being closed) and then shuts the scheduler down.
     *
     * @throws PulsarServerException declared by the interface; this body does not throw it itself
     */
    @Override
    public void stop() throws PulsarServerException {
        // Either cache may still be null, hence the guards.
        if (availableActiveBrokers != null) {
            availableActiveBrokers.close();
        }

        if (brokerDataCache != null) {
            brokerDataCache.close();
            brokerDataCache.clear();
        }

        scheduler.shutdown();
    }

    /**
     * Refreshes {@code localData} with the current system resource usage and bundle stats and
     * republishes the load-balancing metrics from the same usage sample.
     *
     * <p>Failures are logged at WARN level and not propagated.
     *
     * @return {@code localData}; when the refresh failed it is returned in whatever state it was left
     */
    @Override
    public LocalBrokerData updateLocalBrokerData() {
        try {
            final SystemResourceUsage usage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
            localData.update(usage, getBundleStats());
            updateLoadBalancingMetrics(usage);
        } catch (Exception e) {
            log.warn("Error when attempting to update local broker data", e);
        }
        return localData;
    }

    /**
     * Publishes a new load-balancing metrics snapshot derived from the given resource usage.
     *
     * <p>One {@code Metrics} entry, tagged with this broker's advertised address and the
     * {@code loadBalancing} metric name, is placed in a new list that replaces the current value of
     * {@code loadBalancingMetrics}.
     *
     * @param systemResourceUsage usage sample whose CPU, memory, direct-memory and bandwidth in/out
     *        {@code percentUsage()} values are exported
     */
    private void updateLoadBalancingMetrics(SystemResourceUsage systemResourceUsage) {
        final Map<String, String> dimensions = new HashMap<>();
        dimensions.put("broker", this.conf.getAdvertisedAddress());
        dimensions.put("metric", "loadBalancing");

        final Metrics m = Metrics.create(dimensions);
        m.put("brk_lb_cpu_usage", Float.valueOf(systemResourceUsage.getCpu().percentUsage()));
        m.put("brk_lb_memory_usage", Float.valueOf(systemResourceUsage.getMemory().percentUsage()));
        m.put("brk_lb_directMemory_usage", Float.valueOf(systemResourceUsage.getDirectMemory().percentUsage()));
        m.put("brk_lb_bandwidth_in_usage", Float.valueOf(systemResourceUsage.getBandwidthIn().percentUsage()));
        m.put("brk_lb_bandwidth_out_usage", Float.valueOf(systemResourceUsage.getBandwidthOut().percentUsage()));

        // Typed list instead of a raw ArrayList so the add and the publish are checked by the compiler.
        final List<Metrics> metrics = new ArrayList<>();
        metrics.add(m);
        this.loadBalancingMetrics.set(metrics);
    }

    /**
     * Writes the local broker data to ZooKeeper without forcing the write; equivalent to
     * {@code writeBrokerDataOnZooKeeper(false)}.
     */
    @Override
    public void writeBrokerDataOnZooKeeper() {
        writeBrokerDataOnZooKeeper(false);
    }

    /**
     * Refreshes the local broker data and, when required, publishes it on this broker's znode.
     *
     * <p>The write happens when {@code needBrokerDataUpdate()} returns true or when {@code force} is set.
     * After a successful write the last bundle gains and losses are cleared and {@code lastData} is updated
     * from {@code localData}. Any failure is logged at WARN level and not propagated.
     *
     * @param force when true, write even if {@code needBrokerDataUpdate()} returns false
     */
    @Override
    public void writeBrokerDataOnZooKeeper(boolean force) {
        try {
            updateLocalBrokerData();
            if (needBrokerDataUpdate() || force) {
                localData.setLastUpdate(System.currentTimeMillis());

                try {
                    zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
                } catch (KeeperException.NoNodeException nodeMissing) {
                    // The znode is gone: create it again as an ephemeral node carrying the current data.
                    ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                }

                // The deltas have been published; start accumulating from scratch.
                localData.getLastBundleGains().clear();
                localData.getLastBundleLosses().clear();

                // Remember what was written.
                lastData.update(localData);
            }
        } catch (Exception e) {
            log.warn("Error writing broker data on ZooKeeper", e);
        }
    }

    /**
     * Returns the deserializer held in {@code loadReportDeserializer}.
     *
     * @return the {@code loadReportDeserializer} instance, typed for {@code LocalBrokerData}
     */
    public ZooKeeperCache.Deserializer<LocalBrokerData> getLoadReportDeserializer() {
        return loadReportDeserializer;
    }

    /**
     * Refreshes the bundle data and writes the in-memory load data to ZooKeeper.
     *
     * <p>After calling {@code updateBundleData()}, every entry of {@code loadData.getBundleData()} is
     * written to its bundle-data path, and the time-average data of every entry of
     * {@code loadData.getBrokerData()} is written under {@code /loadbalance/broker-time-average/<broker>}.
     * Missing znodes are created first. A failure for one entry is logged at WARN level and does not stop
     * the remaining writes.
     */
    @Override
    public void writeBundleDataOnZooKeeper() {
        this.updateBundleData();

        // Per-bundle data.
        for (Map.Entry<String, BundleData> entry : this.loadData.getBundleData().entrySet()) {
            final String bundle = entry.getKey();
            final BundleData bundleData = entry.getValue();
            try {
                final String zooKeeperPath = ModularLoadManagerImpl.getBundleDataZooKeeperPath(bundle);
                ModularLoadManagerImpl.createZPathIfNotExists(this.zkClient, zooKeeperPath);
                this.zkClient.setData(zooKeeperPath, bundleData.getJsonBytes(), -1);
            } catch (Exception e) {
                log.warn("Error when writing data for bundle {} to ZooKeeper: {}", bundle, e);
            }
        }

        // Per-broker time-average data. The entry is typed with the map's real value type rather than
        // Object, so no cast is needed to reach getTimeAverageData().
        for (Map.Entry<String, BrokerData> entry : this.loadData.getBrokerData().entrySet()) {
            final String broker = entry.getKey();
            final JSONWritable timeAverageData = entry.getValue().getTimeAverageData();
            try {
                final String zooKeeperPath = "/loadbalance/broker-time-average/" + broker;
                ModularLoadManagerImpl.createZPathIfNotExists(this.zkClient, zooKeeperPath);
                this.zkClient.setData(zooKeeperPath, timeAverageData.getJsonBytes(), -1);
                if (log.isDebugEnabled()) {
                    log.debug("Writing zookeeper report {}", timeAverageData);
                }
            } catch (Exception e) {
                log.warn("Error when writing time average broker data for {} to ZooKeeper: {}", broker, e);
            }
        }
    }

    /**
     * Deletes the bundle-data znode of the given bundle if it currently exists.
     *
     * <p>Failures while checking or deleting are logged at WARN level and not propagated.
     *
     * @param bundle name of the bundle whose bundle-data znode should be removed
     */
    private void deleteBundleDataFromZookeeper(String bundle) {
        final String bundleDataPath = ModularLoadManagerImpl.getBundleDataZooKeeperPath(bundle);
        try {
            final boolean nodeMissing = zkClient.exists(bundleDataPath, null) == null;
            if (nodeMissing) {
                return;
            }
            // Version -1: delete regardless of the node's current version.
            zkClient.delete(bundleDataPath, -1);
        } catch (Exception e) {
            log.warn("Failed to delete bundle-data {} from zookeeper", bundle, e);
        }
    }

    /**
     * Rebuilds {@code brokerToFailureDomainMap} (broker -> failure-domain name) from the configuration cache.
     *
     * <p>Does nothing when failure domains are disabled in the broker configuration. Otherwise every domain
     * name returned by {@code failureDomainListCache()} is looked up under
     * {@code CLUSTER_FAILURE_DOMAIN_ROOT + "/" + domainName}; each broker listed in a domain that is present
     * is mapped to that domain name in a temporary map, which then replaces the field. A failure on a single
     * domain is logged and that domain is skipped; a failure outside the per-domain handling is logged with
     * its message only and leaves the field unchanged.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour change" for
     * this method, so the exact exception handling should be confirmed against the upstream source.
     */
    private void refreshBrokerToFailureDomainMap() {
        // Nothing to refresh unless failure domains are enabled.
        if (!this.pulsar.getConfiguration().isFailureDomainsEnabled()) {
            return;
        }
        String clusterDomainRootPath = this.pulsar.getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
        try {
            // The monitor is taken on the map instance that is current at this point.
            // NOTE(review): the field is reassigned inside the block, so threads arriving afterwards lock a
            // different object - confirm this is acceptable for the other users of this map.
            Map<String, String> map = this.brokerToFailureDomainMap;
            synchronized (map) {
                // Build the replacement off to the side so a partial result is never visible through the field.
                HashMap tempBrokerToFailureDomainMap = Maps.newHashMap();
                for (String domainName : this.pulsar.getConfigurationCache().failureDomainListCache().get()) {
                    try {
                        Optional domain = this.pulsar.getConfigurationCache().failureDomainCache().get(clusterDomainRootPath + "/" + domainName);
                        // A listed domain with no data contributes nothing.
                        if (!domain.isPresent()) continue;
                        for (String broker : ((FailureDomain)domain.get()).brokers) {
                            tempBrokerToFailureDomainMap.put(broker, domainName);
                        }
                    }
                    catch (Exception e) {
                        // One unreadable domain must not prevent the others from being loaded.
                        log.warn("Failed to get domain {}", (Object)domainName, (Object)e);
                    }
                }
                this.brokerToFailureDomainMap = tempBrokerToFailureDomainMap;
            }
            log.info("Cluster domain refreshed {}", this.brokerToFailureDomainMap);
        }
        catch (Exception e) {
            // Only the exception message is logged here, not the stack trace.
            log.warn("Failed to get domain-list for cluster {}", (Object)e.getMessage());
        }
    }

    /**
     * Looks up the local data published by the given broker.
     *
     * @param broker broker identifier appended to {@code /loadbalance/brokers/} to form the cache key
     * @return the cached {@code LocalBrokerData}, or {@code null} when the cache has no value for the
     *         broker or the lookup fails (the failure is logged at WARN level)
     */
    @Override
    public LocalBrokerData getBrokerLocalData(String broker) {
        final String znodePath = String.format("%s/%s", "/loadbalance/brokers", broker);
        LocalBrokerData brokerData = null;
        try {
            brokerData = brokerDataCache.get(znodePath).orElse(null);
        } catch (Exception e) {
            log.warn("Failed to get local-broker data for {}", broker, e);
        }
        return brokerData;
    }

    /**
     * Collects the currently published load-balancing, bundle-unload and bundle-split metrics.
     *
     * <p>Each of the three references is read exactly once, so the null check and the copy always operate
     * on the same snapshot even if another thread replaces the value in between.
     *
     * @return a new list holding the content of every non-null snapshot, in the order load-balancing,
     *         bundle-unload, bundle-split; empty when nothing has been published
     */
    @Override
    public List<Metrics> getLoadBalancingMetrics() {
        final List<Metrics> metricsCollection = new ArrayList<>();

        final Collection<Metrics> loadBalancing = (Collection<Metrics>) this.loadBalancingMetrics.get();
        if (loadBalancing != null) {
            metricsCollection.addAll(loadBalancing);
        }

        final Collection<Metrics> bundleUnload = (Collection<Metrics>) this.bundleUnloadMetrics.get();
        if (bundleUnload != null) {
            metricsCollection.addAll(bundleUnload);
        }

        final Collection<Metrics> bundleSplit = (Collection<Metrics>) this.bundleSplitMetrics.get();
        if (bundleSplit != null) {
            metricsCollection.addAll(bundleSplit);
        }

        return metricsCollection;
    }
}

