/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThresholdShedder
implements LoadSheddingStrategy {
    private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class);
    private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
    private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
    private static final double LOWER_BOUNDARY_THRESHOLD_MARGIN = 0.5;
    private static final double MB = 1048576.0;
    private final Map<String, Double> brokerAvgResourceUsage = new HashMap<String, Double>();

    @Override
    public synchronized Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
        this.selectedBundlesCache.clear();
        double threshold = (double)conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
        Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * 1048576.0;
        double avgUsage = this.getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
        if (avgUsage == 0.0) {
            log.warn("average max resource usage is 0");
            return this.selectedBundlesCache;
        }
        loadData.getBrokerData().forEach((broker, brokerData) -> {
            LocalBrokerData localData = brokerData.getLocalData();
            double currentUsage = this.brokerAvgResourceUsage.getOrDefault(broker, 0.0);
            if (currentUsage < avgUsage + threshold) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] broker is not overloaded, ignoring at this point", broker);
                }
                return;
            }
            double percentOfTrafficToOffload = currentUsage - avgUsage - threshold + 0.05;
            double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
            double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;
            if (minimumThroughputToOffload < minThroughputThreshold) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] broker is planning to shed throughput {} MByte/s less than minimumThroughputThreshold {} MByte/s, skipping bundle unload.", new Object[]{broker, minimumThroughputToOffload / 1048576.0, minThroughputThreshold / 1048576.0});
                }
                return;
            }
            log.info("Attempting to shed load on {}, which has max resource usage above avgUsage  and threshold {}% > {}% + {}% -- Offloading at least {} MByte/s of traffic, left throughput {} MByte/s", new Object[]{broker, 100.0 * currentUsage, 100.0 * avgUsage, 100.0 * threshold, minimumThroughputToOffload / 1048576.0, (brokerCurrentThroughput - minimumThroughputToOffload) / 1048576.0});
            if (localData.getBundles().size() > 1) {
                this.filterAndSelectBundle(loadData, recentlyUnloadedBundles, (String)broker, localData, minimumThroughputToOffload);
            } else if (localData.getBundles().size() == 1) {
                log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. No Load Shedding will be done on this broker", localData.getBundles().iterator().next(), broker);
            } else {
                log.warn("Broker {} is overloaded despite having no bundles", broker);
            }
        });
        if (this.selectedBundlesCache.isEmpty() && conf.isLowerBoundarySheddingEnabled()) {
            this.tryLowerBoundaryShedding(loadData, conf);
        }
        return this.selectedBundlesCache;
    }

    private void filterAndSelectBundle(LoadData loadData, Map<String, Long> recentlyUnloadedBundles, String broker, LocalBrokerData localData, double minimumThroughputToOffload) {
        MutableDouble trafficMarkedToOffload = new MutableDouble(0.0);
        MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
        loadData.getBundleDataForLoadShedding().entrySet().stream().map(e -> {
            String bundle = (String)e.getKey();
            BundleData bundleData = (BundleData)e.getValue();
            TimeAverageMessageData shortTermData = bundleData.getShortTermData();
            double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
            return Pair.of((Object)bundle, (Object)throughput);
        }).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft())).filter(e -> localData.getBundles().contains(e.getLeft())).sorted((e1, e2) -> Double.compare((Double)e2.getRight(), (Double)e1.getRight())).forEach(e -> {
            if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload || atLeastOneBundleSelected.isFalse()) {
                this.selectedBundlesCache.put((Object)broker, (Object)((String)e.getLeft()));
                trafficMarkedToOffload.add((Number)e.getRight());
                atLeastOneBundleSelected.setTrue();
            }
        });
    }

    private double getBrokerAvgUsage(LoadData loadData, double historyPercentage, ServiceConfiguration conf) {
        double totalUsage = 0.0;
        int totalBrokers = 0;
        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
            LocalBrokerData localBrokerData = entry.getValue().getLocalData();
            String broker = entry.getKey();
            totalUsage += this.updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
            ++totalBrokers;
        }
        return totalBrokers > 0 ? totalUsage / (double)totalBrokers : 0.0;
    }

    private double updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, double historyPercentage, ServiceConfiguration conf) {
        Double historyUsage = this.brokerAvgResourceUsage.get(broker);
        double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(conf.getLoadBalancerCPUResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(), conf.getLoadBalancerBandwithInResourceWeight(), conf.getLoadBalancerBandwithOutResourceWeight());
        historyUsage = historyUsage == null ? resourceUsage : historyUsage * historyPercentage + (1.0 - historyPercentage) * resourceUsage;
        this.brokerAvgResourceUsage.put(broker, historyUsage);
        return historyUsage;
    }

    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
        double threshold = (double)conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
        double avgUsage = this.getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
        Pair<Boolean, String> result = this.getMaxUsageBroker(loadData, threshold, avgUsage);
        boolean hasBrokerBelowLowerBound = (Boolean)result.getLeft();
        String maxUsageBroker = (String)result.getRight();
        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
        if (brokerData == null) {
            log.info("Load data is null or bundle <=1, skipping bundle unload.");
            return;
        }
        if (!hasBrokerBelowLowerBound) {
            log.info("No broker is below the lower bound, threshold is {}, avgUsage usage is {}, max usage of Broker {} is {}", new Object[]{threshold, avgUsage, maxUsageBroker, this.brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0)});
            return;
        }
        LocalBrokerData localData = brokerData.getLocalData();
        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * 0.5;
        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * 1048576.0;
        if (minThroughputThreshold > minimumThroughputToOffload) {
            log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than minimumThroughputThreshold {} MByte/s, skipping bundle unload.", new Object[]{maxUsageBroker, minimumThroughputToOffload / 1048576.0, minThroughputThreshold / 1048576.0});
            return;
        }
        this.filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData, minimumThroughputToOffload);
    }

    private Pair<Boolean, String> getMaxUsageBroker(LoadData loadData, double threshold, double avgUsage) {
        String maxUsageBrokerName = "";
        double maxUsage = avgUsage - threshold;
        boolean hasBrokerBelowLowerBound = false;
        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
            String broker = entry.getKey();
            BrokerData brokerData = entry.getValue();
            double currentUsage = this.brokerAvgResourceUsage.getOrDefault(broker, 0.0);
            if (currentUsage > maxUsage && brokerData.getLocalData() != null && brokerData.getLocalData().getBundles().size() > 1) {
                maxUsage = currentUsage;
                maxUsageBrokerName = broker;
            }
            if (!(currentUsage < avgUsage - threshold)) continue;
            hasBrokerBelowLowerBound = true;
        }
        return Pair.of((Object)hasBrokerBelowLowerBound, (Object)maxUsageBrokerName);
    }

    @Override
    public synchronized void onActiveBrokersChange(Set<String> newBrokers) {
        this.brokerAvgResourceUsage.keySet().removeIf(key -> !newBrokers.contains(key));
    }
}

