/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageBrokerData;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTask;
import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.OverloadShedder;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ModularLoadManagerImpl
implements ModularLoadManager {
    private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);
    // Metadata-store path prefix under which per-bundle data is looked up (see getBundleDataPath).
    public static final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
    // Same value initialize() assigns to defaultStats.msgRateIn / msgRateOut.
    public static final double DEFAULT_MESSAGE_RATE = 50.0;
    // Same value initialize() assigns to defaultStats.msgThroughputIn / msgThroughputOut.
    public static final double DEFAULT_MESSAGE_THROUGHPUT = 50000.0;
    // Sample count used for the long-term window when a BundleData is created in this class.
    public static final int NUM_LONG_SAMPLES = 1000;
    // Sample count used for the short-term window when a BundleData is created in this class.
    public static final int NUM_SHORT_SAMPLES = 10;
    // Metadata-store path prefix for per-bundle resource quotas (read in getBundleDataOrDefault).
    public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";
    // NOTE(review): presumably the root path for time-average broker data; not referenced in the visible part of the class.
    public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";
    // Reusable scratch set of candidate brokers. It is cleared and refilled on each selection and is also
    // used as the monitor object by selectBrokerForAssignment and shouldAntiAffinityNamespaceUnload.
    private final Set<String> brokerCandidateCache;
    // Lock manager obtained from the coordination service; used to list and read entries under "/loadbalance/brokers".
    private LockManager<LocalBrokerData> brokersData;
    // Lock released by disableBroker(). NOTE(review): where it is acquired is outside the visible code.
    private ResourceLock<LocalBrokerData> brokerDataLock;
    // Metadata caches obtained from the local metadata store in initialize().
    private MetadataCache<BundleData> bundlesCache;
    private MetadataCache<ResourceQuota> resourceQuotaCache;
    private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;
    // Linux-specific or generic implementation, chosen in initialize() from SystemUtils.IS_OS_LINUX.
    private BrokerHostUsage brokerHostUsage;
    // broker -> namespace -> bundle ranges. Rebuilt per broker in updateBundleData and extended when a
    // bundle is preallocated in selectBrokerForAssignment.
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
    // disableBroker() only releases brokerDataLock when this is non-empty.
    private String brokerZnodePath;
    // Strategy used by checkNamespaceBundleSplit; also used there as the monitor object.
    private BundleSplitStrategy bundleSplitStrategy;
    // Broker configuration, taken from the PulsarService in initialize().
    private ServiceConfiguration conf;
    // Stats applied to a bundle when neither stored bundle data nor a resource quota is found.
    private final NamespaceBundleStats defaultStats;
    // Filters applied, in order, to the candidate broker set during broker selection.
    private final List<BrokerFilter> filterPipeline;
    // NOTE(review): not referenced in the visible part of the class.
    private long lastBundleDataUpdate;
    // Compared against localData in needBrokerDataUpdate to decide whether a new report is due.
    private LocalBrokerData lastData;
    // Strategies consulted, in order, by doLoadShedding.
    private final List<LoadSheddingStrategy> loadSheddingPipeline;
    // This broker's current local data (see needBrokerDataUpdate and checkNamespaceBundleSplit).
    private LocalBrokerData localData;
    // In-memory view of broker data, bundle data and recently unloaded bundles.
    private final LoadData loadData;
    // bundle -> broker chosen by selectBrokerForAssignment; entries are removed in
    // reapDeadBrokerPreallocations and updateBundleData.
    private final Map<String, String> preallocatedBundleToBroker;
    // Strategy that picks one broker out of the filtered candidate set.
    private ModularLoadManagerStrategy placementStrategy;
    // Namespace policies handed to LoadManagerShared.applyNamespacePolicies.
    private SimpleResourceAllocationPolicies policies;
    private PulsarService pulsar;
    // Single-threaded scheduler used for updateAll and failure-domain refreshes.
    private final ScheduledExecutorService scheduler;
    // Answers whether a broker (looked up in loadData) has persistent / non-persistent topics enabled.
    private final LoadManagerShared.BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
    // broker -> failure domain; passed to LoadManagerShared.filterAntiAffinityGroupOwnedBrokers.
    private Map<String, String> brokerToFailureDomainMap;
    // Last session event delivered by the metadata store (see handleMetadataSessionEvent).
    private SessionEvent lastMetadataSessionEvent = SessionEvent.Reconnected;
    // Latest published metric snapshots; each is replaced wholesale when updated.
    private AtomicReference<List<Metrics>> loadBalancingMetrics = new AtomicReference();
    private AtomicReference<List<Metrics>> bundleUnloadMetrics = new AtomicReference();
    private AtomicReference<List<Metrics>> bundleSplitMetrics = new AtomicReference();
    // Running totals reported through the metric snapshots above.
    private long bundleSplitCount = 0L;
    private long unloadBrokerCount = 0L;
    private long unloadBundleCount = 0L;
    // NOTE(review): not referenced in the visible part of the class.
    private final Lock lock = new ReentrantLock();
    // Latency summary for selectBrokerForAssignment; observed in nanoseconds.
    private static final Summary selectBrokerForAssignment = (Summary)Summary.build("pulsar_broker_load_manager_bundle_assigment", "-").quantile(0.5).quantile(0.99).quantile(0.999).quantile(1.0).register();

    /**
     * Creates the load manager with empty in-memory state and its own single-threaded scheduler.
     * {@link #initialize(PulsarService)} must still be called before the instance is usable.
     */
    public ModularLoadManagerImpl() {
        this.brokerCandidateCache = new HashSet<>();
        this.brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
        this.defaultStats = new NamespaceBundleStats();
        this.filterPipeline = new ArrayList<>();
        this.loadData = new LoadData();
        this.loadSheddingPipeline = new ArrayList<>();
        this.preallocatedBundleToBroker = new ConcurrentHashMap<>();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(
                new DefaultThreadFactory("pulsar-modular-load-manager"));
        this.brokerToFailureDomainMap = new HashMap<>();
        this.brokerTopicLoadingPredicate = new LoadManagerShared.BrokerTopicLoadingPredicate() {

            // Both checks look the broker up in loadData by its URL with any "http://" removed and
            // answer false when the broker, or its local data, is unknown.

            @Override
            public boolean isEnablePersistentTopics(String brokerUrl) {
                final String brokerKey = brokerUrl.replace("http://", "");
                final BrokerData known = ModularLoadManagerImpl.this.loadData.getBrokerData().get(brokerKey);
                if (known == null || known.getLocalData() == null) {
                    return false;
                }
                return known.getLocalData().isPersistentTopicsEnabled();
            }

            @Override
            public boolean isEnableNonPersistentTopics(String brokerUrl) {
                final String brokerKey = brokerUrl.replace("http://", "");
                final BrokerData known = ModularLoadManagerImpl.this.loadData.getBrokerData().get(brokerKey);
                if (known == null || known.getLocalData() == null) {
                    return false;
                }
                return known.getLocalData().isNonPersistentTopicsEnabled();
            }
        };
    }

    /**
     * Wires this load manager to the given broker service: obtains the lock manager and metadata
     * caches, registers metadata listeners, picks the host-usage implementation, seeds the default
     * bundle stats, and builds the placement, filter and shedding pipelines.
     *
     * @param pulsar the broker service this load manager belongs to
     */
    @Override
    public void initialize(PulsarService pulsar) {
        this.pulsar = pulsar;

        // Coordination and metadata handles.
        brokersData = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
        bundlesCache = pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
        resourceQuotaCache = pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
        timeAverageBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
        pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
        pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);

        if (SystemUtils.IS_OS_LINUX) {
            brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar);
        } else {
            brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
        }
        bundleSplitStrategy = new BundleSplitterTask();
        conf = pulsar.getConfiguration();

        // Stats assumed for bundles that have neither stored data nor a resource quota.
        defaultStats.msgThroughputIn = 50000.0;
        defaultStats.msgThroughputOut = 50000.0;
        defaultStats.msgRateIn = 50.0;
        defaultStats.msgRateOut = 50.0;

        placementStrategy = ModularLoadManagerStrategy.create(conf);
        policies = new SimpleResourceAllocationPolicies(pulsar);
        filterPipeline.add(new BrokerVersionFilter());

        // Load the failure-domain map now, then refresh it on the scheduler whenever it changes.
        refreshBrokerToFailureDomainMap();
        pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
                .registerListener(notification -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));

        loadSheddingPipeline.add(createLoadSheddingStrategy());
    }

    /**
     * Reacts to a metadata-store notification. Only paths starting with
     * {@code /loadbalance/brokers} are handled: the current broker list is fetched asynchronously to
     * drop preallocations that point at brokers no longer listed, and a full {@link #updateAll()}
     * is queued on the scheduler.
     *
     * @param t the notification delivered by the metadata store
     */
    public void handleDataNotification(Notification t) {
        if (!t.getPath().startsWith("/loadbalance/brokers")) {
            return;
        }
        brokersData.listLocks("/loadbalance/brokers")
                .thenAccept(brokers -> reapDeadBrokerPreallocations(brokers))
                .exceptionally(ex -> {
                    // Previously a failed listing was dropped silently; surface it so stale
                    // preallocations can be diagnosed.
                    log.warn("Failed to list active brokers while reaping dead-broker preallocations", ex);
                    return null;
                });
        try {
            scheduler.submit(this::updateAll);
        } catch (RejectedExecutionException ignored) {
            // Best effort: the scheduler no longer accepts tasks (typically because it has been
            // shut down), so there is nothing left to update.
            if (log.isDebugEnabled()) {
                log.debug("Skipping load data update for {}: scheduler rejected the task", t.getPath());
            }
        }
    }

    /**
     * Remembers the most recent session event reported by the metadata store.
     *
     * @param e the session event that was just delivered
     */
    private void handleMetadataSessionEvent(SessionEvent e) {
        lastMetadataSessionEvent = e;
    }

    /**
     * Instantiates the load shedding strategy named in the configuration through its no-argument
     * constructor.
     *
     * @return the configured strategy, or an {@link OverloadShedder} when the class cannot be
     *         created or does not implement {@link LoadSheddingStrategy}
     */
    private LoadSheddingStrategy createLoadSheddingStrategy() {
        try {
            final Object candidate = Class.forName(conf.getLoadBalancerLoadSheddingStrategy())
                    .getDeclaredConstructor()
                    .newInstance();
            if (!(candidate instanceof LoadSheddingStrategy)) {
                log.error("create load shedding strategy failed. using OverloadShedder instead.");
                return new OverloadShedder();
            }
            return (LoadSheddingStrategy) candidate;
        } catch (Exception e) {
            log.error("Error when trying to create load shedding strategy: ", e);
            return new OverloadShedder();
        }
    }

    /**
     * Convenience constructor: builds the empty state and immediately initializes it against the
     * given broker service.
     *
     * @param pulsar the broker service this load manager belongs to
     */
    public ModularLoadManagerImpl(PulsarService pulsar) {
        this();
        initialize(pulsar);
    }

    /**
     * Drops every preallocation whose target broker is known to the load data but is missing from
     * the given list of alive brokers.
     *
     * @param aliveBrokers brokers currently listed as alive
     */
    private void reapDeadBrokerPreallocations(List<String> aliveBrokers) {
        final boolean debug = log.isDebugEnabled();
        for (String knownBroker : loadData.getBrokerData().keySet()) {
            if (!aliveBrokers.contains(knownBroker)) {
                if (debug) {
                    log.debug("Broker {} appears to have stopped; now reclaiming any preallocations", knownBroker);
                }
                for (Iterator<Map.Entry<String, String>> it = preallocatedBundleToBroker.entrySet().iterator();
                        it.hasNext();) {
                    final Map.Entry<String, String> preallocation = it.next();
                    final String bundleName = preallocation.getKey();
                    final String targetBroker = preallocation.getValue();
                    if (knownBroker.equals(targetBroker)) {
                        if (debug) {
                            log.debug("Removing old preallocation on dead broker {} for bundle {}",
                                    targetBroker, bundleName);
                        }
                        it.remove();
                    }
                }
            }
        }
    }

    /**
     * Returns the brokers currently registered under {@code /loadbalance/brokers}.
     *
     * <p>If the listing fails, the brokers known to the in-memory load data are returned instead.
     * In both cases the result is a fresh snapshot, so callers can never mutate, or observe
     * concurrent changes to, the internal broker map through it.
     *
     * @return a new mutable set of broker names
     */
    @Override
    public Set<String> getAvailableBrokers() {
        try {
            return new HashSet<>(brokersData.listLocks("/loadbalance/brokers").join());
        } catch (Exception e) {
            log.warn("Error when trying to get active brokers", e);
            // Copy rather than hand out the live keySet() view of the internal map.
            return new HashSet<>(loadData.getBrokerData().keySet());
        }
    }

    /**
     * Resolves the load data to use for a bundle. Lookup order: stored bundle data, then a bundle
     * data built from the stored resource quota, then a bundle data built from the default stats.
     * Lookup failures are logged and fall through to the default.
     *
     * @param bundle the bundle name
     * @return bundle data for the bundle, never {@code null}
     */
    private BundleData getBundleDataOrDefault(String bundle) {
        BundleData fromQuota = null;
        try {
            final Optional<BundleData> stored = bundlesCache.get(getBundleDataPath(bundle)).join();
            if (stored.isPresent()) {
                return stored.get();
            }
            final Optional<ResourceQuota> quotaLookup =
                    resourceQuotaCache.get(String.format("%s/%s", RESOURCE_QUOTA_ZPATH, bundle)).join();
            if (quotaLookup.isPresent()) {
                final ResourceQuota quota = quotaLookup.get();
                fromQuota = new BundleData(10, 1000);
                final TimeAverageMessageData shortTerm = fromQuota.getShortTermData();
                final TimeAverageMessageData longTerm = fromQuota.getLongTermData();

                // Seed both windows with the quota values.
                shortTerm.setMsgRateIn(quota.getMsgRateIn());
                shortTerm.setMsgRateOut(quota.getMsgRateOut());
                shortTerm.setMsgThroughputIn(quota.getBandwidthIn());
                shortTerm.setMsgThroughputOut(quota.getBandwidthOut());

                longTerm.setMsgRateIn(quota.getMsgRateIn());
                longTerm.setMsgRateOut(quota.getMsgRateOut());
                longTerm.setMsgThroughputIn(quota.getBandwidthIn());
                longTerm.setMsgThroughputOut(quota.getBandwidthOut());

                // Mark both windows as fully sampled.
                shortTerm.setNumSamples(10);
                longTerm.setNumSamples(1000);
            }
        } catch (Exception e) {
            log.warn("Error when trying to find bundle {} on metadata store: {}", bundle, e);
        }
        return fromQuota != null ? fromQuota : new BundleData(10, 1000, defaultStats);
    }

    /**
     * Builds the metadata-store path under which the data of a bundle is kept.
     *
     * @param bundle the bundle name
     * @return {@code /loadbalance/bundle-data/} followed by the bundle name
     */
    private static String getBundleDataPath(String bundle) {
        final String prefix = "/loadbalance/bundle-data/";
        return prefix + bundle;
    }

    /**
     * Fetches the per-bundle stats from the broker service.
     *
     * @return bundle name to stats, as reported by the broker service
     */
    private Map<String, NamespaceBundleStats> getBundleStats() {
        return pulsar.getBrokerService().getBundleStats();
    }

    /**
     * Computes the absolute change from {@code oldValue} to {@code newValue} as a percentage of
     * {@code oldValue}.
     *
     * @param oldValue the reference value
     * @param newValue the value to compare against the reference
     * @return the percentage change; {@code 0} when both values are zero and positive infinity when
     *         only {@code oldValue} is zero
     */
    private double percentChange(double oldValue, double newValue) {
        if (oldValue != 0.0) {
            return 100.0 * Math.abs((oldValue - newValue) / oldValue);
        }
        // Division by zero: no change if the new value is zero too, otherwise unbounded.
        return newValue == 0.0 ? 0.0 : Double.POSITIVE_INFINITY;
    }

    /**
     * Decides whether the local broker data should be written to the metadata store: either the
     * configured maximum interval since the last update has elapsed, or the largest change in
     * resource usage, message rate, throughput or bundle count exceeds the configured threshold.
     *
     * @return {@code true} when a new report should be written
     */
    private boolean needBrokerDataUpdate() {
        final long maxIntervalMillis =
                TimeUnit.MINUTES.toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
        final long millisSinceLastWrite = System.currentTimeMillis() - localData.getLastUpdate();
        if (millisSinceLastWrite > maxIntervalMillis) {
            log.info("Writing local data to metadata store because time since last update exceeded threshold of {} minutes",
                    conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
            return true;
        }

        // Resource usage is compared as an absolute difference; the rest as relative changes.
        final double resourceChange =
                100.0 * Math.abs(lastData.getMaxResourceUsage() - localData.getMaxResourceUsage());
        final double rateChange = percentChange(
                lastData.getMsgRateIn() + lastData.getMsgRateOut(),
                localData.getMsgRateIn() + localData.getMsgRateOut());
        final double throughputChange = percentChange(
                lastData.getMsgThroughputIn() + lastData.getMsgThroughputOut(),
                localData.getMsgThroughputIn() + localData.getMsgThroughputOut());
        final double bundleCountChange = percentChange(lastData.getNumBundles(), localData.getNumBundles());
        final double maxChange =
                Math.max(resourceChange, Math.max(rateChange, Math.max(throughputChange, bundleCountChange)));

        if (maxChange > (double) conf.getLoadBalancerReportUpdateThresholdPercentage()) {
            log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; time since last report written is {} seconds",
                    maxChange,
                    conf.getLoadBalancerReportUpdateThresholdPercentage(),
                    (double) millisSinceLastWrite / 1000.0);
            return true;
        }
        return false;
    }

    /**
     * Refreshes the broker data, then the bundle data, and finally checks whether any namespace
     * bundle should be split.
     */
    public void updateAll() {
        if (log.isDebugEnabled()) {
            log.debug("Updating broker and bundle data for loadreport");
        }
        updateAllBrokerData();
        updateBundleData();
        checkNamespaceBundleSplit();
    }

    /**
     * Synchronizes the in-memory broker data with the reports of the currently available brokers.
     *
     * <p>For each available broker the report is read from the lock manager: a missing report
     * removes the broker from the map, otherwise the existing entry is updated or a new one is
     * created. Brokers that are no longer available are removed afterwards. Read failures are
     * logged per broker and do not stop the refresh; an interrupt restores the thread's interrupt
     * flag and stops reading further reports.
     */
    private void updateAllBrokerData() {
        // Snapshot so that removals from the broker map below can never affect this iteration.
        final Set<String> activeBrokers = new HashSet<>(getAvailableBrokers());
        final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
        for (String broker : activeBrokers) {
            try {
                final String key = String.format("%s/%s", "/loadbalance/brokers", broker);
                final Optional<LocalBrokerData> report = brokersData.readLock(key).get();
                if (!report.isPresent()) {
                    brokerDataMap.remove(broker);
                    log.info("[{}] Broker load report is not present", broker);
                    continue;
                }
                // Single lookup instead of containsKey + get, which could race with a removal.
                final BrokerData known = brokerDataMap.get(broker);
                if (known != null) {
                    known.setLocalData(report.get());
                } else {
                    brokerDataMap.put(broker, new BrokerData(report.get()));
                }
            } catch (InterruptedException e) {
                // Do not swallow the interrupt; later reads would fail the same way.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while reading broker data from cache for broker - [{}]", broker);
                break;
            } catch (Exception e) {
                log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
            }
        }
        // Remove obsolete brokers through the key-set view, which is safe for any Map
        // implementation, unlike calling map.remove() while iterating over keySet().
        brokerDataMap.keySet().retainAll(activeBrokers);
    }

    /**
     * Refreshes the per-bundle load data from the stats last reported by every known broker, then
     * cleans up preallocations and rebuilds the broker's namespace-to-bundle-range index.
     *
     * <p>Decompiler note: CFR reported "Removed try catching itself - possible behaviour change"
     * for this method, so the exception handling shown here may differ from the original source.
     */
    private void updateBundleData() {
        Map<String, BundleData> bundleData = this.loadData.getBundleData();
        for (Map.Entry<String, BrokerData> brokerEntry : this.loadData.getBrokerData().entrySet()) {
            ConcurrentOpenHashMap concurrentOpenHashMap;
            Map<String, BundleData> preallocatedBundleData;
            String broker = brokerEntry.getKey();
            BrokerData brokerData = brokerEntry.getValue();
            Map statsMap = brokerData.getLocalData().getLastStats();
            // Fold each reported bundle's stats into its bundle data, creating the entry
            // (from stored data, quota or defaults) the first time the bundle is seen.
            for (Map.Entry entry : statsMap.entrySet()) {
                String bundle = (String)entry.getKey();
                NamespaceBundleStats stats = (NamespaceBundleStats)entry.getValue();
                if (bundleData.containsKey(bundle)) {
                    bundleData.get(bundle).update(stats);
                    continue;
                }
                BundleData currentBundleData = this.getBundleDataOrDefault(bundle);
                currentBundleData.update(stats);
                bundleData.put(bundle, currentBundleData);
            }
            Map<String, BundleData> map = preallocatedBundleData = brokerData.getPreallocatedBundleData();
            synchronized (map) {
                // NOTE(review): the inner iterator removes entries from the same map the outer
                // for-each is iterating; this is only safe if that map's iterators tolerate
                // concurrent modification - confirm the map type returned by BrokerData.
                for (String preallocatedBundleName : brokerData.getPreallocatedBundleData().keySet()) {
                    // Once the broker reports a preallocated bundle as one of its own, drop every
                    // preallocation of this broker that already has bundle data.
                    if (brokerData.getLocalData().getBundles().contains(preallocatedBundleName)) {
                        Iterator<Map.Entry<String, BundleData>> preallocatedIterator = preallocatedBundleData.entrySet().iterator();
                        while (preallocatedIterator.hasNext()) {
                            String bundle = preallocatedIterator.next().getKey();
                            if (!bundleData.containsKey(bundle)) continue;
                            preallocatedIterator.remove();
                            this.preallocatedBundleToBroker.remove(bundle);
                        }
                    }
                    // The bundle-to-broker preallocation is removed regardless of the check above.
                    this.preallocatedBundleToBroker.remove(preallocatedBundleName);
                }
            }
            // NOTE(review): presumably recomputes the broker's time-average data from the bundles it
            // reported - confirm against TimeAverageBrokerData.reset.
            brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, this.defaultStats);
            ConcurrentOpenHashMap concurrentOpenHashMap2 = concurrentOpenHashMap = (ConcurrentOpenHashMap)this.brokerToNamespaceToBundleRange.computeIfAbsent((Object)broker, k -> new ConcurrentOpenHashMap());
            synchronized (concurrentOpenHashMap2) {
                // Rebuild this broker's namespace index from its reported bundles plus whatever
                // preallocations remain.
                concurrentOpenHashMap.clear();
                LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), (ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>)concurrentOpenHashMap);
                LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), (ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>)concurrentOpenHashMap);
            }
        }
    }

    /**
     * Releases this broker's data lock, provided a broker znode path has been set.
     *
     * @throws PulsarServerException if releasing the lock fails; a
     *         {@link PulsarServerException.NotFoundException} when the cause is a metadata-store
     *         not-found error
     */
    @Override
    public void disableBroker() throws PulsarServerException {
        if (!StringUtils.isNotEmpty((CharSequence) brokerZnodePath)) {
            return;
        }
        try {
            brokerDataLock.release().join();
        } catch (CompletionException e) {
            final boolean notFound = e.getCause() instanceof MetadataStoreException.NotFoundException;
            final Throwable unwrapped = MetadataStoreException.unwrap((Throwable) e);
            if (notFound) {
                throw new PulsarServerException.NotFoundException(unwrapped);
            }
            throw new PulsarServerException(unwrapped);
        }
    }

    /**
     * Runs every load shedding strategy and unloads the bundles they select, skipping bundles whose
     * anti-affinity rules forbid it. Does nothing when load shedding is disabled or at most one
     * broker is available. Bundles unloaded before the configured grace period are first forgotten
     * from the recently-unloaded map.
     */
    @Override
    public synchronized void doLoadShedding() {
        if (!LoadManagerShared.isLoadSheddingEnabled(pulsar)) {
            return;
        }
        if (getAvailableBrokers().size() <= 1) {
            log.info("Only 1 broker available: no load shedding will be performed");
            return;
        }

        // Expire entries older than the grace period.
        final long gracePeriodStart = System.currentTimeMillis()
                - TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
        final Map<String, Long> recentlyUnloaded = loadData.getRecentlyUnloadedBundles();
        recentlyUnloaded.entrySet().removeIf(unloaded -> unloaded.getValue() < gracePeriodStart);

        for (LoadSheddingStrategy strategy : loadSheddingPipeline) {
            final Multimap<String, String> bundlesToUnload = strategy.findBundlesForUnloading(loadData, conf);
            for (Map.Entry<String, Collection<String>> perBroker : bundlesToUnload.asMap().entrySet()) {
                final String broker = perBroker.getKey();
                for (String bundle : perBroker.getValue()) {
                    final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                    final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
                    if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                        continue;
                    }
                    log.info("[Overload shedder] Unloading bundle: {} from broker {}", bundle, broker);
                    try {
                        pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
                        loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
                    } catch (PulsarServerException | PulsarAdminException e) {
                        log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
                    }
                }
            }
            updateBundleUnloadingMetrics(bundlesToUnload);
        }
    }

    /**
     * Adds the brokers and bundles of one shedding round to the running totals and publishes a new
     * single-element metrics snapshot.
     *
     * @param bundlesToUnload broker to bundles selected for unloading in this round
     */
    private void updateBundleUnloadingMetrics(Multimap<String, String> bundlesToUnload) {
        unloadBrokerCount += (long) bundlesToUnload.keySet().size();
        unloadBundleCount += (long) bundlesToUnload.values().size();

        final Map<String, String> dimensions = new HashMap<>();
        dimensions.put("metric", "bundleUnloading");

        final Metrics unloadMetrics = Metrics.create(dimensions);
        unloadMetrics.put("brk_lb_unload_broker_count", (Object) unloadBrokerCount);
        unloadMetrics.put("brk_lb_unload_bundle_count", (Object) unloadBundleCount);

        final List<Metrics> snapshot = new ArrayList<>();
        snapshot.add(unloadMetrics);
        bundleUnloadMetrics.set(snapshot);
    }

    /**
     * Checks whether a bundle may be unloaded from its current broker with respect to the
     * namespace's anti-affinity group.
     *
     * <p>Namespaces without local policies, or with a blank anti-affinity group, may always be
     * unloaded. Otherwise the candidate brokers are recomputed under the candidate-cache monitor
     * and the decision is delegated to {@link LoadManagerShared}. Any failure is logged and treated
     * as "may unload".
     *
     * @param namespace the namespace name
     * @param bundle the bundle range within the namespace
     * @param currentBroker the broker currently owning the bundle
     * @return {@code true} if the bundle may be unloaded
     */
    public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
        try {
            final Optional<LocalPolicies> localPolicies =
                    pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(NamespaceName.get(namespace));
            final boolean inAntiAffinityGroup = localPolicies.isPresent()
                    && !StringUtils.isBlank((CharSequence) localPolicies.get().namespaceAntiAffinityGroup);
            if (!inAntiAffinityGroup) {
                return true;
            }
            synchronized (brokerCandidateCache) {
                brokerCandidateCache.clear();
                final NamespaceBundle serviceUnit =
                        pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundle);
                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
                        getAvailableBrokers(), brokerTopicLoadingPredicate);
                return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker,
                        pulsar, brokerToNamespaceToBundleRange, brokerCandidateCache);
            }
        } catch (Exception e) {
            log.warn("Failed to check anti-affinity namespace ownership for {}/{}/{}, {}",
                    namespace, bundle, currentBroker, e.getMessage());
            return true;
        }
    }

    /**
     * Splits the bundles selected by the bundle split strategy. Runs only when automatic bundle
     * splitting is enabled and this broker is the leader. For every splittable bundle the cached
     * and stored load data is discarded before the split is requested through the admin client.
     * A failure on one bundle is logged and does not stop the remaining ones.
     */
    @Override
    public void checkNamespaceBundleSplit() {
        if (!conf.isLoadBalancerAutoBundleSplitEnabled()) {
            return;
        }
        if (pulsar.getLeaderElectionService() == null || !pulsar.getLeaderElectionService().isLeader()) {
            return;
        }
        final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
        synchronized (bundleSplitStrategy) {
            final Set<String> bundlesToBeSplit = bundleSplitStrategy.findBundlesToSplit(loadData, pulsar);
            final NamespaceBundleFactory namespaceBundleFactory =
                    pulsar.getNamespaceService().getNamespaceBundleFactory();
            for (String bundleName : bundlesToBeSplit) {
                try {
                    final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
                    final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
                    final boolean splittable = namespaceBundleFactory.canSplitBundle(
                            namespaceBundleFactory.getBundle(namespaceName, bundleRange));
                    if (splittable) {
                        // Forget everything known about the bundle before it is replaced by its halves.
                        loadData.getBundleData().remove(bundleName);
                        localData.getLastStats().remove(bundleName);
                        pulsar.getNamespaceService().getNamespaceBundleFactory()
                                .invalidateBundleCache(NamespaceName.get(namespaceName));
                        deleteBundleDataFromMetadataStore(bundleName);

                        log.info("Load-manager splitting bundle {} and unloading {}", bundleName, unloadSplitBundles);
                        pulsar.getAdminClient().namespaces()
                                .splitNamespaceBundle(namespaceName, bundleRange, unloadSplitBundles, null);
                        log.info("Successfully split namespace bundle {}", bundleName);
                    }
                } catch (Exception e) {
                    log.error("Failed to split namespace bundle {}", bundleName, e);
                }
            }
            updateBundleSplitMetrics(bundlesToBeSplit);
        }
    }

    /**
     * Adds the bundles of one split round to the running total and publishes a new single-element
     * metrics snapshot.
     *
     * @param bundlesToBeSplit the bundles selected for splitting in this round
     */
    private void updateBundleSplitMetrics(Set<String> bundlesToBeSplit) {
        bundleSplitCount += (long) bundlesToBeSplit.size();

        final Map<String, String> dimensions = new HashMap<>();
        dimensions.put("metric", "bundlesSplit");

        final Metrics splitMetrics = Metrics.create(dimensions);
        splitMetrics.put("brk_lb_bundles_split_count", (Object) bundleSplitCount);

        final List<Metrics> snapshot = new ArrayList<>();
        snapshot.add(splitMetrics);
        bundleSplitMetrics.set(snapshot);
    }

    /**
     * Selects the broker that should own the given service unit (bundle).
     *
     * <p>The whole selection runs while holding the monitor of the shared candidate set, because
     * that set is cleared and refilled during selection. A bundle that already has a preallocation
     * gets its preallocated broker. Otherwise candidates are derived from the namespace policies,
     * narrowed by topic count, anti-affinity and namespace spread, and passed through the filter
     * pipeline. If filtering fails or leaves no candidate, the policy-derived set is restored. If
     * the broker picked by the placement strategy is above the overload threshold, the choice is
     * repeated on the unfiltered policy-derived set. The final choice is recorded as a
     * preallocation. The elapsed time is always recorded in the latency summary, including when
     * selection throws.
     *
     * @param serviceUnit the bundle to place
     * @return the selected broker, or an empty {@link Optional} when no broker is available
     */
    @Override
    public Optional<String> selectBrokerForAssignment(ServiceUnitId serviceUnit) {
        final long startTime = System.nanoTime();
        try {
            // brokerCandidateCache doubles as the lock guarding the whole selection.
            synchronized (brokerCandidateCache) {
                final String bundle = serviceUnit.toString();

                // Single lookup: containsKey + get could race with a concurrent removal.
                final String preallocatedBroker = preallocatedBundleToBroker.get(bundle);
                if (preallocatedBroker != null) {
                    return Optional.of(preallocatedBroker);
                }

                final BundleData data = loadData.getBundleData()
                        .computeIfAbsent(bundle, key -> getBundleDataOrDefault(bundle));

                brokerCandidateCache.clear();
                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
                        getAvailableBrokers(), brokerTopicLoadingPredicate);
                LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
                        conf.getLoadBalancerBrokerMaxTopics());
                LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(),
                        brokerCandidateCache, brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
                LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(),
                        brokerCandidateCache, brokerToNamespaceToBundleRange);
                log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);

                try {
                    for (BrokerFilter brokerFilter : filterPipeline) {
                        brokerFilter.filter(brokerCandidateCache, data, loadData, conf);
                    }
                } catch (BrokerFilterException x) {
                    // A filter failed: fall back to the set derived from the namespace policies.
                    LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
                            getAvailableBrokers(), brokerTopicLoadingPredicate);
                }

                if (brokerCandidateCache.isEmpty()) {
                    // Filtering removed everyone: fall back to the policy-derived set.
                    LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
                            getAvailableBrokers(), brokerTopicLoadingPredicate);
                }

                Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
                if (log.isDebugEnabled()) {
                    log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
                }
                if (!broker.isPresent()) {
                    return broker;
                }

                final double overloadThreshold =
                        (double) conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
                final double maxUsage =
                        loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
                if (maxUsage > overloadThreshold) {
                    // The best filtered candidate is overloaded: retry on the policy-derived set.
                    LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
                            getAvailableBrokers(), brokerTopicLoadingPredicate);
                    broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
                    if (!broker.isPresent()) {
                        // The retry may find nobody; report "no broker" instead of failing on get().
                        return broker;
                    }
                }

                final String selected = broker.get();
                loadData.getBrokerData().get(selected).getPreallocatedBundleData().put(bundle, data);
                preallocatedBundleToBroker.put(bundle, selected);

                final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
                final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
                        brokerToNamespaceToBundleRange.computeIfAbsent(selected, k -> new ConcurrentOpenHashMap<>());
                synchronized (namespaceToBundleRange) {
                    namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
                            .add(bundleRange);
                }
                return broker;
            }
        } finally {
            selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Brings this load manager online for the local broker.
     *
     * <p>Two {@link LocalBrokerData} snapshots are created from the addresses exposed by the
     * {@code PulsarService}: {@code lastData} and {@code localData}. The broker's lookup address is
     * the advertised address joined with the web service port, or with the TLS web service port
     * when the plain one is not configured. After refreshing the local data, the method acquires
     * the broker lock at {@code brokerZnodePath}, submits a new {@link TimeAverageBrokerData} for
     * this broker's time-average path, runs {@code updateAll()} and records the current time as
     * the last bundle-data update.
     *
     * @throws PulsarServerException wrapping any exception raised by the steps above
     */
    @Override
    public void start() throws PulsarServerException {
        try {
            Map<String, String> advertisedProtocols = this.pulsar.getProtocolDataToAdvertise();

            this.lastData = new LocalBrokerData(
                    this.pulsar.getSafeWebServiceAddress(),
                    this.pulsar.getWebServiceAddressTls(),
                    this.pulsar.getBrokerServiceUrl(),
                    this.pulsar.getBrokerServiceUrlTls(),
                    this.pulsar.getAdvertisedListeners());
            this.lastData.setProtocols(advertisedProtocols);
            this.lastData.setPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnablePersistentTopics());
            this.lastData.setNonPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnableNonPersistentTopics());

            this.localData = new LocalBrokerData(
                    this.pulsar.getSafeWebServiceAddress(),
                    this.pulsar.getWebServiceAddressTls(),
                    this.pulsar.getBrokerServiceUrl(),
                    this.pulsar.getBrokerServiceUrlTls(),
                    this.pulsar.getAdvertisedListeners());
            this.localData.setProtocols(advertisedProtocols);
            this.localData.setBrokerVersionString(this.pulsar.getBrokerVersion());
            this.localData.setPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnablePersistentTopics());
            this.localData.setNonPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnableNonPersistentTopics());

            // Prefer the plain web service port; fall back to the TLS one when it is absent.
            String advertisedHost = this.pulsar.getAdvertisedAddress();
            Integer webPort;
            if (this.conf.getWebServicePort().isPresent()) {
                webPort = (Integer)this.conf.getWebServicePort().get();
            } else {
                webPort = (Integer)this.conf.getWebServicePortTls().get();
            }
            String brokerAddress = advertisedHost + ":" + webPort;

            this.brokerZnodePath = "/loadbalance/brokers/" + brokerAddress;
            String timeAveragePath = "/loadbalance/broker-time-average/" + brokerAddress;

            this.updateLocalBrokerData();
            this.brokerDataLock = (ResourceLock)this.brokersData.acquireLock(this.brokerZnodePath, (Object)this.localData).join();
            this.timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAveragePath, existing -> new TimeAverageBrokerData()).join();
            this.updateAll();
            this.lastBundleDataUpdate = System.currentTimeMillis();
        }
        catch (Exception e) {
            log.error("Unable to acquire lock for broker: [{}]", (Object)this.brokerZnodePath, (Object)e);
            throw new PulsarServerException((Throwable)e);
        }
    }

    /**
     * Shuts the load manager down.
     *
     * <p>The scheduler is stopped immediately via {@code shutdownNow()} and {@code brokersData} is
     * closed. A failure while closing is logged and not propagated, so shutdown is best effort.
     * Presumably closing {@code brokersData} is what releases the broker lock taken in
     * {@code start()} (the log message says so) - confirm against the lock manager's contract.
     *
     * @throws PulsarServerException declared by the interface; not thrown by the visible code
     */
    @Override
    public void stop() throws PulsarServerException {
        this.scheduler.shutdownNow();
        try {
            this.brokersData.close();
        }
        catch (Exception e) {
            // Pass the exception itself as the trailing argument so the stack trace is logged,
            // matching the other catch blocks in this class; the message text is unchanged.
            log.warn("Failed to release broker lock: {}", (Object)e.getMessage(), (Object)e);
        }
    }

    /**
     * Refreshes {@code localData} from the current system resource usage and bundle stats, and
     * republishes the load-balancing metrics, all while holding {@code lock}.
     *
     * <p>Every failure is logged. Only a {@link ConcurrentModificationException} is rethrown to the
     * caller; any other exception is swallowed after logging.
     *
     * @return the {@code localData} instance, whether or not the refresh succeeded
     */
    @Override
    public LocalBrokerData updateLocalBrokerData() {
        this.lock.lock();
        try {
            SystemResourceUsage hostUsage = LoadManagerShared.getSystemResourceUsage(this.brokerHostUsage);
            this.localData.update(hostUsage, this.getBundleStats());
            this.updateLoadBalancingMetrics(hostUsage);
        }
        catch (ConcurrentModificationException cme) {
            // Logged like every other failure, but this one is propagated.
            log.warn("Error when attempting to update local broker data", (Throwable)cme);
            throw cme;
        }
        catch (Exception e) {
            log.warn("Error when attempting to update local broker data", (Throwable)e);
        }
        finally {
            this.lock.unlock();
        }
        return this.localData;
    }

    /**
     * Publishes one {@code Metrics} record holding the percent usage of CPU, memory, direct
     * memory and inbound/outbound bandwidth, replacing the previous value of
     * {@code loadBalancingMetrics}.
     *
     * @param systemResourceUsage the usage sample to report
     */
    private void updateLoadBalancingMetrics(SystemResourceUsage systemResourceUsage) {
        HashMap<String, String> labels = new HashMap<String, String>();
        labels.put("broker", this.pulsar.getAdvertisedAddress());
        labels.put("metric", "loadBalancing");

        Metrics usage = Metrics.create(labels);
        usage.put("brk_lb_cpu_usage", (Object)Float.valueOf(systemResourceUsage.getCpu().percentUsage()));
        usage.put("brk_lb_memory_usage", (Object)Float.valueOf(systemResourceUsage.getMemory().percentUsage()));
        usage.put("brk_lb_directMemory_usage", (Object)Float.valueOf(systemResourceUsage.getDirectMemory().percentUsage()));
        usage.put("brk_lb_bandwidth_in_usage", (Object)Float.valueOf(systemResourceUsage.getBandwidthIn().percentUsage()));
        usage.put("brk_lb_bandwidth_out_usage", (Object)Float.valueOf(systemResourceUsage.getBandwidthOut().percentUsage()));

        // The published list always holds exactly this one record.
        ArrayList<Metrics> published = new ArrayList<Metrics>();
        published.add(usage);
        this.loadBalancingMetrics.set(published);
    }

    /**
     * Writes the local broker data without forcing, i.e. delegates to
     * {@link #writeBrokerDataOnZooKeeper(boolean)} with {@code false}.
     */
    @Override
    public void writeBrokerDataOnZooKeeper() {
        boolean force = false;
        this.writeBrokerDataOnZooKeeper(force);
    }

    /**
     * Refreshes the local broker data and, when appropriate, pushes it through
     * {@code brokerDataLock}.
     *
     * <p>The write happens only if the last observed metadata session event exists and reports
     * connected, and either {@code needBrokerDataUpdate()} returns {@code true} or {@code force}
     * is set. On a write, the last-update timestamp is stamped first; after the write completes
     * the deltas on {@code localData} are cleaned and {@code lastData} is updated from it.
     *
     * <p>Runs under {@code lock}; {@code updateLocalBrokerData()} takes the same lock again while
     * it is held here. Failures are logged, and only a {@link ConcurrentModificationException}
     * is rethrown.
     *
     * @param force write even when {@code needBrokerDataUpdate()} reports no need
     */
    @Override
    public void writeBrokerDataOnZooKeeper(boolean force) {
        this.lock.lock();
        try {
            this.updateLocalBrokerData();

            boolean sessionConnected = this.lastMetadataSessionEvent != null && this.lastMetadataSessionEvent.isConnected();
            if (!sessionConnected) {
                return;
            }
            // needBrokerDataUpdate() is consulted first, even when force is set.
            boolean shouldWrite = this.needBrokerDataUpdate() || force;
            if (!shouldWrite) {
                return;
            }

            this.localData.setLastUpdate(System.currentTimeMillis());
            this.brokerDataLock.updateValue((Object)this.localData).join();
            this.localData.cleanDeltas();
            this.lastData.update(this.localData);
        }
        catch (ConcurrentModificationException cme) {
            log.warn("Error writing broker data on metadata store", (Throwable)cme);
            throw cme;
        }
        catch (Exception e) {
            log.warn("Error writing broker data on metadata store", (Throwable)e);
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Persists the in-memory load data to the metadata store.
     *
     * <p>After {@code updateBundleData()}, one write is issued per entry of
     * {@code loadData.getBundleData()} (to the bundle's data path) and one per entry of
     * {@code loadData.getBrokerData()} (its time-average data, to
     * {@code /loadbalance/broker-time-average/<broker>}). Each write stores the in-memory value
     * regardless of what is currently stored. The method then waits for all writes; a failure
     * while waiting is logged and not propagated.
     */
    @Override
    public void writeBundleDataOnZooKeeper() {
        this.updateBundleData();
        // NOTE(review): the raw CompletionStage element type is what the decompiler produced;
        // confirm it against the parameter type FutureUtil.waitForAll actually declares.
        ArrayList<CompletionStage> futures = new ArrayList<CompletionStage>();

        for (Map.Entry<String, BundleData> entry : this.loadData.getBundleData().entrySet()) {
            String bundle = entry.getKey();
            // A fresh, typed, final local per iteration: a lambda may only capture effectively
            // final variables, so the value must not live in a variable reassigned by the loops.
            final BundleData bundleData = entry.getValue();
            futures.add(this.bundlesCache.readModifyUpdateOrCreate(ModularLoadManagerImpl.getBundleDataPath(bundle), __ -> bundleData).thenApply(__ -> null));
        }

        for (Map.Entry<String, BrokerData> entry : this.loadData.getBrokerData().entrySet()) {
            String broker = entry.getKey();
            final TimeAverageBrokerData timeAverageData = entry.getValue().getTimeAverageData();
            futures.add(this.timeAverageBrokerDataCache.readModifyUpdateOrCreate("/loadbalance/broker-time-average/" + broker, __ -> timeAverageData).thenApply(__ -> null));
        }

        try {
            FutureUtil.waitForAll(futures).join();
        }
        catch (Exception e) {
            log.warn("Error when writing metadata data to store", (Throwable)e);
        }
    }

    /**
     * Deletes the stored data for {@code bundle} from the metadata store and waits for the
     * deletion to finish.
     *
     * <p>Failures never propagate. A failure whose cause is a
     * {@code MetadataStoreException.NotFoundException} is ignored silently; any other failure is
     * logged as a warning.
     *
     * @param bundle name of the bundle whose data should be removed
     */
    private void deleteBundleDataFromMetadataStore(String bundle) {
        try {
            this.bundlesCache.delete(ModularLoadManagerImpl.getBundleDataPath(bundle)).join();
        }
        catch (Exception e) {
            boolean nothingToDelete = e.getCause() instanceof MetadataStoreException.NotFoundException;
            if (!nothingToDelete) {
                log.warn("Failed to delete bundle-data {} from metadata store", (Object)bundle, (Object)e);
            }
        }
    }

    /**
     * Rebuilds {@code brokerToFailureDomainMap} from the failure domains configured for this
     * cluster. Does nothing when failure domains are disabled in the broker configuration.
     *
     * <p>A new map is filled with a {@code broker -> domain name} entry for every broker of every
     * domain that can be read, then swapped in as the new field value. A domain that fails to
     * load is logged and skipped; a failure listing the domains is logged and leaves the current
     * map in place.
     *
     * <p>The rebuild is synchronized on the map instance that was current when the method started,
     * not on the replacement it installs.
     */
    private void refreshBrokerToFailureDomainMap() {
        if (!this.pulsar.getConfiguration().isFailureDomainsEnabled()) {
            return;
        }
        ClusterResources.FailureDomainResources fdr = this.pulsar.getPulsarResources().getClusterResources().getFailureDomainResources();
        String clusterName = this.pulsar.getConfiguration().getClusterName();
        try {
            Map<String, String> currentMap = this.brokerToFailureDomainMap;
            synchronized (currentMap) {
                Map<String, String> refreshedMap = new HashMap<String, String>();
                for (String domainName : fdr.listFailureDomains(clusterName)) {
                    try {
                        Optional<?> domain = fdr.getFailureDomain(clusterName, domainName);
                        if (!domain.isPresent()) {
                            continue;
                        }
                        for (String broker : ((FailureDomainImpl)domain.get()).brokers) {
                            refreshedMap.put(broker, domainName);
                        }
                    }
                    catch (Exception e) {
                        // One unreadable domain must not prevent the others from loading.
                        log.warn("Failed to get domain {}", (Object)domainName, (Object)e);
                    }
                }
                this.brokerToFailureDomainMap = refreshedMap;
            }
            log.info("Cluster domain refreshed {}", this.brokerToFailureDomainMap);
        }
        catch (Exception e) {
            // Fill the placeholder with the cluster name the message refers to, and pass the
            // exception as the trailing argument so its stack trace is logged as well.
            log.warn("Failed to get domain-list for cluster {}", (Object)clusterName, (Object)e);
        }
    }

    /**
     * Reads the data published by {@code broker} under {@code /loadbalance/brokers}.
     *
     * @param broker broker identifier appended to the brokers root path
     * @return the broker's data, or {@code null} when none is stored or the read fails
     *         (a failed read is logged as a warning)
     */
    @Override
    public LocalBrokerData getBrokerLocalData(String broker) {
        String brokerPath = String.format("%s/%s", "/loadbalance/brokers", broker);
        try {
            Optional<?> stored = (Optional<?>)this.brokersData.readLock(brokerPath).join();
            return (LocalBrokerData)stored.orElse(null);
        }
        catch (Exception e) {
            log.warn("Failed to get local-broker data for {}", (Object)broker, (Object)e);
            return null;
        }
    }

    /**
     * Collects the currently published metrics into one new list, in this order: load-balancing
     * metrics, bundle-unload metrics, bundle-split metrics. A group that has not been published
     * (its reference holds {@code null}) is skipped.
     *
     * @return a new list owned by the caller; empty when nothing has been published
     */
    @Override
    public List<Metrics> getLoadBalancingMetrics() {
        List<Metrics> combined = new ArrayList<Metrics>();
        AtomicReference<?>[] sources = {this.loadBalancingMetrics, this.bundleUnloadMetrics, this.bundleSplitMetrics};
        for (AtomicReference<?> source : sources) {
            Object published = source.get();
            if (published == null) {
                continue;
            }
            @SuppressWarnings("unchecked")
            Collection<Metrics> group = (Collection<Metrics>)published;
            combined.addAll(group);
        }
        return combined;
    }

    /**
     * Decompiler-emitted body of a lambda belonging to {@code writeBundleDataOnZooKeeper}.
     * Returns {@code data} unchanged and ignores the second argument.
     *
     * @param data the time-average broker data to hand back
     * @param __ ignored
     * @return {@code data}
     */
    private static /* synthetic */ TimeAverageBrokerData lambda$writeBundleDataOnZooKeeper$13(TimeAverageBrokerData data, Optional __) {
        return data;
    }

    /**
     * Decompiler-emitted body of a lambda belonging to {@code writeBundleDataOnZooKeeper}.
     * Returns {@code data} unchanged and ignores the second argument.
     *
     * @param data the bundle data to hand back
     * @param __ ignored
     * @return {@code data}
     */
    private static /* synthetic */ BundleData lambda$writeBundleDataOnZooKeeper$11(BundleData data, Optional __) {
        return data;
    }
}

