/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.scheduler.multitenant;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.multitenant.Node;
import org.apache.storm.scheduler.multitenant.NodePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultPool
extends NodePool {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultPool.class);
    private Set<Node> _nodes = new HashSet<Node>();
    private HashMap<String, TopologyDetails> _tds = new HashMap();

    @Override
    public void addTopology(TopologyDetails td) {
        String topId = td.getId();
        LOG.debug("Adding in Topology {}", (Object)topId);
        this._tds.put(topId, td);
        SchedulerAssignment assignment = this._cluster.getAssignmentById(topId);
        if (assignment != null) {
            for (WorkerSlot ws : assignment.getSlots()) {
                Node n = (Node)this._nodeIdToNode.get(ws.getNodeId());
                this._nodes.add(n);
            }
        }
    }

    @Override
    public boolean canAdd(TopologyDetails td) {
        return true;
    }

    @Override
    public Collection<Node> takeNodes(int nodesNeeded) {
        HashSet<Node> ret = new HashSet<Node>();
        LinkedList<Node> sortedNodes = new LinkedList<Node>(this._nodes);
        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
        for (Node n : sortedNodes) {
            if (nodesNeeded <= ret.size()) break;
            if (!n.isAlive()) continue;
            n.freeAllSlots(this._cluster);
            this._nodes.remove(n);
            ret.add(n);
        }
        return ret;
    }

    @Override
    public int nodesAvailable() {
        int total = 0;
        for (Node n : this._nodes) {
            if (!n.isAlive()) continue;
            ++total;
        }
        return total;
    }

    @Override
    public int slotsAvailable() {
        return Node.countTotalSlotsAlive(this._nodes);
    }

    @Override
    public NodePool.NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
        int nodesFound = 0;
        int slotsFound = 0;
        LinkedList<Node> sortedNodes = new LinkedList<Node>(this._nodes);
        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
        for (Node n : sortedNodes) {
            if (slotsNeeded <= 0) break;
            if (!n.isAlive()) continue;
            ++nodesFound;
            int totalSlotsFree = n.totalSlots();
            slotsFound += totalSlotsFree;
            slotsNeeded -= totalSlotsFree;
        }
        return new NodePool.NodeAndSlotCounts(nodesFound, slotsFound);
    }

    @Override
    public Collection<Node> takeNodesBySlots(int slotsNeeded) {
        HashSet<Node> ret = new HashSet<Node>();
        LinkedList<Node> sortedNodes = new LinkedList<Node>(this._nodes);
        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
        for (Node n : sortedNodes) {
            if (slotsNeeded <= 0) break;
            if (!n.isAlive()) continue;
            n.freeAllSlots(this._cluster);
            this._nodes.remove(n);
            ret.add(n);
            slotsNeeded -= n.totalSlotsFree();
        }
        return ret;
    }

    @Override
    public void scheduleAsNeeded(NodePool ... lesserPools) {
        for (TopologyDetails td : this._tds.values()) {
            String topId = td.getId();
            if (this._cluster.needsScheduling(td)) {
                Node n;
                LOG.debug("Scheduling topology {}", (Object)topId);
                int totalTasks = td.getExecutors().size();
                int origRequest = td.getNumWorkers();
                int slotsRequested = Math.min(totalTasks, origRequest);
                int slotsUsed = Node.countSlotsUsed(topId, this._nodes);
                int slotsFree = Node.countFreeSlotsAlive(this._nodes);
                int slotsAvailable = 0;
                if (slotsRequested > slotsFree) {
                    slotsAvailable = NodePool.slotsAvailable(lesserPools);
                }
                int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
                int executorsNotRunning = this._cluster.getUnassignedExecutors(td).size();
                LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}", new Object[]{slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning});
                if (slotsToUse <= 0) {
                    if (executorsNotRunning > 0) {
                        this._cluster.setStatus(topId, "Not fully scheduled (No free slots in default pool) " + executorsNotRunning + " executors not scheduled");
                        continue;
                    }
                    if (slotsUsed < slotsRequested) {
                        this._cluster.setStatus(topId, "Running with fewer slots than requested (" + slotsUsed + "/" + origRequest + ")");
                        continue;
                    }
                    this._cluster.setStatus(topId, "Fully Scheduled (requested " + origRequest + " slots, but could only use " + slotsUsed + ")");
                    continue;
                }
                int slotsNeeded = slotsToUse - slotsFree;
                if (slotsNeeded > 0) {
                    this._nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, lesserPools));
                }
                if (executorsNotRunning <= 0) {
                    for (Node n2 : this._nodes) {
                        n2.freeTopology(topId, this._cluster);
                    }
                    slotsFree = Node.countFreeSlotsAlive(this._nodes);
                    slotsToUse = Math.min(slotsRequested, slotsFree);
                }
                NodePool.RoundRobinSlotScheduler slotSched = new NodePool.RoundRobinSlotScheduler(td, slotsToUse, this._cluster);
                LinkedList<Node> nodes = new LinkedList<Node>(this._nodes);
                do {
                    if (nodes.isEmpty()) {
                        throw new IllegalStateException("This should not happen, we messed up and did not get enough slots");
                    }
                    n = nodes.peekFirst();
                    if (n.totalSlotsFree() != 0) continue;
                    nodes.remove();
                    n = null;
                } while (n == null || slotSched.assignSlotTo(n));
                int afterSchedSlotsUsed = Node.countSlotsUsed(topId, this._nodes);
                if (afterSchedSlotsUsed < slotsRequested) {
                    this._cluster.setStatus(topId, "Running with fewer slots than requested (" + afterSchedSlotsUsed + "/" + origRequest + ")");
                    continue;
                }
                if (afterSchedSlotsUsed < origRequest) {
                    this._cluster.setStatus(topId, "Fully Scheduled (requested " + origRequest + " slots, but could only use " + afterSchedSlotsUsed + ")");
                    continue;
                }
                this._cluster.setStatus(topId, "Fully Scheduled");
                continue;
            }
            this._cluster.setStatus(topId, "Fully Scheduled");
        }
    }

    public String toString() {
        return "DefaultPool  " + this._nodes.size() + " nodes " + this._tds.size() + " topologies";
    }
}

