/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.scheduler.blacklist.strategies;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.blacklist.reporters.IReporter;
import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBlacklistStrategy
implements IBlacklistStrategy {
    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
    private IReporter reporter;
    private int toleranceCount;
    private int resumeTime;
    private int nimbusMonitorFreqSecs;
    private TreeMap<String, Integer> blacklist;

    @Override
    public void prepare(Map<String, Object> conf) {
        this.toleranceCount = ObjectReader.getInt((Object)conf.get("blacklist.scheduler.tolerance.count"), (Integer)3);
        this.resumeTime = ObjectReader.getInt((Object)conf.get("blacklist.scheduler.resume.time.secs"), (Integer)1800);
        String reporterClassName = ObjectReader.getString((Object)conf.get("blacklist.scheduler.reporter"), (String)LogReporter.class.getName());
        this.reporter = (IReporter)this.initializeInstance(reporterClassName, "blacklist reporter");
        this.nimbusMonitorFreqSecs = ObjectReader.getInt((Object)conf.get("nimbus.monitor.freq.secs"));
        this.blacklist = new TreeMap();
    }

    @Override
    public Set<String> getBlacklist(List<Map<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies topologies) {
        HashMap<String, Integer> countMap = new HashMap<String, Integer>();
        for (Map<String, Set<Integer>> map : supervisorsWithFailures) {
            Set<String> supervisors = map.keySet();
            for (String supervisor : supervisors) {
                int supervisorCount = countMap.getOrDefault(supervisor, 0);
                countMap.put(supervisor, supervisorCount + 1);
            }
        }
        for (Map.Entry entry : countMap.entrySet()) {
            String supervisor = (String)entry.getKey();
            int count = (Integer)entry.getValue();
            if (count < this.toleranceCount || this.blacklist.containsKey(supervisor)) continue;
            LOG.debug("Added supervisor {} to blacklist", (Object)supervisor);
            LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures);
            this.reporter.reportBlacklist(supervisor, supervisorsWithFailures);
            this.blacklist.put(supervisor, this.resumeTime / this.nimbusMonitorFreqSecs);
        }
        Set<String> toRelease = this.releaseBlacklistWhenNeeded(cluster, new ArrayList<String>(this.blacklist.keySet()));
        if (toRelease != null) {
            LOG.debug("Releasing {} nodes because of low resources", (Object)toRelease.size());
            for (String key : toRelease) {
                this.blacklist.remove(key);
            }
        }
        return this.blacklist.keySet();
    }

    @Override
    public void resumeFromBlacklist() {
        HashSet<String> readyToRemove = new HashSet<String>();
        for (Map.Entry<String, Integer> entry : this.blacklist.entrySet()) {
            String supervisor = entry.getKey();
            int countUntilResume = entry.getValue() - 1;
            if (countUntilResume == 0) {
                readyToRemove.add(supervisor);
                continue;
            }
            this.blacklist.put(supervisor, countUntilResume);
        }
        for (String key : readyToRemove) {
            this.blacklist.remove(key);
            LOG.info("Supervisor {} has been blacklisted more than resume period. Removed from blacklist.", (Object)key);
        }
    }

    protected Set<String> releaseBlacklistWhenNeeded(Cluster cluster, List<String> blacklistedNodeIds) {
        HashSet<String> readyToRemove = new HashSet<String>();
        if (blacklistedNodeIds.size() > 0) {
            int availableSlots = cluster.getNonBlacklistedAvailableSlots(blacklistedNodeIds).size();
            int neededSlots = 0;
            for (TopologyDetails td : cluster.needsSchedulingTopologies()) {
                int slots = td.getNumWorkers();
                int assignedSlots = cluster.getAssignedNumWorkers(td);
                int tdSlotsNeeded = slots - assignedSlots;
                neededSlots += tdSlotsNeeded;
            }
            Map<String, SupervisorDetails> availableSupervisors = cluster.getSupervisors();
            int shortageSlots = neededSlots - availableSlots;
            LOG.debug("Need {} slots.", (Object)neededSlots);
            LOG.debug("Available {} slots.", (Object)availableSlots);
            LOG.debug("Shortage {} slots.", (Object)shortageSlots);
            if (shortageSlots > 0) {
                LOG.info("Need {} slots more. Releasing some blacklisted nodes to cover it.", (Object)shortageSlots);
                Map<String, Set<String>> hostToSupervisorIds = this.createHostToSupervisorMap(blacklistedNodeIds, cluster);
                for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
                    for (String supervisorId : supervisorIds) {
                        SupervisorDetails sd = availableSupervisors.get(supervisorId);
                        if (sd == null) continue;
                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
                        readyToRemove.add(supervisorId);
                        LOG.debug("Releasing {} with {} slots leaving {} slots to go", new Object[]{supervisorId, sdAvailableSlots, shortageSlots -= sdAvailableSlots});
                    }
                    if (shortageSlots > 0) continue;
                    break;
                }
            }
        }
        return readyToRemove;
    }

    private Object initializeInstance(String className, String representation) {
        try {
            return Class.forName(className).newInstance();
        }
        catch (ClassNotFoundException e) {
            LOG.error("Can't find {} for name {}", (Object)representation, (Object)className);
            throw new RuntimeException(e);
        }
        catch (InstantiationException e) {
            LOG.error("Throw InstantiationException {} for name {}", (Object)representation, (Object)className);
            throw new RuntimeException(e);
        }
        catch (IllegalAccessException e) {
            LOG.error("Throw IllegalAccessException {} for name {}", (Object)representation, (Object)className);
            throw new RuntimeException(e);
        }
    }

    protected Map<String, Set<String>> createHostToSupervisorMap(List<String> blacklistedNodeIds, Cluster cluster) {
        TreeMap<String, Set<String>> hostToSupervisorMap = new TreeMap<String, Set<String>>();
        for (String supervisorId : blacklistedNodeIds) {
            String hostname = cluster.getHost(supervisorId);
            if (hostname == null) continue;
            HashSet<String> supervisorIds = (HashSet<String>)hostToSupervisorMap.get(hostname);
            if (supervisorIds == null) {
                supervisorIds = new HashSet<String>();
                hostToSupervisorMap.put(hostname, supervisorIds);
            }
            supervisorIds.add(supervisorId);
        }
        return hostToSupervisorMap;
    }
}

