/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OverloadShedder
implements LoadSheddingStrategy {
    private static final Logger log = LoggerFactory.getLogger(OverloadShedder.class);
    private Map<String, String> selectedBundlesCache = new HashMap<String, String>();

    public OverloadShedder(ServiceConfiguration conf) {
    }

    @Override
    public Map<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
        this.selectedBundlesCache.clear();
        double overloadThreshold = (double)conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
        Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
            String broker = entry.getKey();
            BrokerData brokerData = entry.getValue();
            LocalBrokerData localData = brokerData.getLocalData();
            double maxUsage = localData.getMaxResourceUsage();
            if (!(maxUsage >= overloadThreshold)) continue;
            log.info("Attempting to shed load on {}, which has max resource usage {}%", (Object)broker, (Object)maxUsage);
            double maxMessageRate = Double.NEGATIVE_INFINITY;
            String mostTaxingBundle = null;
            if (localData.getBundles().size() > 1) {
                for (String bundle : localData.getBundles()) {
                    BundleData bundleData = loadData.getBundleData().get(bundle);
                    TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                    double messageRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
                    if (!(messageRate > maxMessageRate) || recentlyUnloadedBundles.containsKey(bundle)) continue;
                    maxMessageRate = messageRate;
                    mostTaxingBundle = bundle;
                }
                if (mostTaxingBundle != null) {
                    this.selectedBundlesCache.put(broker, mostTaxingBundle);
                    continue;
                }
                log.warn("Load shedding could not be performed on broker {} because all bundles assigned to it have recently been unloaded");
                continue;
            }
            if (localData.getBundles().size() == 1) {
                log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. No Load Shedding will be done on this broker", localData.getBundles().iterator().next(), (Object)broker);
                continue;
            }
            log.warn("Broker {} is overloaded despite having no bundles", (Object)broker);
        }
        return this.selectedBundlesCache;
    }
}

