/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.scheduler.resource.strategies.scheduling;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RAS_Node;
import org.apache.storm.scheduler.resource.RAS_Nodes;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConstraintSolverStrategy
extends BaseResourceAwareStrategy {
    public static final int MAX_STATE_SEARCH = 100000;
    public static final int DEFAULT_STATE_SEARCH = 10000;
    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    private Map<String, Map<String, Integer>> constraintMatrix;
    private HashSet<String> spreadComps = new HashSet();
    private Map<String, RAS_Node> nodes;
    private Map<ExecutorDetails, String> execToComp;
    private Map<String, Set<ExecutorDetails>> compToExecs;
    private List<String> favoredNodeIds;
    private List<String> unFavoredNodeIds;

    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
        HashMap<String, Map<String, Integer>> matrix = new HashMap<String, Map<String, Integer>>();
        for (String comp : comps) {
            matrix.put(comp, new HashMap());
            for (String comp2 : comps) {
                ((Map)matrix.get(comp)).put(comp2, 0);
            }
        }
        List constraints = (List)topo.getConf().get("topology.ras.constraints");
        if (constraints != null) {
            for (List constraintPair : constraints) {
                String comp1 = (String)constraintPair.get(0);
                String comp2 = (String)constraintPair.get(1);
                if (!matrix.containsKey(comp1)) {
                    LOG.warn("Comp: {} declared in constraints is not valid!", (Object)comp1);
                    continue;
                }
                if (!matrix.containsKey(comp2)) {
                    LOG.warn("Comp: {} declared in constraints is not valid!", (Object)comp2);
                    continue;
                }
                ((Map)matrix.get(comp1)).put(comp2, 1);
                ((Map)matrix.get(comp2)).put(comp1, 1);
            }
        }
        return matrix;
    }

    @VisibleForTesting
    public static boolean validateSolution(Cluster cluster, TopologyDetails td) {
        return ConstraintSolverStrategy.checkSpreadSchedulingValid(cluster, td) && ConstraintSolverStrategy.checkConstraintsSatisfied(cluster, td) && ConstraintSolverStrategy.checkResourcesCorrect(cluster, td);
    }

    private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) {
        LOG.info("Checking constraints...");
        assert (cluster.getAssignmentById(topo.getId()) != null);
        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
        Map<String, Map<String, Integer>> constraintMatrix = ConstraintSolverStrategy.getConstraintMap(topo, new HashSet<String>(topo.getExecutorToComponent().values()));
        HashMap workerCompMap = new HashMap();
        result.forEach((exec, worker) -> {
            String comp = (String)execToComp.get(exec);
            workerCompMap.computeIfAbsent(worker, k -> new HashSet()).add(comp);
        });
        for (Map.Entry entry : workerCompMap.entrySet()) {
            Set comps = (Set)entry.getValue();
            for (String comp1 : comps) {
                for (String comp2 : comps) {
                    if (comp1.equals(comp2) || constraintMatrix.get(comp1).get(comp2) == 0) continue;
                    LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}", new Object[]{comp1, comp2, entry.getKey()});
                    return false;
                }
            }
        }
        return true;
    }

    private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) {
        HashMap<WorkerSlot, RAS_Node> workerToNodes = new HashMap<WorkerSlot, RAS_Node>();
        for (RAS_Node node : RAS_Nodes.getAllNodesFrom(cluster).values()) {
            for (WorkerSlot s : node.getUsedSlots()) {
                workerToNodes.put(s, node);
            }
        }
        return workerToNodes;
    }

    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
        LOG.info("Checking for a valid scheduling...");
        assert (cluster.getAssignmentById(topo.getId()) != null);
        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
        HashMap<WorkerSlot, HashSet> workerExecMap = new HashMap<WorkerSlot, HashSet>();
        HashMap<WorkerSlot, HashSet> workerCompMap = new HashMap<WorkerSlot, HashSet>();
        HashMap<RAS_Node, HashSet> nodeCompMap = new HashMap<RAS_Node, HashSet>();
        Map<WorkerSlot, RAS_Node> workerToNodes = ConstraintSolverStrategy.workerToNodes(cluster);
        boolean ret = true;
        HashSet<String> spreadComps = ConstraintSolverStrategy.getSpreadComps(topo);
        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
            ExecutorDetails exec = entry.getKey();
            WorkerSlot worker = entry.getValue();
            RAS_Node node = workerToNodes.get(worker);
            if (workerExecMap.computeIfAbsent(worker, k -> new HashSet()).contains(exec)) {
                LOG.error("Incorrect Scheduling: Found duplicate in scheduling");
                return false;
            }
            ((HashSet)workerExecMap.get(worker)).add(exec);
            String comp = execToComp.get(exec);
            workerCompMap.computeIfAbsent(worker, k -> new HashSet()).add(comp);
            if (spreadComps.contains(comp) && nodeCompMap.computeIfAbsent(node, k -> new HashSet()).contains(comp)) {
                LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}", new Object[]{comp, exec, node.getId(), nodeCompMap.get(node)});
                ret = false;
            }
            nodeCompMap.computeIfAbsent(node, k -> new HashSet()).add(comp);
        }
        return ret;
    }

    private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) {
        LOG.info("Checking Resources...");
        assert (cluster.getAssignmentById(topo.getId()) != null);
        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
        HashMap<RAS_Node, Collection> nodeToExecs = new HashMap<RAS_Node, Collection>();
        HashMap<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<ExecutorDetails, WorkerSlot>();
        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
        if (cluster.getAssignmentById(topo.getId()) != null && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != null) {
            mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot());
        }
        mergedExecToWorker.putAll(result);
        for (Map.Entry entry : mergedExecToWorker.entrySet()) {
            ExecutorDetails exec = (ExecutorDetails)entry.getKey();
            WorkerSlot worker = (WorkerSlot)entry.getValue();
            RAS_Node node = nodes.get(worker.getNodeId());
            if (node.getAvailableMemoryResources() < 0.0 && node.getAvailableCpuResources() < 0.0) {
                LOG.error("Incorrect Scheduling: found node with negative available resources");
                return false;
            }
            nodeToExecs.computeIfAbsent(node, k -> new HashSet()).add(exec);
        }
        for (Map.Entry entry : nodeToExecs.entrySet()) {
            RAS_Node node = (RAS_Node)entry.getKey();
            Collection execs = (Collection)entry.getValue();
            double cpuUsed = 0.0;
            double memoryUsed = 0.0;
            for (ExecutorDetails exec : execs) {
                cpuUsed += topo.getTotalCpuReqTask(exec).doubleValue();
                memoryUsed += topo.getTotalMemReqTask(exec).doubleValue();
            }
            if (node.getAvailableCpuResources() != node.getTotalCpuResources() - cpuUsed) {
                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. Expected: {} Actual: {} Executors scheduled on node: {}", new Object[]{node.getId(), node.getTotalCpuResources() - cpuUsed, node.getAvailableCpuResources(), execs});
                return false;
            }
            if (node.getAvailableMemoryResources() == node.getTotalMemoryResources() - memoryUsed) continue;
            LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {} Actual: {} Executors scheduled on node: {}", new Object[]{node.getId(), node.getTotalMemoryResources() - memoryUsed, node.getAvailableMemoryResources(), execs});
            return false;
        }
        return true;
    }

    private static HashSet<String> getSpreadComps(TopologyDetails topo) {
        HashSet<String> retSet = new HashSet<String>();
        List spread = (List)topo.getConf().get("topology.spread.components");
        if (spread != null) {
            Set<String> comps = topo.getComponents().keySet();
            for (String comp : spread) {
                if (comps.contains(comp)) {
                    retSet.add(comp);
                    continue;
                }
                LOG.warn("Comp {} declared for spread not valid", (Object)comp);
            }
        }
        return retSet;
    }

    @Override
    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
        this.prepare(cluster);
        LOG.debug("Scheduling {}", (Object)td.getId());
        this.nodes = RAS_Nodes.getAllNodesFrom(cluster);
        HashMap workerCompAssignment = new HashMap();
        HashMap nodeCompAssignment = new HashMap();
        int maxStateSearch = Math.min(100000, ObjectReader.getInt((Object)td.getConf().get("topology.ras.constraint.max.state.search"), (Integer)10000));
        long maxTimeMs = (long)ObjectReader.getInt((Object)td.getConf().get("topology.ras.constraint.max.time.secs"), (Integer)-1).intValue() * 1000L;
        this.favoredNodeIds = this.makeHostToNodeIds((List)td.getConf().get("topology.scheduler.favored.nodes"));
        this.unFavoredNodeIds = this.makeHostToNodeIds((List)td.getConf().get("topology.scheduler.unfavored.nodes"));
        this.execToComp = td.getExecutorToComponent();
        this.compToExecs = this.getCompToExecs(this.execToComp);
        this.constraintMatrix = ConstraintSolverStrategy.getConstraintMap(td, this.compToExecs.keySet());
        this.spreadComps = ConstraintSolverStrategy.getSpreadComps(td);
        HashSet<ExecutorDetails> unassignedExecutors = new HashSet<ExecutorDetails>(cluster.getUnassignedExecutors(td));
        List sortedExecs = this.getSortedExecs(this.spreadComps, this.constraintMatrix, this.compToExecs).stream().filter(unassignedExecutors::contains).collect(Collectors.toList());
        SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId());
        if (existingAssignment != null) {
            existingAssignment.getExecutorToSlot().forEach((exec, ws) -> {
                String compId = this.execToComp.get(exec);
                RAS_Node node = this.nodes.get(ws.getNodeId());
                nodeCompAssignment.computeIfAbsent(node, k -> new HashSet()).add(compId);
                workerCompAssignment.computeIfAbsent(ws, k -> new HashSet()).add(compId);
            });
        }
        if (!this.checkSchedulingFeasibility()) {
            return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
        }
        return this.backtrackSearch(new SearcherState(workerCompAssignment, nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td)).asSchedulingResult();
    }

    private boolean checkSchedulingFeasibility() {
        for (String comp : this.spreadComps) {
            int numExecs = this.compToExecs.get(comp).size();
            if (numExecs <= this.nodes.size()) continue;
            LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger than number of nodes: {}", new Object[]{comp, numExecs, this.nodes.size()});
            return false;
        }
        if (this.execToComp.size() >= 100000) {
            LOG.error("Number of executors is greater than the maximum number of states allowed to be searched.  # of executors: {} Max states to search: {}", (Object)this.execToComp.size(), (Object)100000);
            return false;
        }
        return true;
    }

    @Override
    protected TreeSet<BaseResourceAwareStrategy.ObjectResources> sortObjectResources(BaseResourceAwareStrategy.AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, BaseResourceAwareStrategy.ExistingScheduleFunc existingScheduleFunc) {
        return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
    }

    @VisibleForTesting
    protected SolverResult backtrackSearch(SearcherState state) {
        state.incStatesSearched();
        if (state.areSearchLimitsExceeded()) {
            LOG.warn("Limits Exceeded");
            return new SolverResult(state, false);
        }
        if (Thread.currentThread().isInterrupted()) {
            return new SolverResult(state, false);
        }
        ExecutorDetails exec = state.currentExec();
        Iterable<String> sortedNodes = this.sortAllNodes(state.td, exec, this.favoredNodeIds, this.unFavoredNodeIds);
        for (String nodeId : sortedNodes) {
            RAS_Node node = this.nodes.get(nodeId);
            for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
                if (!this.isExecAssignmentToWorkerValid(workerSlot, state)) continue;
                state.tryToSchedule(this.execToComp, node, workerSlot);
                if (state.areAllExecsScheduled()) {
                    return new SolverResult(state, true);
                }
                SolverResult results = this.backtrackSearch(state.nextExecutor());
                if (results.success) {
                    return results;
                }
                if (state.areSearchLimitsExceeded()) {
                    return new SolverResult(state, false);
                }
                state.backtrack(this.execToComp, node, workerSlot);
            }
        }
        return new SolverResult(state, false);
    }

    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) {
        ExecutorDetails exec = state.currentExec();
        RAS_Node node = this.nodes.get(worker.getNodeId());
        if (!node.wouldFit(worker, exec, state.td)) {
            LOG.trace("{} would not fit in resources available on {}", (Object)exec, (Object)worker);
            return false;
        }
        String execComp = this.execToComp.get(exec);
        Set components = (Set)state.workerCompAssignment.get(worker);
        if (components != null) {
            Map<String, Integer> subMatrix = this.constraintMatrix.get(execComp);
            for (String comp : components) {
                if (subMatrix.get(comp) == 0) continue;
                LOG.trace("{} found {} constraint violation {} on {}", new Object[]{exec, execComp, comp, worker});
                return false;
            }
        }
        if (this.spreadComps.contains(execComp) && state.nodeCompAssignment.computeIfAbsent(node, k -> new HashSet()).contains(execComp)) {
            LOG.trace("{} Found spread violation {} on node {}", new Object[]{exec, execComp, node.getId()});
            return false;
        }
        return true;
    }

    private Map<String, Set<ExecutorDetails>> getCompToExecs(Map<ExecutorDetails, String> executorToComp) {
        HashMap<String, Set<ExecutorDetails>> retMap = new HashMap<String, Set<ExecutorDetails>>();
        executorToComp.forEach((exec, comp) -> retMap.computeIfAbsent((String)comp, k -> new HashSet()).add(exec));
        return retMap;
    }

    private ArrayList<ExecutorDetails> getSortedExecs(HashSet<String> spreadComps, Map<String, Map<String, Integer>> constraintMatrix, Map<String, Set<ExecutorDetails>> compToExecs) {
        ArrayList<ExecutorDetails> retList = new ArrayList<ExecutorDetails>();
        HashMap compConstraintCountMap = new HashMap();
        constraintMatrix.forEach((comp, subMatrix) -> {
            int count = subMatrix.values().stream().mapToInt(Number::intValue).sum();
            if (spreadComps.contains(comp)) {
                ++count;
            }
            compConstraintCountMap.put(comp, count);
        });
        NavigableMap sortedCompConstraintCountMap = this.sortByValues(compConstraintCountMap);
        for (String comp2 : sortedCompConstraintCountMap.keySet()) {
            retList.addAll((Collection<ExecutorDetails>)compToExecs.get(comp2));
        }
        return retList;
    }

    @VisibleForTesting
    public <K extends Comparable<K>, V extends Comparable<V>> NavigableMap<K, V> sortByValues(Map<K, V> map) {
        Comparator valueComparator = (k1, k2) -> {
            int compare = ((Comparable)map.get(k2)).compareTo(map.get(k1));
            if (compare == 0) {
                return k2.compareTo(k1);
            }
            return compare;
        };
        TreeMap<K, V> sortedByValues = new TreeMap<K, V>(valueComparator);
        sortedByValues.putAll(map);
        return sortedByValues;
    }

    protected static class SearcherState {
        final long startTimeMillis;
        private final long maxEndTimeMs;
        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
        private final boolean[] okToRemoveFromWorker;
        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
        private final boolean[] okToRemoveFromNode;
        private final List<ExecutorDetails> execs;
        private final int maxStatesSearched;
        private final TopologyDetails td;
        private int statesSearched = 0;
        private int numBacktrack = 0;
        private int execIndex = 0;

        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment, int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
            assert (!execs.isEmpty());
            assert (execs != null);
            this.workerCompAssignment = workerCompAssignment;
            this.nodeCompAssignment = nodeCompAssignment;
            this.maxStatesSearched = maxStatesSearched;
            this.execs = execs;
            this.okToRemoveFromWorker = new boolean[execs.size()];
            this.okToRemoveFromNode = new boolean[execs.size()];
            this.td = td;
            this.startTimeMillis = Time.currentTimeMillis();
            this.maxEndTimeMs = maxTimeMs <= 0L ? Long.MAX_VALUE : this.startTimeMillis + maxTimeMs;
        }

        public void incStatesSearched() {
            ++this.statesSearched;
            if (LOG.isDebugEnabled() && this.statesSearched % 1000 == 0) {
                LOG.debug("States Searched: {}", (Object)this.statesSearched);
                LOG.debug("backtrack: {}", (Object)this.numBacktrack);
            }
        }

        public int getStatesSearched() {
            return this.statesSearched;
        }

        public boolean areSearchLimitsExceeded() {
            return this.statesSearched > this.maxStatesSearched || Time.currentTimeMillis() > this.maxEndTimeMs;
        }

        public SearcherState nextExecutor() {
            ++this.execIndex;
            if (this.execIndex >= this.execs.size()) {
                throw new IllegalStateException("Internal Error: exceeded the exec limit " + this.execIndex + " >= " + this.execs.size());
            }
            return this;
        }

        public boolean areAllExecsScheduled() {
            return this.execIndex == this.execs.size() - 1;
        }

        public ExecutorDetails currentExec() {
            return this.execs.get(this.execIndex);
        }

        public void tryToSchedule(Map<ExecutorDetails, String> execToComp, RAS_Node node, WorkerSlot workerSlot) {
            ExecutorDetails exec = this.currentExec();
            String comp = execToComp.get(exec);
            LOG.trace("Trying assignment of {} {} to {}", new Object[]{exec, comp, workerSlot});
            this.okToRemoveFromWorker[this.execIndex] = this.workerCompAssignment.computeIfAbsent(workerSlot, k -> new HashSet()).add(comp);
            this.okToRemoveFromNode[this.execIndex] = this.nodeCompAssignment.computeIfAbsent(node, k -> new HashSet()).add(comp);
            node.assignSingleExecutor(workerSlot, exec, this.td);
        }

        public void backtrack(Map<ExecutorDetails, String> execToComp, RAS_Node node, WorkerSlot workerSlot) {
            --this.execIndex;
            if (this.execIndex < 0) {
                throw new IllegalStateException("Internal Error: exec index became negative");
            }
            ++this.numBacktrack;
            ExecutorDetails exec = this.currentExec();
            String comp = execToComp.get(exec);
            LOG.trace("Backtracking {} {} from {}", new Object[]{exec, comp, workerSlot});
            if (this.okToRemoveFromWorker[this.execIndex]) {
                this.workerCompAssignment.get(workerSlot).remove(comp);
                this.okToRemoveFromWorker[this.execIndex] = false;
            }
            if (this.okToRemoveFromNode[this.execIndex]) {
                this.nodeCompAssignment.get(node).remove(comp);
                this.okToRemoveFromNode[this.execIndex] = false;
            }
            node.freeSingleExecutor(exec, this.td);
        }
    }

    protected static class SolverResult {
        private final int statesSearched;
        private final boolean success;
        private final long timeTakenMillis;
        private final int backtracked;

        public SolverResult(SearcherState state, boolean success) {
            this.statesSearched = state.getStatesSearched();
            this.success = success;
            this.timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
            this.backtracked = state.numBacktrack;
        }

        public SchedulingResult asSchedulingResult() {
            if (this.success) {
                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + this.statesSearched + " states traversed in " + this.timeTakenMillis + "ms, backtracked " + this.backtracked + " times)");
            }
            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "Cannot find scheduling that satisfies all constraints (" + this.statesSearched + " states traversed in " + this.timeTakenMillis + "ms, backtracked " + this.backtracked + " times)");
        }
    }
}

