/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.schedule.default_assign;

import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.IToplogyScheduler;
import com.alibaba.jstorm.schedule.TopologyAssignContext;
import com.alibaba.jstorm.schedule.default_assign.DefaultTopologyAssignContext;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.schedule.default_assign.TaskScheduler;
import com.alibaba.jstorm.schedule.default_assign.WorkerScheduler;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link IToplogyScheduler} implementation.
 *
 * <p>{@link #assignTasks(TopologyAssignContext)} works out which tasks have to be
 * (re)assigned for the requested assign type, keeps the old workers whose tasks are
 * all untouched, asks {@link WorkerScheduler} for the additional workers that are
 * needed and lets a {@link TaskScheduler} spread the tasks over those workers.
 */
public class DefaultTopologyScheduler
implements IToplogyScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyScheduler.class);

    /** Configuration handed over in {@link #prepare(Map)}; stored only, never read in this class. */
    private Map nimbusConf;

    /**
     * Remembers the configuration passed in by the caller.
     *
     * @param conf configuration map; kept as a raw {@code Map} to match the interface signature
     */
    @Override
    public void prepare(Map conf) {
        this.nimbusConf = conf;
    }

    /**
     * Returns the ports of workers that hosted stoppable tasks to their supervisor's
     * available-port set, so they can be handed out again.
     *
     * <p>A task is considered freeable when it is in {@code getAllTaskIds()} but not in
     * {@code getUnstoppedTaskIds()}. Tasks without an old worker, and workers whose
     * supervisor is not part of the cluster map, are skipped.
     *
     * @param context assignment context providing task ids, cluster and old assignment
     */
    protected void freeUsed(TopologyAssignContext context) {
        Set<Integer> canFree = new HashSet<Integer>(context.getAllTaskIds());
        canFree.removeAll(context.getUnstoppedTaskIds());

        Map<String, SupervisorInfo> cluster = context.getCluster();
        Assignment oldAssigns = context.getOldAssignment();
        if (oldAssigns == null) {
            // Same guard as getKeepAssign: without an old assignment there is nothing
            // to look workers up in, and dereferencing it would throw an NPE.
            LOG.warn("When free rebalance resource, no old assignment available, nothing to free");
            return;
        }

        for (Integer task : canFree) {
            ResourceWorkerSlot worker = oldAssigns.getWorkerByTaskId(task);
            if (worker == null) {
                LOG.warn("When free rebalance resource, no ResourceAssignment of task {}", task);
                continue;
            }
            SupervisorInfo supervisorInfo = cluster.get(worker.getNodeId());
            if (supervisorInfo == null) {
                // Supervisor is not in the cluster map; there is no port set to update.
                continue;
            }
            supervisorInfo.getAvailableWorkerPorts().add(worker.getPort());
        }
    }

    /**
     * Computes the ids of the tasks that must be assigned in this round.
     *
     * <ul>
     *   <li>assign type {@code 0}: every task of the topology;</li>
     *   <li>assign type {@code 1}: every task except the unstopped ones;</li>
     *   <li>any other type: only the dead tasks.</li>
     * </ul>
     * NOTE(review): 0 and 1 are presumably the "new" and "rebalance" assign types
     * declared on TopologyAssignContext; the decompiler inlined the constants, so
     * confirm against that class.
     *
     * @param context the scheduler's working context
     * @return a fresh, mutable set of task ids to assign
     */
    private Set<Integer> getNeedAssignTasks(DefaultTopologyAssignContext context) {
        Set<Integer> needAssign = new HashSet<Integer>();
        int assignType = context.getAssignType();
        if (assignType == 0) {
            needAssign.addAll(context.getAllTaskIds());
        } else if (assignType == 1) {
            needAssign.addAll(context.getAllTaskIds());
            needAssign.removeAll(context.getUnstoppedTaskIds());
        } else {
            needAssign.addAll(context.getDeadTaskIds());
        }
        return needAssign;
    }

    /**
     * Selects the old workers that can be kept exactly as they are.
     *
     * <p>A task is "kept" when it is neither unstopped nor in {@code needAssigns}. A worker
     * is kept only if every one of its tasks is a kept task. The result is empty when
     * there are no kept tasks or when the context has no old assignment.
     *
     * @param defaultContext the scheduler's working context
     * @param needAssigns    ids of the tasks that are going to be reassigned
     * @return a fresh, mutable set of the old workers to keep
     */
    public Set<ResourceWorkerSlot> getKeepAssign(DefaultTopologyAssignContext defaultContext, Set<Integer> needAssigns) {
        Set<Integer> keepAssignIds = new HashSet<Integer>(defaultContext.getAllTaskIds());
        keepAssignIds.removeAll(defaultContext.getUnstoppedTaskIds());
        keepAssignIds.removeAll(needAssigns);

        Set<ResourceWorkerSlot> keeps = new HashSet<ResourceWorkerSlot>();
        if (keepAssignIds.isEmpty()) {
            return keeps;
        }
        if (defaultContext.getOldAssignment() == null) {
            return keeps;
        }

        for (ResourceWorkerSlot worker : defaultContext.getOldWorkers()) {
            if (this.hostsOnlyKeptTasks(worker, keepAssignIds)) {
                keeps.add(worker);
            }
        }
        return keeps;
    }

    /** Tells whether every task running on {@code worker} is contained in {@code keepAssignIds}. */
    private boolean hostsOnlyKeptTasks(ResourceWorkerSlot worker, Set<Integer> keepAssignIds) {
        for (Integer task : worker.getTasks()) {
            if (!keepAssignIds.contains(task)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Produces the full set of worker slots for the topology described by {@code context}.
     *
     * <p>The result is the union of the kept old workers, the unstopped workers and the
     * newly assigned workers.
     *
     * @param context assignment request
     * @return kept, unstopped and newly assigned worker slots
     * @throws FailedAssignTopologyException if the assign type is invalid, or if no new
     *         worker has to be allocated (total minus unstopped minus kept is not positive)
     */
    @Override
    public Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext context) throws FailedAssignTopologyException {
        int assignType = context.getAssignType();
        if (!TopologyAssignContext.isAssignTypeValid(assignType)) {
            throw new FailedAssignTopologyException("Invalide Assign Type " + assignType);
        }

        DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context);
        if (assignType == 1) {
            // Release the ports of stoppable tasks before picking new workers.
            this.freeUsed(defaultContext);
        }
        LOG.info("Dead tasks:{}", defaultContext.getDeadTaskIds());
        LOG.info("Unstopped tasks:{}", defaultContext.getUnstoppedTaskIds());

        Set<Integer> needAssignTasks = this.getNeedAssignTasks(defaultContext);
        Set<ResourceWorkerSlot> keepAssigns = this.getKeepAssign(defaultContext, needAssignTasks);

        Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
        ret.addAll(keepAssigns);
        ret.addAll(defaultContext.getUnstoppedWorkers());

        // Workers still to allocate = requested total - still running - kept untouched.
        int allocWorkerNum = defaultContext.getTotalWorkerNum() - defaultContext.getUnstoppedWorkerNum() - keepAssigns.size();
        LOG.info("allocWorkerNum=" + allocWorkerNum + ", totalWorkerNum=" + defaultContext.getTotalWorkerNum() + ", keepWorkerNum=" + keepAssigns.size());
        if (allocWorkerNum <= 0) {
            LOG.warn("Don't need assign workers, all workers are fine {}", defaultContext.toDetailString());
            throw new FailedAssignTopologyException("Don't need assign worker, all workers are fine ");
        }

        List<ResourceWorkerSlot> availableWorkers = WorkerScheduler.getInstance().getAvailableWorkers(defaultContext, needAssignTasks, allocWorkerNum);
        TaskScheduler taskScheduler = new TaskScheduler(defaultContext, needAssignTasks, availableWorkers);
        Set<ResourceWorkerSlot> assignment = new HashSet<ResourceWorkerSlot>(taskScheduler.assign());
        ret.addAll(assignment);

        LOG.info("Keep Alive slots:{}", keepAssigns);
        LOG.info("Unstopped slots:{}", defaultContext.getUnstoppedWorkers());
        LOG.info("New assign slots:{}", assignment);
        return ret;
    }
}

