/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.generated.StormTopology;
import backtype.storm.scheduler.WorkerSlot;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormStatus;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.nimbus.TopologyAssignEvent;
import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.AssignmentBak;
import com.alibaba.jstorm.schedule.IToplogyScheduler;
import com.alibaba.jstorm.schedule.TopologyAssignContext;
import com.alibaba.jstorm.schedule.default_assign.DefaultTopologyScheduler;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopologyAssign
implements Runnable {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(TopologyAssign.class);
    /** Shared instance; created on first call to {@link #getInstance()}. */
    private static TopologyAssign instance = null;
    /** Nimbus state holder handed in through {@link #init(NimbusData)}. */
    protected NimbusData nimbusData;
    /** Scheduler registry keyed by name; {@link #init(NimbusData)} registers the default one. */
    protected Map<String, IToplogyScheduler> schedulers;
    /** Worker thread created and started by {@link #init(NimbusData)}; runs {@link #run()}. */
    private Thread thread;
    // NOTE(review): not read by any code visible in this class — presumably a cleanup grace period in seconds; confirm before relying on it.
    private int cleanupTimeoutSec = 60;
    /** Key under which the default scheduler is stored in {@link #schedulers}. */
    public static final String DEFAULT_SCHEDULER_NAME = "default";
    /** Pending assignment events: filled by {@link #push(TopologyAssignEvent)}, drained by {@link #run()}. */
    protected static LinkedBlockingQueue<TopologyAssignEvent> queue = new LinkedBlockingQueue();
    /** Loop flag for {@link #run()}: set to true when the loop starts and to false by {@link #cleanup()}. */
    volatile boolean runFlag = false;

    /** Private: the only construction site in this class is {@link #getInstance()}. */
    private TopologyAssign() {
    }

    /**
     * Returns the shared {@code TopologyAssign} instance, creating it on first use.
     *
     * <p>The check and the creation both happen while holding the class monitor, so
     * concurrent callers all receive the same object.
     *
     * @return the shared instance, never {@code null}
     */
    public static TopologyAssign getInstance() {
        synchronized (TopologyAssign.class) {
            if (instance == null) {
                instance = new TopologyAssign();
            }
            return instance;
        }
    }

    /**
     * Wires this instance to Nimbus, registers the default scheduler and starts the
     * daemon thread that executes {@link #run()}.
     *
     * @param nimbusData Nimbus state holder; its configuration is passed to the default scheduler
     */
    public void init(NimbusData nimbusData) {
        this.nimbusData = nimbusData;

        // Publish the registry first, then fill it with the prepared default scheduler.
        Map<String, IToplogyScheduler> registry = new HashMap<String, IToplogyScheduler>();
        this.schedulers = registry;
        DefaultTopologyScheduler builtinScheduler = new DefaultTopologyScheduler();
        builtinScheduler.prepare(nimbusData.getConf());
        registry.put(DEFAULT_SCHEDULER_NAME, builtinScheduler);

        Thread worker = new Thread(this);
        this.thread = worker;
        worker.setName("TopologyAssign");
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Asks the assignment loop to stop: clears {@code runFlag} and interrupts the worker
     * thread so a blocked {@code queue.take()} returns.
     *
     * <p>Safe to call before {@link #init(NimbusData)}; in that case there is no thread to
     * interrupt and only the flag is cleared.
     */
    public void cleanup() {
        this.runFlag = false;
        Thread worker = this.thread;
        // The thread only exists after init(); without this guard an early cleanup() throws NPE.
        if (worker != null) {
            worker.interrupt();
        }
    }

    /**
     * Enqueues an assignment request for the worker loop in {@link #run()}.
     *
     * @param event the request to enqueue; {@code LinkedBlockingQueue.offer} rejects {@code null}
     *              with a {@link NullPointerException}
     */
    public static void push(TopologyAssignEvent event) {
        // The boolean result of offer() is not checked here.
        queue.offer(event);
    }

    /**
     * Worker loop: takes events from the queue one at a time, performs the assignment and,
     * when it succeeded, cleans up data of topologies that are no longer active.
     * The loop ends once {@code runFlag} is observed as {@code false}.
     */
    @Override
    public void run() {
        LOG.info("TopologyAssign thread has been started");
        this.runFlag = true;
        while (this.runFlag) {
            TopologyAssignEvent next;
            try {
                next = queue.take();
            }
            catch (InterruptedException interrupted) {
                // Woken up while waiting; the loop condition decides whether to keep going.
                continue;
            }
            if (next == null) {
                continue;
            }

            boolean assigned = this.doTopologyAssignment(next);
            if (!assigned) {
                continue;
            }

            try {
                this.cleanupDisappearedTopology();
            }
            catch (Exception e) {
                LOG.error("Failed to do cleanup disappear topology ", (Throwable)e);
            }
        }
    }

    /**
     * Handles one assignment event: builds the assignment, pushes a task-start event to the
     * metrics runnable, updates the topology status (unless the event is a scratch event),
     * backs up the assignment and finally reports the outcome on the event.
     *
     * @param event the assignment request
     * @return {@code true} when the assignment went through; {@code false} when anything threw,
     *         in which case {@code event.fail(...)} has been called with the throwable's message
     */
    protected boolean doTopologyAssignment(TopologyAssignEvent event) {
        Assignment newAssignment;
        try {
            boolean fromScratch = event.isScratch();
            // Only scratch events need the assignment that was in place before this run.
            Assignment previousAssignment = fromScratch
                    ? this.nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null)
                    : null;

            newAssignment = this.mkAssignment(event);

            TopologyMetricsRunnable.TaskStartEvent startEvent = new TopologyMetricsRunnable.TaskStartEvent();
            startEvent.oldAssignment = previousAssignment;
            startEvent.newAssignment = newAssignment;
            startEvent.topologyId = event.getTopologyId();
            startEvent.clusterName = this.nimbusData.getClusterName();
            startEvent.timestamp = System.currentTimeMillis();

            Map<Integer, TaskInfo> taskInfos = Cluster.get_all_taskInfo(this.nimbusData.getStormClusterState(), event.getTopologyId());
            if (taskInfos == null) {
                // First lookup produced nothing: read once more and use whatever comes back.
                taskInfos = Cluster.get_all_taskInfo(this.nimbusData.getStormClusterState(), event.getTopologyId());
            }
            startEvent.task2Component = Common.getTaskToComponent(taskInfos);
            this.nimbusData.getMetricRunnable().pushEvent(startEvent);

            if (!fromScratch) {
                this.setTopologyStatus(event);
            }
        }
        catch (Throwable failure) {
            LOG.error("Failed to assign topology " + event.getTopologyId(), failure);
            event.fail(failure.getMessage());
            return false;
        }

        if (newAssignment != null) {
            this.backupAssignment(newAssignment, event);
        }
        event.done();
        return true;
    }

    /**
     * Removes leftovers of topologies that are neither active nor pending: cluster-state data,
     * cached heartbeats, task timeouts and the topology's directory under the master stormdist root.
     * Does nothing when the active-topology list cannot be obtained ({@code null}).
     *
     * @throws Exception if reading or updating cluster state fails
     */
    public void cleanupDisappearedTopology() throws Exception {
        StormClusterState zkState = this.nimbusData.getStormClusterState();
        List<String> activeIds = zkState.active_storms();
        if (activeIds != null) {
            for (String staleId : this.get_cleanup_ids(zkState, activeIds)) {
                LOG.info("Cleaning up " + staleId);
                zkState.try_remove_storm(staleId);
                this.nimbusData.getTaskHeartbeatsCache().remove(staleId);
                this.nimbusData.getTasksHeartbeat().remove(staleId);
                NimbusUtils.removeTopologyTaskTimeout(this.nimbusData, staleId);

                String codeDir = StormConfig.masterStormdistRoot(this.nimbusData.getConf(), staleId);
                try {
                    PathUtils.rmr(codeDir);
                }
                catch (IOException e) {
                    // Best effort: a directory that cannot be deleted must not stop the other cleanups.
                    LOG.warn("Failed to delete " + codeDir + ",", (Throwable)e);
                }
            }
        }
    }

    /**
     * Appends the names of all entries found under the master stormdist root to {@code code_ids}.
     *
     * <p>The previous body additionally stat-ed every directory (exists / lastModified) but threw
     * the result away, so that loop was pure filesystem overhead and has been removed.
     *
     * @param code_ids        output list receiving every entry name of the stormdist root
     * @param latest_code_ids left untouched; kept for signature compatibility.
     *                        NOTE(review): the name suggests it was meant to receive recently
     *                        modified ids — confirm whether that filter should be restored.
     * @throws IOException if the directory listing fails
     */
    private void get_code_ids(List<String> code_ids, HashSet<String> latest_code_ids) throws IOException {
        Map<Object, Object> conf = this.nimbusData.getConf();
        String master_stormdist_root = StormConfig.masterStormdistRoot(conf);
        List<String> all_code_ids = PathUtils.read_dir_contents(master_stormdist_root);
        // Guard against a listing helper that reports "nothing there" as null.
        if (all_code_ids != null) {
            code_ids.addAll(all_code_ids);
        }
    }

    /**
     * Computes the topology ids whose data can be removed: every id that still shows up in any
     * cluster-state area or in the local code directory, minus active topologies, minus topologies
     * pending submission, minus the ids reported as "latest" by {@code get_code_ids}.
     *
     * @param clusterState     cluster state to query
     * @param active_topologys ids of currently active topologies; {@code null} is tolerated
     * @return the ids to clean up (possibly empty)
     * @throws Exception if a cluster-state or directory read fails
     */
    private Set<String> get_cleanup_ids(StormClusterState clusterState, List<String> active_topologys) throws Exception {
        // Gather every id source in the same order the cluster state has always been queried.
        List<List<String>> remoteSources = new ArrayList<List<String>>();
        remoteSources.add(clusterState.task_storms());
        remoteSources.add(clusterState.heartbeat_storms());
        remoteSources.add(clusterState.task_error_storms());
        remoteSources.add(clusterState.assignments(null));
        remoteSources.add(clusterState.get_metrics());
        remoteSources.add(clusterState.backpressureInfos());

        ArrayList<String> localCodeIds = new ArrayList<String>();
        HashSet<String> recentCodeIds = new HashSet<String>();
        this.get_code_ids(localCodeIds, recentCodeIds);
        Set<String> pendingIds = this.nimbusData.getPendingSubmitTopoloygs().keySet();

        HashSet<String> candidates = new HashSet<String>();
        for (List<String> ids : remoteSources) {
            if (ids != null) {
                candidates.addAll(ids);
            }
        }
        candidates.addAll(localCodeIds);

        if (active_topologys != null) {
            candidates.removeAll(active_topologys);
            recentCodeIds.removeAll(active_topologys);
        }
        if (pendingIds != null) {
            candidates.removeAll(pendingIds);
        }
        candidates.removeAll(recentCodeIds);
        LOG.info("Skip remove topology of " + recentCodeIds);
        return candidates;
    }

    /**
     * Writes the topology's status to cluster state. The status is the event's old status when
     * present, otherwise {@code active}. A missing storm base is created and activated; an existing
     * one gets its status and monitor flag updated, and supplies the topology name to the event
     * when the event has none.
     *
     * @param event the assignment event describing the topology
     * @throws Exception if a cluster-state operation fails
     */
    public void setTopologyStatus(TopologyAssignEvent event) throws Exception {
        StormClusterState zkState = this.nimbusData.getStormClusterState();
        String topologyId = event.getTopologyId();
        String topologyName = event.getTopologyName();
        String group = event.getGroup();

        StormStatus previousStatus = event.getOldStatus();
        StormStatus status = previousStatus != null ? previousStatus : new StormStatus(StatusType.active);

        boolean monitorEnabled = ConfigExtension.isEnablePerformanceMetrics(this.nimbusData.getConf());
        StormBase existingBase = zkState.storm_base(topologyId, null);
        if (existingBase != null) {
            zkState.update_storm(topologyId, status);
            zkState.set_storm_monitor(topologyId, monitorEnabled);
            if (topologyName == null) {
                event.setTopologyName(existingBase.getStormName());
            }
        } else {
            StormBase freshBase = new StormBase(topologyName, TimeUtils.current_time_secs(), status, group);
            freshBase.setEnableMonitor(monitorEnabled);
            zkState.activate_storm(topologyId, freshBase);
        }
        LOG.info("Update " + topologyId + " " + status);
    }

    /**
     * Collects everything a scheduler needs for one assignment run: merged configuration, raw
     * topology, alive supervisors with their free ports, task-to-component mapping, alive / dead /
     * unstopped tasks and the assignment type.
     *
     * <p>When an assignment already exists and the topology master task is not alive, only the
     * tasks sharing the master's worker are treated as dead and every other task is treated as
     * alive; otherwise dead tasks are "all tasks minus alive tasks".
     *
     * @param event the assignment request
     * @return the populated context
     * @throws FailedAssignTopologyException if no supervisor is alive, or the topology master task
     *         cannot be located in the existing assignment
     * @throws IOException if the topology has no task ids in cluster state
     * @throws Exception   if reading configuration or cluster state fails
     */
    protected TopologyAssignContext prepareTopologyAssign(TopologyAssignEvent event) throws Exception {
        TopologyAssignContext ret = new TopologyAssignContext();
        String topologyId = event.getTopologyId();
        ret.setTopologyId(topologyId);

        // NOTE(review): throws NullPointerException when no heartbeat entry exists for topologyId — confirm callers guarantee one.
        int topoMasterId = this.nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
        ret.setTopologyMasterTaskId(topoMasterId);
        LOG.info("prepareTopologyAssign, topoMasterId={}", (Object)topoMasterId);

        // Topology-level settings override nimbus-level ones in the merged configuration.
        Map<Object, Object> nimbusConf = this.nimbusData.getConf();
        Map topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
        StormTopology rawTopology = StormConfig.read_nimbus_topology_code(nimbusConf, topologyId);
        ret.setRawTopology(rawTopology);
        HashMap<Object, Object> stormConf = new HashMap<Object, Object>();
        stormConf.putAll(nimbusConf);
        stormConf.putAll(topologyConf);
        ret.setStormConf(stormConf);

        StormClusterState stormClusterState = this.nimbusData.getStormClusterState();
        Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
        for (Map.Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) {
            SupervisorInfo supervisor = supInfo.getValue();
            if (supervisor == null) continue;
            // Start from "all ports free"; ports in use are subtracted by getFreeSlots below.
            supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts());
        }
        this.getAliveSupervsByHb(supInfos, nimbusConf);
        if (supInfos.size() == 0) {
            throw new FailedAssignTopologyException("Failed to make assignment " + topologyId + ", due to no alive supervisor");
        }

        Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
        // Check the map itself: a null map would otherwise fail with NPE before the intended error is raised.
        if (taskToComponent == null || taskToComponent.isEmpty()) {
            String errMsg = "Failed to get all task ID list from /ZK-dir/tasks/" + topologyId;
            LOG.warn(errMsg);
            throw new IOException(errMsg);
        }
        ret.setTaskToComponent(taskToComponent);
        Set<Integer> allTaskIds = taskToComponent.keySet();
        ret.setAllTaskIds(allTaskIds);

        Set<Integer> aliveTasks = new HashSet<Integer>();
        Set<Integer> unstoppedTasks = new HashSet<Integer>();
        HashSet<Integer> deadTasks = new HashSet<Integer>();
        Assignment existingAssignment = stormClusterState.assignment_info(topologyId, null);
        if (existingAssignment != null) {
            aliveTasks = this.getAliveTasks(topologyId, allTaskIds);
            if (!aliveTasks.contains(topoMasterId)) {
                ResourceWorkerSlot worker = existingAssignment.getWorkerByTaskId(topoMasterId);
                if (worker == null || worker.getTasks() == null) {
                    // Fail with a readable reason instead of a message-less NullPointerException.
                    throw new FailedAssignTopologyException("Failed to make assignment " + topologyId
                            + ", due to topology master task " + topoMasterId + " not found in existing assignment");
                }
                deadTasks.addAll(worker.getTasks());
                // Master is down: only its worker's tasks count as dead, every other task as alive.
                HashSet<Integer> tempSet = new HashSet<Integer>(allTaskIds);
                tempSet.removeAll(deadTasks);
                aliveTasks.addAll(tempSet);
                aliveTasks.removeAll(deadTasks);
            } else {
                deadTasks.addAll(allTaskIds);
                deadTasks.removeAll(aliveTasks);
            }
            unstoppedTasks = this.getUnstoppedSlots(aliveTasks, supInfos, existingAssignment);
        }
        ret.setDeadTaskIds(deadTasks);
        ret.setUnstoppedTaskIds(unstoppedTasks);
        TopologyAssign.getFreeSlots(supInfos, stormClusterState);
        ret.setCluster(supInfos);

        if (existingAssignment == null) {
            // Assign type 0 is used when no assignment exists yet (presumably "new" — confirm against TopologyAssignContext).
            ret.setAssignType(0);
            try {
                AssignmentBak lastAssignment = stormClusterState.assignment_bak(event.getTopologyName());
                if (lastAssignment != null) {
                    ret.setOldAssignment(lastAssignment.getAssignment());
                }
            }
            catch (Exception e) {
                LOG.warn("Fail to get old assignment", (Throwable)e);
            }
        } else {
            ret.setOldAssignment(existingAssignment);
            Set<ResourceWorkerSlot> unstoppedWorkers;
            if (event.isScratch()) {
                // Assign type 1 is used for scratch events on an existing assignment.
                ret.setAssignType(1);
                ret.setIsReassign(event.isReassign());
                unstoppedWorkers = this.getUnstoppedWorkers(unstoppedTasks, existingAssignment);
            } else {
                // Assign type 2 is used for non-scratch events on an existing assignment.
                ret.setAssignType(2);
                unstoppedWorkers = this.getUnstoppedWorkers(aliveTasks, existingAssignment);
            }
            ret.setUnstoppedWorkers(unstoppedWorkers);
        }
        return ret;
    }

    /**
     * Computes worker slots for the event's topology and, when at least one slot was produced,
     * stores the resulting assignment in cluster state and refreshes heartbeat start times and
     * task timeouts. Local mode uses the single-worker local assignment; otherwise the default
     * scheduler is asked.
     *
     * @param event the assignment request
     * @return the stored assignment, or {@code null} when no worker slot was produced
     * @throws Exception if preparation, scheduling or the cluster-state write fails
     */
    public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
        String topologyId = event.getTopologyId();
        LOG.info("Determining assignment for " + topologyId);
        TopologyAssignContext context = this.prepareTopologyAssign(event);

        Set<ResourceWorkerSlot> workerSlots;
        if (StormConfig.local_mode(this.nimbusData.getConf())) {
            workerSlots = TopologyAssign.mkLocalAssignment(context);
        } else {
            IToplogyScheduler scheduler = this.schedulers.get(DEFAULT_SCHEDULER_NAME);
            workerSlots = scheduler.assignTasks(context);
        }

        // Nothing scheduled: report "no assignment" to the caller.
        if (workerSlots == null || workerSlots.size() <= 0) {
            return null;
        }

        Map<String, String> nodeHost = TopologyAssign.getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), workerSlots);
        Map<Integer, Integer> startTimes = TopologyAssign.getTaskStartTimes(context, this.nimbusData, topologyId, context.getOldAssignment(), workerSlots);
        String codeDir = StormConfig.masterStormdistRoot(this.nimbusData.getConf(), topologyId);

        Assignment result = new Assignment(codeDir, workerSlots, nodeHost, startTimes);
        if (event.isScaleTopology()) {
            result.setAssignmentType(Assignment.AssignmentType.ScaleTopology);
        }

        this.nimbusData.getStormClusterState().set_assignment(topologyId, result);
        NimbusUtils.updateTaskHbStartTime(this.nimbusData, result, topologyId);
        NimbusUtils.updateTopologyTaskTimeout(this.nimbusData, topologyId);
        LOG.info("Successfully make assignment for topology id " + topologyId + ": " + result);
        return result;
    }

    /**
     * Builds the local-mode assignment: one worker on the only supervisor, using its first
     * available port and carrying every task of the topology.
     *
     * @param context assignment context; its cluster must contain exactly one supervisor
     * @return a set holding the single worker slot
     * @throws RuntimeException if the cluster does not contain exactly one supervisor
     * @throws FailedAssignTopologyException if the supervisor has no available port
     * @throws Exception declared for signature compatibility
     */
    private static Set<ResourceWorkerSlot> mkLocalAssignment(TopologyAssignContext context) throws Exception {
        Map<String, SupervisorInfo> cluster = context.getCluster();
        if (cluster.size() != 1) {
            // Carry a reason: the caller forwards e.getMessage() to the submitter, which was null before.
            throw new RuntimeException("Local mode expects exactly one supervisor, but found " + cluster.size());
        }

        // Exactly one entry is present, so take it directly instead of looping.
        Map.Entry<String, SupervisorInfo> onlyEntry = cluster.entrySet().iterator().next();
        String supervisorId = onlyEntry.getKey();
        SupervisorInfo localSupervisor = onlyEntry.getValue();

        if (!localSupervisor.getAvailableWorkerPorts().iterator().hasNext()) {
            LOG.info(" amount of worker's ports is not enough");
            throw new FailedAssignTopologyException("Failed to make assignment , due to no enough ports");
        }
        int port = localSupervisor.getAvailableWorkerPorts().iterator().next();

        ResourceWorkerSlot worker = new ResourceWorkerSlot(supervisorId, port);
        worker.setTasks(new HashSet<Integer>(context.getAllTaskIds()));
        worker.setHostname(localSupervisor.getHostName());

        HashSet<ResourceWorkerSlot> result = new HashSet<ResourceWorkerSlot>();
        result.add(worker);
        return result;
    }

    /**
     * Computes the start time (seconds) of every task in the new assignment.
     *
     * <p>For assign type 0 every task of every worker starts "now". Otherwise the start times of
     * the existing assignment are carried over, tasks that are new or moved get "now", tasks that
     * disappeared are dropped, and the cached heartbeat of every changed or removed task is removed.
     *
     * @param context            assignment context (only the assign type is read)
     * @param nimbusData         Nimbus state used to drop task heartbeats
     * @param topologyId         topology the tasks belong to
     * @param existingAssignment previous assignment, may be {@code null}
     * @param workers            worker slots of the new assignment
     * @return start times keyed by task id, sorted by task id
     * @throws Exception if removing a task heartbeat fails
     */
    public static Map<Integer, Integer> getTaskStartTimes(TopologyAssignContext context, NimbusData nimbusData, String topologyId, Assignment existingAssignment, Set<ResourceWorkerSlot> workers) throws Exception {
        TreeMap<Integer, Integer> startTimes = new TreeMap<Integer, Integer>();
        if (context.getAssignType() == 0) {
            int nowSecs = TimeUtils.current_time_secs();
            for (ResourceWorkerSlot worker : workers) {
                Set<Integer> tasks = worker.getTasks();
                // A worker without a task set contributes nothing (same tolerance as getNewOrChangedTaskIds).
                if (tasks == null) {
                    continue;
                }
                for (Integer taskId : tasks) {
                    startTimes.put(taskId, nowSecs);
                }
            }
            return startTimes;
        }

        Set<ResourceWorkerSlot> oldWorkers = new HashSet<ResourceWorkerSlot>();
        if (existingAssignment != null) {
            Map<Integer, Integer> taskStartTimeSecs = existingAssignment.getTaskStartTimeSecs();
            if (taskStartTimeSecs != null) {
                startTimes.putAll(taskStartTimeSecs);
            }
            if (existingAssignment.getWorkers() != null) {
                oldWorkers = existingAssignment.getWorkers();
            }
        }

        Set<Integer> changedTaskIds = TopologyAssign.getNewOrChangedTaskIds(oldWorkers, workers);
        int nowSecs = TimeUtils.current_time_secs();
        for (Integer changedTaskId : changedTaskIds) {
            startTimes.put(changedTaskId, nowSecs);
            NimbusUtils.removeTopologyTaskHb(nimbusData, topologyId, changedTaskId);
        }

        Set<Integer> removedTaskIds = TopologyAssign.getRemovedTaskIds(oldWorkers, workers);
        for (Integer removedTaskId : removedTaskIds) {
            startTimes.remove(removedTaskId);
            NimbusUtils.removeTopologyTaskHb(nimbusData, topologyId, removedTaskId);
        }
        LOG.info("Task assignment has been changed: " + changedTaskIds + ", removed tasks " + removedTaskIds);
        return startTimes;
    }

    /**
     * Builds the supervisor-id to host-name map for the supervisors used by {@code workers}.
     * Host names come from the existing assignment (if any) and from the current supervisor map,
     * with the current supervisor map winning on conflicts. Used supervisors with no known host
     * are logged and left out of the result.
     *
     * @param supervisorMap      current supervisors keyed by id
     * @param existingAssignment previous assignment, may be {@code null}
     * @param workers            worker slots of the new assignment
     * @return host names of the used supervisors
     */
    public static Map<String, String> getTopologyNodeHost(Map<String, SupervisorInfo> supervisorMap, Assignment existingAssignment, Set<ResourceWorkerSlot> workers) {
        HashSet<String> nodesInUse = new HashSet<String>();
        for (ResourceWorkerSlot slot : workers) {
            nodesInUse.add(slot.getNodeId());
        }

        // Older knowledge first, so that live supervisor data overrides it below.
        HashMap<String, String> knownHosts = new HashMap<String, String>();
        if (existingAssignment != null) {
            knownHosts.putAll(existingAssignment.getNodeHost());
        }
        Map<String, String> liveHosts = SupervisorInfo.getNodeHost(supervisorMap);
        if (liveHosts != null) {
            knownHosts.putAll(liveHosts);
        }

        HashMap<String, String> result = new HashMap<String, String>();
        for (String nodeId : nodesInUse) {
            if (!knownHosts.containsKey(nodeId)) {
                LOG.warn("Node " + nodeId + " doesn't in the supervisor list");
                continue;
            }
            result.put(nodeId, knownHosts.get(nodeId));
        }
        return result;
    }

    /**
     * Returns the tasks that are new or have moved: for every new worker, the tasks that were not
     * already running on the old worker with the same host:port. When no old worker exists at
     * that host:port (or it has no task set), all of the new worker's tasks count as changed.
     * Workers whose task set is {@code null} contribute nothing.
     *
     * @param oldWorkers worker slots of the previous assignment
     * @param workers    worker slots of the new assignment
     * @return ids of new or relocated tasks
     */
    public static Set<Integer> getNewOrChangedTaskIds(Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
        HashSet<Integer> rtn = new HashSet<Integer>();
        HashMap<String, ResourceWorkerSlot> workerPortMap = TopologyAssign.HostPortToWorkerMap(oldWorkers);
        for (ResourceWorkerSlot worker : workers) {
            Set<Integer> tasks = worker.getTasks();
            // The null guard used to apply only when no old worker matched; apply it on every path.
            if (tasks == null) {
                continue;
            }
            ResourceWorkerSlot oldWorker = workerPortMap.get(worker.getHostPort());
            Set<Integer> oldTasks = oldWorker == null ? null : oldWorker.getTasks();
            if (oldTasks == null) {
                rtn.addAll(tasks);
                continue;
            }
            for (Integer task : tasks) {
                if (!oldTasks.contains(task)) {
                    rtn.add(task);
                }
            }
        }
        return rtn;
    }

    /**
     * Returns the tasks that exist somewhere in {@code oldWorkers} but nowhere in {@code workers}.
     *
     * @param oldWorkers worker slots of the previous assignment
     * @param workers    worker slots of the new assignment
     * @return ids of tasks that are no longer assigned
     */
    public static Set<Integer> getRemovedTaskIds(Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
        Set<Integer> previousTasks = TopologyAssign.getTaskSetFromWorkerSet(oldWorkers);
        Set<Integer> currentTasks = TopologyAssign.getTaskSetFromWorkerSet(workers);

        HashSet<Integer> removed = new HashSet<Integer>();
        for (Integer candidate : previousTasks) {
            if (!currentTasks.contains(candidate)) {
                removed.add(candidate);
            }
        }
        return removed;
    }

    /**
     * Flattens the task ids of all given workers into one set. Workers whose task set is
     * {@code null} are skipped, matching the tolerance in {@code getNewOrChangedTaskIds}.
     *
     * @param workers worker slots to read
     * @return union of all task ids
     */
    private static Set<Integer> getTaskSetFromWorkerSet(Set<ResourceWorkerSlot> workers) {
        HashSet<Integer> rtn = new HashSet<Integer>();
        for (ResourceWorkerSlot worker : workers) {
            Set<Integer> tasks = worker.getTasks();
            // addAll(null) would throw; a worker without tasks simply adds nothing.
            if (tasks != null) {
                rtn.addAll(tasks);
            }
        }
        return rtn;
    }

    /**
     * Indexes workers by their host:port string. If two workers report the same host:port, the
     * one iterated later replaces the earlier one.
     *
     * @param workers worker slots to index
     * @return workers keyed by {@code getHostPort()}
     */
    private static HashMap<String, ResourceWorkerSlot> HostPortToWorkerMap(Set<ResourceWorkerSlot> workers) {
        HashMap<String, ResourceWorkerSlot> byHostPort = new HashMap<String, ResourceWorkerSlot>();
        for (ResourceWorkerSlot slot : workers) {
            String hostPort = slot.getHostPort();
            byHostPort.put(hostPort, slot);
        }
        return byHostPort;
    }

    /**
     * Orders slots so that consecutive picks spread across nodes: slots are grouped per node,
     * each group is sorted by port, groups are ordered by size (largest first) and then
     * interleaved through {@code JStormUtils.interleave_all}. At most {@code needSlotNum} slots
     * are returned.
     *
     * @param allSlots    candidate slots
     * @param needSlotNum maximum number of slots wanted
     * @return the interleaved list, cut to {@code needSlotNum} entries when it is longer
     */
    public static List<WorkerSlot> sortSlots(Set<WorkerSlot> allSlots, int needSlotNum) {
        // Group the slots by the node they live on.
        Map<String, List<WorkerSlot>> slotsByNode = new HashMap<String, List<WorkerSlot>>();
        for (WorkerSlot slot : allSlots) {
            String nodeId = slot.getNodeId();
            List<WorkerSlot> bucket = slotsByNode.get(nodeId);
            if (bucket == null) {
                bucket = new ArrayList<WorkerSlot>();
                slotsByNode.put(nodeId, bucket);
            }
            bucket.add(slot);
        }

        Comparator<WorkerSlot> byNodeThenPort = new Comparator<WorkerSlot>(){

            @Override
            public int compare(WorkerSlot first, WorkerSlot second) {
                int nodeOrder = first.getNodeId().compareTo(second.getNodeId());
                if (nodeOrder != 0) {
                    return nodeOrder;
                }
                return first.getPort() - second.getPort();
            }
        };
        for (List<WorkerSlot> bucket : slotsByNode.values()) {
            Collections.sort(bucket, byNodeThenPort);
        }

        Comparator<List<WorkerSlot>> largestFirst = new Comparator<List<WorkerSlot>>(){

            @Override
            public int compare(List<WorkerSlot> o1, List<WorkerSlot> o2) {
                return o2.size() - o1.size();
            }
        };
        List<List<WorkerSlot>> buckets = new ArrayList<List<WorkerSlot>>(slotsByNode.values());
        Collections.sort(buckets, largestFirst);

        List<WorkerSlot> interleaved = JStormUtils.interleave_all(buckets);
        return interleaved.size() <= needSlotNum ? interleaved : interleaved.subList(0, needSlotNum);
    }

    /**
     * Finds tasks that are still alive although the supervisor of their worker is not in
     * {@code supInfos} any more.
     *
     * @param aliveTasks      ids of tasks considered alive
     * @param supInfos        currently known supervisors keyed by id
     * @param existAssignment the assignment whose workers are inspected
     * @return ids of alive tasks located on workers of unknown supervisors
     */
    public Set<Integer> getUnstoppedSlots(Set<Integer> aliveTasks, Map<String, SupervisorInfo> supInfos, Assignment existAssignment) {
        Set<String> knownSupervisorIds = supInfos.keySet();
        HashSet<Integer> unstopped = new HashSet<Integer>();
        for (ResourceWorkerSlot slot : existAssignment.getWorkers()) {
            for (Integer taskId : slot.getTasks()) {
                if (aliveTasks.contains(taskId)) {
                    String ownerSupervisorId = slot.getNodeId();
                    if (!knownSupervisorIds.contains(ownerSupervisorId)) {
                        unstopped.add(taskId);
                    }
                }
            }
        }
        return unstopped;
    }

    /**
     * Selects the workers of {@code existAssignment} whose tasks are all contained in
     * {@code aliveTasks}. A worker with an empty task set qualifies as well.
     *
     * @param aliveTasks      task ids to test against
     * @param existAssignment the assignment whose workers are inspected
     * @return workers having every task in {@code aliveTasks}
     */
    private Set<ResourceWorkerSlot> getUnstoppedWorkers(Set<Integer> aliveTasks, Assignment existAssignment) {
        HashSet<ResourceWorkerSlot> fullyAlive = new HashSet<ResourceWorkerSlot>();
        for (ResourceWorkerSlot slot : existAssignment.getWorkers()) {
            if (aliveTasks.containsAll(slot.getTasks())) {
                fullyAlive.add(slot);
            }
        }
        return fullyAlive;
    }

    /**
     * Removes every port that is occupied by a worker of any stored assignment from the
     * available-port set of the matching supervisor. Workers on supervisors that are not in
     * {@code supervisorInfos} are ignored.
     *
     * @param supervisorInfos   supervisors keyed by id; their available ports are modified in place
     * @param stormClusterState cluster state the assignments are read from
     * @throws Exception if reading the assignments fails
     */
    public static void getFreeSlots(Map<String, SupervisorInfo> supervisorInfos, StormClusterState stormClusterState) throws Exception {
        Map<String, Assignment> allAssignments = Cluster.get_all_assignment(stormClusterState, null);
        for (Assignment current : allAssignments.values()) {
            for (ResourceWorkerSlot slot : current.getWorkers()) {
                SupervisorInfo owner = supervisorInfos.get(slot.getNodeId());
                if (owner != null) {
                    owner.getAvailableWorkerPorts().remove(slot.getPort());
                }
            }
        }
    }

    /**
     * Filters {@code taskIds} down to the tasks that {@code NimbusUtils.isTaskDead} does not
     * report as dead.
     *
     * @param topologyId topology the tasks belong to
     * @param taskIds    task ids to check
     * @return the ids of tasks not reported dead
     * @throws Exception if the liveness check fails
     */
    public Set<Integer> getAliveTasks(String topologyId, Set<Integer> taskIds) throws Exception {
        HashSet<Integer> survivors = new HashSet<Integer>();
        for (int candidate : taskIds) {
            if (!NimbusUtils.isTaskDead(this.nimbusData, topologyId, candidate)) {
                survivors.add(candidate);
            }
        }
        return survivors;
    }

    /**
     * Stores a backup of the assignment, together with the sorted task ids of every component,
     * under the topology's name. Failures are logged and otherwise ignored.
     *
     * @param assignment the assignment to back up
     * @param event      the event providing topology id and name
     */
    public void backupAssignment(Assignment assignment, TopologyAssignEvent event) {
        String topologyId = event.getTopologyId();
        String topologyName = event.getTopologyName();
        try {
            StormClusterState zkState = this.nimbusData.getStormClusterState();
            Map<Integer, String> taskToComponent = Cluster.get_all_task_component(zkState, topologyId, null);

            // Invert to component -> task ids and sort each component's ids.
            HashMap<String, List<Integer>> tasksByComponent = JStormUtils.reverse_map(taskToComponent);
            for (List<Integer> taskIds : tasksByComponent.values()) {
                Collections.sort(taskIds);
            }

            zkState.backup_assignment(topologyName, new AssignmentBak(tasksByComponent, assignment));
        }
        catch (Exception e) {
            LOG.warn("Failed to backup " + topologyId + " assignment " + assignment, (Throwable)e);
        }
    }

    /**
     * Drops every supervisor whose last report is older than the configured timeout
     * ({@code nimbus.supervisor.timeout.secs}, 180 when absent) from {@code supervisorInfos}.
     *
     * @param supervisorInfos supervisors keyed by id; modified in place
     * @param conf            configuration holding the timeout
     */
    private void getAliveSupervsByHb(Map<String, SupervisorInfo> supervisorInfos, Map conf) {
        int nowSecs = TimeUtils.current_time_secs();
        int timeoutSecs = JStormUtils.parseInt(conf.get("nimbus.supervisor.timeout.secs"), 180);

        // Collect first, remove afterwards, so the map is not modified while it is iterated.
        List<String> expiredIds = new ArrayList<String>();
        for (Map.Entry<String, SupervisorInfo> candidate : supervisorInfos.entrySet()) {
            SupervisorInfo info = candidate.getValue();
            int lastReportTime = info.getTimeSecs();
            boolean timedOut = nowSecs - lastReportTime > timeoutSecs;
            if (timedOut) {
                LOG.warn("Supervisor-" + info.getHostName() + " is dead. lastReportTime=" + lastReportTime);
                expiredIds.add(candidate.getKey());
            }
        }
        supervisorInfos.keySet().removeAll(expiredIds);
    }

    /**
     * Empty entry point; running this class directly does nothing.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
    }
}

