/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.BrokerLoadManagerClassFilter;
import org.apache.pulsar.broker.loadbalance.impl.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTask;
import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.OverloadShedder;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ModularLoadManagerImpl
implements ModularLoadManager {
    private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);
    public static final double DEFAULT_MESSAGE_RATE = 50.0;
    public static final double DEFAULT_MESSAGE_THROUGHPUT = 50000.0;
    public static final int NUM_LONG_SAMPLES = 1000;
    public static final int NUM_SHORT_SAMPLES = 10;
    private final Set<String> brokerCandidateCache;
    private LockManager<LocalBrokerData> brokersData;
    private ResourceLock<LocalBrokerData> brokerDataLock;
    private BrokerHostUsage brokerHostUsage;
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
    private String brokerZnodePath;
    private BundleSplitStrategy bundleSplitStrategy;
    private ServiceConfiguration conf;
    private final NamespaceBundleStats defaultStats;
    private final List<BrokerFilter> filterPipeline;
    private LocalBrokerData lastData;
    private final List<LoadSheddingStrategy> loadSheddingPipeline;
    private LocalBrokerData localData;
    private final LoadData loadData;
    private final Map<String, String> preallocatedBundleToBroker;
    private ModularLoadManagerStrategy placementStrategy;
    private SimpleResourceAllocationPolicies policies;
    private PulsarService pulsar;
    private PulsarResources pulsarResources;
    private final ExecutorService executors;
    private final LoadManagerShared.BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
    private Map<String, String> brokerToFailureDomainMap;
    private SessionEvent lastMetadataSessionEvent = SessionEvent.Reconnected;
    private AtomicReference<List<Metrics>> loadBalancingMetrics = new AtomicReference();
    private AtomicReference<List<Metrics>> bundleUnloadMetrics = new AtomicReference();
    private AtomicReference<List<Metrics>> bundleSplitMetrics = new AtomicReference();
    private AtomicReference<List<Metrics>> bundleMetrics = new AtomicReference();
    private long bundleSplitCount = 0L;
    private long unloadBrokerCount = 0L;
    private long unloadBundleCount = 0L;
    private final Lock lock = new ReentrantLock();
    private final Set<String> knownBrokers = new HashSet<String>();
    private Map<String, String> bundleBrokerAffinityMap;
    private static final Summary selectBrokerForAssignment = (Summary)Summary.build("pulsar_broker_load_manager_bundle_assigment", "-").quantile(0.5).quantile(0.99).quantile(0.999).quantile(1.0).register();

    public ModularLoadManagerImpl() {
        this.brokerCandidateCache = new HashSet<String>();
        this.brokerToNamespaceToBundleRange = ConcurrentOpenHashMap.newBuilder().build();
        this.defaultStats = new NamespaceBundleStats();
        this.filterPipeline = new ArrayList<BrokerFilter>();
        this.loadData = new LoadData();
        this.loadSheddingPipeline = new ArrayList<LoadSheddingStrategy>();
        this.preallocatedBundleToBroker = new ConcurrentHashMap<String, String>();
        this.executors = Executors.newSingleThreadExecutor((ThreadFactory)new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
        this.brokerToFailureDomainMap = new HashMap<String, String>();
        this.bundleBrokerAffinityMap = new ConcurrentHashMap<String, String>();
        this.brokerTopicLoadingPredicate = new LoadManagerShared.BrokerTopicLoadingPredicate(){

            @Override
            public boolean isEnablePersistentTopics(String brokerId) {
                BrokerData brokerData = ModularLoadManagerImpl.this.loadData.getBrokerData().get(brokerId);
                return brokerData != null && brokerData.getLocalData() != null && brokerData.getLocalData().isPersistentTopicsEnabled();
            }

            @Override
            public boolean isEnableNonPersistentTopics(String brokerId) {
                BrokerData brokerData = ModularLoadManagerImpl.this.loadData.getBrokerData().get(brokerId);
                return brokerData != null && brokerData.getLocalData() != null && brokerData.getLocalData().isNonPersistentTopicsEnabled();
            }
        };
    }

    @Override
    public void initialize(PulsarService pulsar) {
        this.pulsar = pulsar;
        this.pulsarResources = pulsar.getPulsarResources();
        this.brokersData = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
        pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
        pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
        this.brokerHostUsage = SystemUtils.IS_OS_LINUX ? new LinuxBrokerHostUsageImpl(pulsar) : new GenericBrokerHostUsageImpl(pulsar);
        this.bundleSplitStrategy = new BundleSplitterTask();
        this.conf = pulsar.getConfiguration();
        this.defaultStats.msgThroughputIn = 50000.0;
        this.defaultStats.msgThroughputOut = 50000.0;
        this.defaultStats.msgRateIn = 50.0;
        this.defaultStats.msgRateOut = 50.0;
        this.placementStrategy = ModularLoadManagerStrategy.create(this.conf);
        this.policies = new SimpleResourceAllocationPolicies(pulsar);
        this.filterPipeline.add(new BrokerLoadManagerClassFilter());
        this.filterPipeline.add(new BrokerVersionFilter());
        LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, this.brokerToFailureDomainMap);
        this.pulsarResources.getClusterResources().getFailureDomainResources().registerListener(__ -> this.executors.execute(() -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, this.brokerToFailureDomainMap)));
        if (this.placementStrategy instanceof LoadSheddingStrategy) {
            if (!this.conf.getLoadBalancerLoadSheddingStrategy().equals(this.conf.getLoadBalancerPlacementStrategy())) {
                throw new IllegalArgumentException("The load shedding strategy: " + this.conf.getLoadBalancerLoadSheddingStrategy() + " can't work with the placement strategy: " + this.conf.getLoadBalancerPlacementStrategy());
            }
            this.loadSheddingPipeline.add((LoadSheddingStrategy)((Object)this.placementStrategy));
        } else {
            this.loadSheddingPipeline.add(this.createLoadSheddingStrategy());
        }
    }

    public void handleDataNotification(Notification t) {
        if (t.getPath().startsWith("/loadbalance/brokers")) {
            this.brokersData.listLocks("/loadbalance/brokers").thenAccept(brokers -> this.reapDeadBrokerPreallocations((List<String>)brokers));
            try {
                this.executors.execute(this::updateAll);
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
        }
    }

    private void handleMetadataSessionEvent(SessionEvent e) {
        this.lastMetadataSessionEvent = e;
    }

    private LoadSheddingStrategy createLoadSheddingStrategy() {
        try {
            return (LoadSheddingStrategy)Reflections.createInstance((String)this.conf.getLoadBalancerLoadSheddingStrategy(), LoadSheddingStrategy.class, (ClassLoader)Thread.currentThread().getContextClassLoader());
        }
        catch (Exception e) {
            log.error("Error when trying to create load shedding strategy: {}", (Object)this.conf.getLoadBalancerLoadPlacementStrategy(), (Object)e);
            log.error("create load shedding strategy failed. using OverloadShedder instead.");
            return new OverloadShedder();
        }
    }

    public ModularLoadManagerImpl(PulsarService pulsar) {
        this();
        this.initialize(pulsar);
    }

    private void reapDeadBrokerPreallocations(List<String> aliveBrokers) {
        for (String broker : this.loadData.getBrokerData().keySet()) {
            if (aliveBrokers.contains(broker)) continue;
            if (log.isDebugEnabled()) {
                log.debug("Broker {} appears to have stopped; now reclaiming any preallocations", (Object)broker);
            }
            Iterator<Map.Entry<String, String>> iterator = this.preallocatedBundleToBroker.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, String> entry = iterator.next();
                String preallocatedBundle = entry.getKey();
                String preallocatedBroker = entry.getValue();
                if (!broker.equals(preallocatedBroker)) continue;
                if (log.isDebugEnabled()) {
                    log.debug("Removing old preallocation on dead broker {} for bundle {}", (Object)preallocatedBroker, (Object)preallocatedBundle);
                }
                iterator.remove();
            }
        }
    }

    @Override
    public Set<String> getAvailableBrokers() {
        try {
            return new HashSet<String>((Collection)this.brokersData.listLocks("/loadbalance/brokers").get(this.conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS));
        }
        catch (Exception e) {
            log.warn("Error when trying to get active brokers", (Throwable)e);
            return this.loadData.getBrokerData().keySet();
        }
    }

    @Override
    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
        CompletableFuture<Set<String>> future = new CompletableFuture<Set<String>>();
        this.brokersData.listLocks("/loadbalance/brokers").whenComplete((listLocks, ex) -> {
            if (ex != null) {
                Throwable realCause = FutureUtil.unwrapCompletionException((Throwable)ex);
                log.warn("Error when trying to get active brokers", realCause);
                future.complete(this.loadData.getBrokerData().keySet());
            } else {
                future.complete(Sets.newHashSet((Iterable)listLocks));
            }
        });
        return future;
    }

    @Override
    public BundleData getBundleDataOrDefault(String bundle) {
        BundleData bundleData = null;
        try {
            Optional optBundleData = (Optional)this.pulsarResources.getLoadBalanceResources().getBundleDataResources().getBundleData(bundle).join();
            if (optBundleData.isPresent()) {
                return (BundleData)optBundleData.get();
            }
            Optional optQuota = (Optional)this.pulsarResources.getLoadBalanceResources().getQuotaResources().getQuota(bundle).join();
            if (optQuota.isPresent()) {
                ResourceQuota quota = (ResourceQuota)optQuota.get();
                bundleData = new BundleData(10, 1000);
                TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                TimeAverageMessageData longTermData = bundleData.getLongTermData();
                shortTermData.setMsgRateIn(quota.getMsgRateIn());
                shortTermData.setMsgRateOut(quota.getMsgRateOut());
                shortTermData.setMsgThroughputIn(quota.getBandwidthIn());
                shortTermData.setMsgThroughputOut(quota.getBandwidthOut());
                longTermData.setMsgRateIn(quota.getMsgRateIn());
                longTermData.setMsgRateOut(quota.getMsgRateOut());
                longTermData.setMsgThroughputIn(quota.getBandwidthIn());
                longTermData.setMsgThroughputOut(quota.getBandwidthOut());
                shortTermData.setNumSamples(10);
                longTermData.setNumSamples(1000);
            }
        }
        catch (Exception e) {
            log.warn("Error when trying to find bundle {} on metadata store: {}", (Object)bundle, (Object)e);
        }
        if (bundleData == null) {
            bundleData = new BundleData(10, 1000, this.defaultStats);
        }
        return bundleData;
    }

    private Map<String, NamespaceBundleStats> getBundleStats() {
        return this.pulsar.getBrokerService().getBundleStats();
    }

    private double percentChange(double oldValue, double newValue) {
        if (oldValue == 0.0) {
            if (newValue == 0.0) {
                return 0.0;
            }
            return Double.POSITIVE_INFINITY;
        }
        return 100.0 * Math.abs((oldValue - newValue) / oldValue);
    }

    private boolean needBrokerDataUpdate() {
        long updateMaxIntervalMillis = TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
        long timeSinceLastReportWrittenToStore = System.currentTimeMillis() - this.localData.getLastUpdate();
        if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) {
            log.info("Writing local data to metadata store because time since last update exceeded threshold of {} minutes", (Object)this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
            return true;
        }
        double maxChange = Math.max(100.0 * Math.abs(this.lastData.getMaxResourceUsage() - this.localData.getMaxResourceUsage()), Math.max(this.percentChange(this.lastData.getMsgRateIn() + this.lastData.getMsgRateOut(), this.localData.getMsgRateIn() + this.localData.getMsgRateOut()), Math.max(this.percentChange(this.lastData.getMsgThroughputIn() + this.lastData.getMsgThroughputOut(), this.localData.getMsgThroughputIn() + this.localData.getMsgThroughputOut()), this.percentChange(this.lastData.getNumBundles(), this.localData.getNumBundles()))));
        if (maxChange > (double)this.conf.getLoadBalancerReportUpdateThresholdPercentage()) {
            log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; time since last report written is {} seconds", new Object[]{maxChange, this.conf.getLoadBalancerReportUpdateThresholdPercentage(), (double)timeSinceLastReportWrittenToStore / 1000.0});
            return true;
        }
        return false;
    }

    public void updateAll() {
        if (log.isDebugEnabled()) {
            log.debug("Updating broker and bundle data for loadreport");
        }
        this.cleanupDeadBrokersData();
        this.updateAllBrokerData();
        this.updateBundleData();
        this.checkNamespaceBundleSplit();
    }

    private synchronized void cleanupDeadBrokersData() {
        Set<String> activeBrokers = this.getAvailableBrokers();
        Collection deadBrokers = CollectionUtils.subtract(this.knownBrokers, activeBrokers);
        this.knownBrokers.clear();
        this.knownBrokers.addAll(activeBrokers);
        if (this.pulsar.getLeaderElectionService() != null && this.pulsar.getLeaderElectionService().isLeader()) {
            deadBrokers.forEach(this::deleteTimeAverageDataFromMetadataStoreAsync);
            for (LoadSheddingStrategy loadSheddingStrategy : this.loadSheddingPipeline) {
                loadSheddingStrategy.onActiveBrokersChange(activeBrokers);
            }
            this.placementStrategy.onActiveBrokersChange(activeBrokers);
        }
    }

    private void updateAllBrokerData() {
        Set<String> activeBrokers = this.getAvailableBrokers();
        Map<String, BrokerData> brokerDataMap = this.loadData.getBrokerData();
        for (String broker : activeBrokers) {
            try {
                String key = String.format("%s/%s", "/loadbalance/brokers", broker);
                Optional localData = (Optional)this.brokersData.readLock(key).get();
                if (!localData.isPresent()) {
                    brokerDataMap.remove(broker);
                    log.info("[{}] Broker load report is not present", (Object)broker);
                    continue;
                }
                if (brokerDataMap.containsKey(broker)) {
                    brokerDataMap.get(broker).setLocalData((LocalBrokerData)localData.get());
                    continue;
                }
                brokerDataMap.put(broker, new BrokerData((LocalBrokerData)localData.get()));
            }
            catch (Exception e) {
                log.warn("Error reading broker data from cache for broker - [{}], [{}]", (Object)broker, (Object)e.getMessage());
            }
        }
        for (String broker : brokerDataMap.keySet()) {
            if (activeBrokers.contains(broker)) continue;
            brokerDataMap.remove(broker);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateBundleData() {
        Map<String, BundleData> bundleData = this.loadData.getBundleData();
        HashSet<String> activeBundles = new HashSet<String>();
        for (Map.Entry<String, BrokerData> brokerEntry : this.loadData.getBrokerData().entrySet()) {
            ConcurrentOpenHashMap namespaceToBundleRange;
            Object bundle;
            String broker = brokerEntry.getKey();
            BrokerData brokerData = brokerEntry.getValue();
            Map statsMap = brokerData.getLocalData().getLastStats();
            for (Map.Entry entry : statsMap.entrySet()) {
                bundle = (String)entry.getKey();
                NamespaceBundleStats stats = (NamespaceBundleStats)entry.getValue();
                activeBundles.add((String)bundle);
                if (bundleData.containsKey(bundle)) {
                    bundleData.get(bundle).update(stats);
                    continue;
                }
                BundleData currentBundleData = this.getBundleDataOrDefault((String)bundle);
                currentBundleData.update(stats);
                bundleData.put((String)bundle, currentBundleData);
            }
            Map preallocatedBundleData = brokerData.getPreallocatedBundleData();
            Set ownedNsBundles = this.pulsar.getNamespaceService().getOwnedServiceUnits().stream().map(NamespaceBundle::toString).collect(Collectors.toSet());
            bundle = preallocatedBundleData;
            synchronized (bundle) {
                this.preallocatedBundleToBroker.keySet().removeAll(preallocatedBundleData.keySet());
                Iterator preallocatedIterator = preallocatedBundleData.entrySet().iterator();
                while (preallocatedIterator.hasNext()) {
                    String bundle2 = (String)preallocatedIterator.next().getKey();
                    if (ownedNsBundles.contains(bundle2) && (!brokerData.getLocalData().getBundles().contains(bundle2) || !bundleData.containsKey(bundle2))) continue;
                    preallocatedIterator.remove();
                }
            }
            TimeAverageBrokerData timeAverageData = new TimeAverageBrokerData();
            timeAverageData.reset(statsMap.keySet(), bundleData, this.defaultStats);
            brokerData.setTimeAverageData(timeAverageData);
            ConcurrentOpenHashMap concurrentOpenHashMap = namespaceToBundleRange = (ConcurrentOpenHashMap)this.brokerToNamespaceToBundleRange.computeIfAbsent((Object)broker, k -> ConcurrentOpenHashMap.newBuilder().build());
            synchronized (concurrentOpenHashMap) {
                namespaceToBundleRange.clear();
                LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), (ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>)namespaceToBundleRange);
                LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), (ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>)namespaceToBundleRange);
            }
        }
        for (String bundle : bundleData.keySet()) {
            if (activeBundles.contains(bundle)) continue;
            bundleData.remove(bundle);
            if (!this.pulsar.getLeaderElectionService().isLeader()) continue;
            this.deleteBundleDataFromMetadataStore(bundle);
        }
    }

    @Override
    public void disableBroker() throws PulsarServerException {
        if (StringUtils.isNotEmpty((CharSequence)this.brokerZnodePath)) {
            try {
                this.brokerDataLock.release().join();
            }
            catch (CompletionException e) {
                if (e.getCause() instanceof MetadataStoreException.NotFoundException) {
                    throw new PulsarServerException.NotFoundException((Throwable)MetadataStoreException.unwrap((Throwable)e));
                }
                throw new PulsarServerException((Throwable)MetadataStoreException.unwrap((Throwable)e));
            }
        }
    }

    @Override
    public synchronized void doLoadShedding() {
        if (!LoadManagerShared.isLoadSheddingEnabled(this.pulsar)) {
            return;
        }
        if (this.getAvailableBrokers().size() <= 1) {
            log.info("Only 1 broker available: no load shedding will be performed");
            return;
        }
        long timeout = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerSheddingGracePeriodMinutes());
        Map<String, Long> recentlyUnloadedBundles = this.loadData.getRecentlyUnloadedBundles();
        recentlyUnloadedBundles.keySet().removeIf(e -> (Long)recentlyUnloadedBundles.get(e) < timeout);
        for (LoadSheddingStrategy strategy : this.loadSheddingPipeline) {
            Multimap<String, String> bundlesToUnload = strategy.findBundlesForUnloading(this.loadData, this.conf);
            bundlesToUnload.asMap().forEach((broker, bundles) -> bundles.forEach(bundle -> {
                String bundleRange;
                String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                if (!this.shouldNamespacePoliciesUnload(namespaceName, bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle), (String)broker)) {
                    return;
                }
                if (!this.shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, (String)broker)) {
                    return;
                }
                NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(this.pulsar, bundle);
                Optional<String> destBroker = this.selectBroker(bundleToUnload);
                if (!destBroker.isPresent()) {
                    log.info("[{}] No broker available to unload bundle {} from broker {}", new Object[]{strategy.getClass().getSimpleName(), bundle, broker});
                    return;
                }
                if (destBroker.get().equals(broker)) {
                    log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}", new Object[]{strategy.getClass().getSimpleName(), destBroker.get(), bundle});
                    return;
                }
                log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}", new Object[]{strategy.getClass().getSimpleName(), bundle, broker, destBroker.get()});
                try {
                    this.pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get());
                    this.loadData.getRecentlyUnloadedBundles().put((String)bundle, System.currentTimeMillis());
                }
                catch (PulsarServerException | PulsarAdminException e) {
                    log.warn("Error when trying to perform load shedding on {} for broker {}", new Object[]{bundle, broker, e});
                }
            }));
            this.updateBundleUnloadingMetrics(bundlesToUnload);
        }
    }

    private void updateBundleUnloadingMetrics(Multimap<String, String> bundlesToUnload) {
        this.unloadBrokerCount += (long)bundlesToUnload.keySet().size();
        this.unloadBundleCount += (long)bundlesToUnload.values().size();
        ArrayList<Metrics> metrics = new ArrayList<Metrics>();
        HashMap<String, String> dimensions = new HashMap<String, String>();
        dimensions.put("metric", "bundleUnloading");
        Metrics m = Metrics.create(dimensions);
        m.put("brk_lb_unload_broker_total", (Object)this.unloadBrokerCount);
        m.put("brk_lb_unload_bundle_total", (Object)this.unloadBundleCount);
        metrics.add(m);
        this.bundleUnloadMetrics.set(metrics);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean shouldNamespacePoliciesUnload(String namespace, String bundle, String currentBroker) {
        Set<String> set = this.brokerCandidateCache;
        synchronized (set) {
            this.brokerCandidateCache.clear();
            NamespaceBundle serviceUnit = this.pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundle);
            LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            this.brokerCandidateCache.remove(currentBroker);
            return !this.brokerCandidateCache.isEmpty();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
        try {
            Optional<String> antiAffinityGroupOptional = LoadManagerShared.getNamespaceAntiAffinityGroup(this.pulsar, namespace);
            if (antiAffinityGroupOptional.isEmpty()) {
                return true;
            }
            Set<String> set = this.brokerCandidateCache;
            synchronized (set) {
                this.brokerCandidateCache.clear();
                NamespaceBundle serviceUnit = this.pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundle);
                LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
                return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, this.pulsar, this.brokerToNamespaceToBundleRange, this.brokerCandidateCache);
            }
        }
        catch (Exception e) {
            log.warn("Failed to check anti-affinity namespace ownership for {}/{}/{}, {}", new Object[]{namespace, bundle, currentBroker, e.getMessage()});
            return true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void checkNamespaceBundleSplit() {
        if (!this.conf.isLoadBalancerAutoBundleSplitEnabled() || this.pulsar.getLeaderElectionService() == null || !this.pulsar.getLeaderElectionService().isLeader() || this.knownBrokers.size() <= 1) {
            return;
        }
        boolean unloadSplitBundles = this.pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
        BundleSplitStrategy bundleSplitStrategy = this.bundleSplitStrategy;
        synchronized (bundleSplitStrategy) {
            Map<String, String> bundlesToBeSplit = this.bundleSplitStrategy.findBundlesToSplit(this.loadData, this.pulsar);
            NamespaceBundleFactory namespaceBundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory();
            int splitCount = 0;
            for (String bundleName : bundlesToBeSplit.keySet()) {
                try {
                    String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
                    String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
                    if (!namespaceBundleFactory.canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) continue;
                    this.loadData.getBundleData().remove(bundleName);
                    this.localData.getLastStats().remove(bundleName);
                    this.pulsar.getNamespaceService().getNamespaceBundleFactory().invalidateBundleCache(NamespaceName.get((String)namespaceName));
                    this.deleteBundleDataFromMetadataStore(bundleName);
                    boolean isUnload = false;
                    String broker = bundlesToBeSplit.get(bundleName);
                    if (unloadSplitBundles && this.shouldNamespacePoliciesUnload(namespaceName, bundleRange, broker) && this.shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                        isUnload = true;
                    }
                    log.info("Load-manager splitting bundle {} and unloading {}", (Object)bundleName, (Object)isUnload);
                    this.pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, isUnload, null);
                    ++splitCount;
                    log.info("Successfully split namespace bundle {}", (Object)bundleName);
                }
                catch (Exception e) {
                    log.error("Failed to split namespace bundle {}", (Object)bundleName, (Object)e);
                }
            }
            this.updateBundleSplitMetrics(splitCount);
        }
    }

    private void updateBundleSplitMetrics(int bundlesSplit) {
        this.bundleSplitCount += (long)bundlesSplit;
        ArrayList<Metrics> metrics = new ArrayList<Metrics>();
        HashMap<String, String> dimensions = new HashMap<String, String>();
        dimensions.put("metric", "bundlesSplit");
        Metrics m = Metrics.create(dimensions);
        m.put("brk_lb_bundles_split_total", (Object)this.bundleSplitCount);
        metrics.add(m);
        this.bundleSplitMetrics.set(metrics);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    @Override
    public Optional<String> selectBrokerForAssignment(ServiceUnitId serviceUnit) {
        Optional<String> broker;
        String bundle;
        long startTime;
        block8: {
            Optional<String> optional;
            startTime = System.nanoTime();
            try {
                Set<String> set = this.brokerCandidateCache;
                // MONITORENTER : set
                bundle = serviceUnit.toString();
                if (this.preallocatedBundleToBroker.containsKey(bundle)) {
                    Optional<String> optional2 = Optional.of(this.preallocatedBundleToBroker.get(bundle));
                    // MONITOREXIT : set
                    selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
                    return optional2;
                }
                broker = this.selectBroker(serviceUnit);
                if (broker.isPresent()) break block8;
                optional = broker;
            }
            catch (Throwable throwable) {
                selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
                throw throwable;
            }
            selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
            return optional;
        }
        this.preallocateBundle(bundle, broker.get());
        Optional<String> optional = broker;
        // MONITOREXIT : set
        selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
        return optional;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void preallocateBundle(String bundle, String broker) {
        ConcurrentOpenHashMap namespaceToBundleRange;
        BundleData data = this.loadData.getBundleData().computeIfAbsent(bundle, key -> this.getBundleDataOrDefault(bundle));
        this.loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
        this.preallocatedBundleToBroker.put(bundle, broker);
        String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
        String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
        ConcurrentOpenHashMap concurrentOpenHashMap = namespaceToBundleRange = (ConcurrentOpenHashMap)this.brokerToNamespaceToBundleRange.computeIfAbsent((Object)broker, k -> ConcurrentOpenHashMap.newBuilder().build());
        synchronized (concurrentOpenHashMap) {
            ((ConcurrentOpenHashSet)namespaceToBundleRange.computeIfAbsent((Object)namespaceName, k -> ConcurrentOpenHashSet.newBuilder().build())).add((Object)bundleRange);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    Optional<String> selectBroker(ServiceUnitId serviceUnit) {
        Set<String> set = this.brokerCandidateCache;
        synchronized (set) {
            String bundle = serviceUnit.toString();
            BundleData data = this.loadData.getBundleData().computeIfAbsent(bundle, key -> this.getBundleDataOrDefault(bundle));
            this.brokerCandidateCache.clear();
            LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            LoadManagerShared.filterBrokersWithLargeTopicCount(this.brokerCandidateCache, this.loadData, this.conf.getLoadBalancerBrokerMaxTopics());
            LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar, bundle, this.brokerCandidateCache, this.brokerToNamespaceToBundleRange, this.brokerToFailureDomainMap);
            if (this.conf.isLoadBalancerDistributeBundlesEvenlyEnabled() || serviceUnit.getNamespaceObject().equals((Object)NamespaceName.SYSTEM_NAMESPACE)) {
                LoadManagerShared.removeMostServicingBrokersForNamespace(bundle, this.brokerCandidateCache, this.brokerToNamespaceToBundleRange);
                if (log.isDebugEnabled()) {
                    log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}", (Object)this.brokerCandidateCache.size());
                }
            }
            log.info("{} brokers being considered for assignment of {}", (Object)this.brokerCandidateCache.size(), (Object)bundle);
            try {
                for (BrokerFilter filter : this.filterPipeline) {
                    filter.filter(this.brokerCandidateCache, data, this.loadData, this.conf);
                }
            }
            catch (BrokerFilterException x) {
                LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            }
            if (this.brokerCandidateCache.isEmpty()) {
                LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            }
            Optional<String> broker = this.placementStrategy.selectBroker(this.brokerCandidateCache, data, this.loadData, this.conf);
            if (log.isDebugEnabled()) {
                log.debug("Selected broker {} from candidate brokers {}", broker, this.brokerCandidateCache);
            }
            if (!broker.isPresent()) {
                return broker;
            }
            double overloadThreshold = (double)this.conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
            double maxUsage = this.loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
            if (maxUsage > overloadThreshold) {
                LoadManagerShared.applyNamespacePolicies(serviceUnit, this.policies, this.brokerCandidateCache, this.getAvailableBrokers(), this.brokerTopicLoadingPredicate);
                Optional<String> brokerTmp = this.placementStrategy.selectBroker(this.brokerCandidateCache, data, this.loadData, this.conf);
                if (brokerTmp.isPresent()) {
                    broker = brokerTmp;
                }
            }
            return broker;
        }
    }

    @Override
    public void start() throws PulsarServerException {
        try {
            Map<String, String> protocolData = this.pulsar.getProtocolDataToAdvertise();
            this.lastData = new LocalBrokerData(this.pulsar.getWebServiceAddress(), this.pulsar.getWebServiceAddressTls(), this.pulsar.getBrokerServiceUrl(), this.pulsar.getBrokerServiceUrlTls(), this.pulsar.getAdvertisedListeners());
            this.lastData.setProtocols(protocolData);
            this.lastData.setPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnablePersistentTopics());
            this.lastData.setNonPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnableNonPersistentTopics());
            this.localData = new LocalBrokerData(this.pulsar.getWebServiceAddress(), this.pulsar.getWebServiceAddressTls(), this.pulsar.getBrokerServiceUrl(), this.pulsar.getBrokerServiceUrlTls(), this.pulsar.getAdvertisedListeners());
            this.localData.setProtocols(protocolData);
            this.localData.setBrokerVersionString(this.pulsar.getBrokerVersion());
            this.localData.setPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnablePersistentTopics());
            this.localData.setNonPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnableNonPersistentTopics());
            this.localData.setLoadManagerClassName(this.conf.getLoadManagerClassName());
            String brokerId = this.pulsar.getBrokerId();
            this.brokerZnodePath = "/loadbalance/brokers/" + brokerId;
            this.updateLocalBrokerData();
            this.brokerDataLock = (ResourceLock)this.brokersData.acquireLock(this.brokerZnodePath, (Object)this.localData).join();
            this.pulsarResources.getLoadBalanceResources().getBrokerTimeAverageDataResources().updateTimeAverageBrokerData(brokerId, new TimeAverageBrokerData()).join();
            this.updateAll();
        }
        catch (Exception e) {
            log.error("Unable to acquire lock for broker: [{}]", (Object)this.brokerZnodePath, (Object)e);
            throw new PulsarServerException((Throwable)e);
        }
    }

    @Override
    public void stop() throws PulsarServerException {
        this.executors.shutdownNow();
        try {
            this.brokersData.close();
        }
        catch (Exception e) {
            log.warn("Failed to release broker lock: {}", (Object)e.getMessage());
        }
    }

    @Override
    public LocalBrokerData updateLocalBrokerData() {
        this.lock.lock();
        try {
            SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(this.brokerHostUsage);
            this.localData.update(systemResourceUsage, this.getBundleStats());
            this.updateLoadBalancingMetrics(systemResourceUsage);
            if (this.conf.isExposeBundlesMetricsInPrometheus()) {
                this.updateLoadBalancingBundlesMetrics(this.getBundleStats());
            }
        }
        catch (Exception e) {
            log.warn("Error when attempting to update local broker data", (Throwable)e);
            if (e instanceof ConcurrentModificationException) {
                throw (ConcurrentModificationException)e;
            }
        }
        finally {
            this.lock.unlock();
        }
        return this.localData;
    }

    private void updateLoadBalancingBundlesMetrics(Map<String, NamespaceBundleStats> bundlesData) {
        ArrayList<Metrics> metrics = new ArrayList<Metrics>();
        for (Map.Entry<String, NamespaceBundleStats> entry : bundlesData.entrySet()) {
            String bundle = entry.getKey();
            NamespaceBundleStats stats = entry.getValue();
            HashMap<String, String> dimensions = new HashMap<String, String>();
            dimensions.put("broker", this.pulsar.getAdvertisedAddress());
            dimensions.put("bundle", bundle);
            dimensions.put("metric", "bundle");
            Metrics m = Metrics.create(dimensions);
            m.put("brk_bundle_msg_rate_in", (Object)stats.msgRateIn);
            m.put("brk_bundle_msg_rate_out", (Object)stats.msgRateOut);
            m.put("brk_bundle_topics_count", (Object)stats.topics);
            m.put("brk_bundle_consumer_count", (Object)stats.consumerCount);
            m.put("brk_bundle_producer_count", (Object)stats.producerCount);
            m.put("brk_bundle_msg_throughput_in", (Object)stats.msgThroughputIn);
            m.put("brk_bundle_msg_throughput_out", (Object)stats.msgThroughputOut);
            metrics.add(m);
        }
        this.bundleMetrics.set(metrics);
    }

    private void updateLoadBalancingMetrics(SystemResourceUsage systemResourceUsage) {
        ArrayList<Metrics> metrics = new ArrayList<Metrics>();
        HashMap<String, String> dimensions = new HashMap<String, String>();
        dimensions.put("broker", this.pulsar.getAdvertisedAddress());
        dimensions.put("metric", "loadBalancing");
        Metrics m = Metrics.create(dimensions);
        m.put("brk_lb_cpu_usage", (Object)Float.valueOf(systemResourceUsage.getCpu().percentUsage()));
        m.put("brk_lb_memory_usage", (Object)Float.valueOf(systemResourceUsage.getMemory().percentUsage()));
        m.put("brk_lb_directMemory_usage", (Object)Float.valueOf(systemResourceUsage.getDirectMemory().percentUsage()));
        m.put("brk_lb_bandwidth_in_usage", (Object)Float.valueOf(systemResourceUsage.getBandwidthIn().percentUsage()));
        m.put("brk_lb_bandwidth_out_usage", (Object)Float.valueOf(systemResourceUsage.getBandwidthOut().percentUsage()));
        metrics.add(m);
        this.loadBalancingMetrics.set(metrics);
    }

    @Override
    public void writeBrokerDataOnZooKeeper() {
        this.writeBrokerDataOnZooKeeper(false);
    }

    @Override
    public void writeBrokerDataOnZooKeeper(boolean force) {
        this.lock.lock();
        try {
            this.updateLocalBrokerData();
            if (this.lastMetadataSessionEvent != null && this.lastMetadataSessionEvent.isConnected() && (this.needBrokerDataUpdate() || force)) {
                this.localData.setLastUpdate(System.currentTimeMillis());
                this.brokerDataLock.updateValue((Object)this.localData).join();
                this.localData.cleanDeltas();
                this.lastData.update(this.localData);
            }
        }
        catch (Exception e) {
            log.warn("Error writing broker data on metadata store", (Throwable)e);
            if (e instanceof ConcurrentModificationException) {
                throw (ConcurrentModificationException)e;
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void writeBundleDataOnZooKeeper() {
        BundleData data;
        this.updateBundleData();
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (Map.Entry<String, BundleData> entry : this.loadData.getBundleData().entrySet()) {
            String bundle = entry.getKey();
            data = entry.getValue();
            futures.add(this.pulsarResources.getLoadBalanceResources().getBundleDataResources().updateBundleData(bundle, data));
        }
        for (Map.Entry<String, BundleData> entry : this.loadData.getBrokerData().entrySet()) {
            String broker = entry.getKey();
            data = ((BrokerData)entry.getValue()).getTimeAverageData();
            futures.add(this.pulsarResources.getLoadBalanceResources().getBrokerTimeAverageDataResources().updateTimeAverageBrokerData(broker, (TimeAverageBrokerData)data));
        }
        try {
            FutureUtil.waitForAll(futures).join();
        }
        catch (Exception e) {
            log.warn("Error when writing metadata data to store", (Throwable)e);
        }
    }

    private void deleteBundleDataFromMetadataStore(String bundle) {
        block2: {
            try {
                this.pulsarResources.getLoadBalanceResources().getBundleDataResources().deleteBundleData(bundle).join();
            }
            catch (Exception e) {
                if (e.getCause() instanceof MetadataStoreException.NotFoundException) break block2;
                log.warn("Failed to delete bundle-data {} from metadata store", (Object)bundle, (Object)e);
            }
        }
    }

    private void deleteTimeAverageDataFromMetadataStoreAsync(String broker) {
        this.pulsarResources.getLoadBalanceResources().getBrokerTimeAverageDataResources().deleteTimeAverageBrokerData(broker).whenComplete((__, ex) -> {
            if (ex != null && !(ex.getCause() instanceof MetadataStoreException.NotFoundException)) {
                log.warn("Failed to delete dead broker {} time average data from metadata store", (Object)broker, ex);
            }
        });
    }

    @Override
    public LocalBrokerData getBrokerLocalData(String broker) {
        String key = String.format("%s/%s", "/loadbalance/brokers", broker);
        try {
            return ((Optional)this.brokersData.readLock(key).join()).orElse(null);
        }
        catch (Exception e) {
            log.warn("Failed to get local-broker data for {}", (Object)broker, (Object)e);
            return null;
        }
    }

    @Override
    public List<Metrics> getLoadBalancingMetrics() {
        ArrayList<Metrics> metricsCollection = new ArrayList<Metrics>();
        if (this.loadBalancingMetrics.get() != null) {
            metricsCollection.addAll((Collection)this.loadBalancingMetrics.get());
        }
        if (this.bundleUnloadMetrics.get() != null) {
            metricsCollection.addAll((Collection)this.bundleUnloadMetrics.get());
        }
        if (this.bundleSplitMetrics.get() != null) {
            metricsCollection.addAll((Collection)this.bundleSplitMetrics.get());
        }
        if (this.bundleMetrics.get() != null) {
            metricsCollection.addAll((Collection<Metrics>)this.bundleMetrics.get());
        }
        return metricsCollection;
    }

    @Override
    public String setNamespaceBundleAffinity(String bundle, String broker) {
        if (StringUtils.isBlank((CharSequence)broker)) {
            return this.bundleBrokerAffinityMap.remove(bundle);
        }
        return this.bundleBrokerAffinityMap.put(bundle, broker);
    }
}

