/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.schedule.default_assign;

import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.client.WorkerAssignment;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.default_assign.DefaultTopologyAssignContext;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
import com.alibaba.jstorm.utils.NetWorkUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerScheduler {
    public static Logger LOG = LoggerFactory.getLogger(WorkerScheduler.class);
    private static WorkerScheduler instance;

    private WorkerScheduler() {
    }

    public static WorkerScheduler getInstance() {
        if (instance == null) {
            instance = new WorkerScheduler();
        }
        return instance;
    }

    public List<ResourceWorkerSlot> getAvailableWorkers(DefaultTopologyAssignContext context, Set<Integer> needAssign, int allocWorkerNum) {
        int workersNum = this.getAvailableWorkersNum(context);
        if (workersNum < allocWorkerNum) {
            throw new FailedAssignTopologyException("there's no enough worker. allocWorkerNum=" + allocWorkerNum + ", availableWorkerNum=" + workersNum);
        }
        workersNum = allocWorkerNum;
        ArrayList<ResourceWorkerSlot> assignedWorkers = new ArrayList<ResourceWorkerSlot>();
        this.getRightWorkers(context, needAssign, assignedWorkers, workersNum, this.getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));
        if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
            this.getRightWorkers(context, needAssign, assignedWorkers, workersNum, context.getOldWorkers());
        } else if (context.getAssignType() == 1 && !context.isReassign()) {
            int cnt = 0;
            for (ResourceWorkerSlot worker : context.getOldWorkers()) {
                if (cnt >= workersNum) break;
                ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
                resFreeWorker.setPort(worker.getPort());
                resFreeWorker.setHostname(worker.getHostname());
                resFreeWorker.setNodeId(worker.getNodeId());
                assignedWorkers.add(resFreeWorker);
                ++cnt;
            }
        }
        int workersForSingleTM = 0;
        if (context.getAssignSingleWorkerForTM()) {
            for (Integer taskId : needAssign) {
                String componentName = context.getTaskToComponent().get(taskId);
                if (!componentName.equals("__topology_master")) continue;
                ++workersForSingleTM;
            }
        }
        LOG.info("Get workers from user define and old assignments: " + assignedWorkers);
        int restWokerNum = workersNum - assignedWorkers.size();
        if (restWokerNum < 0) {
            throw new FailedAssignTopologyException("Too much workers are needed for user define or old assignments. workersNum=" + workersNum + ", assignedWokersNum=" + assignedWorkers.size());
        }
        for (int i = 0; i < restWokerNum; ++i) {
            assignedWorkers.add(new ResourceWorkerSlot());
        }
        List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context);
        if (isolationSupervisors.size() != 0) {
            this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(isolationSupervisors));
        } else {
            this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(context.getCluster()));
        }
        this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
        LOG.info("Assigned workers=" + assignedWorkers);
        return assignedWorkers;
    }

    private void setAllWorkerMemAndCpu(Map conf, List<ResourceWorkerSlot> assignedWorkers) {
        long defaultSize = ConfigExtension.getMemSizePerWorker(conf);
        int defaultCpu = ConfigExtension.getCpuSlotPerWorker(conf);
        for (ResourceWorkerSlot worker : assignedWorkers) {
            if (worker.getMemSize() <= 0L) {
                worker.setMemSize(defaultSize);
            }
            if (worker.getCpu() > 0) continue;
            worker.setCpu(defaultCpu);
        }
    }

    private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) {
        block0: for (ResourceWorkerSlot worker : assignedWorkers) {
            if (worker.getHostname() == null) continue;
            for (SupervisorInfo supervisor : supervisors) {
                if (!NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) || supervisor.getAvailableWorkerPorts().size() <= 0) continue;
                this.putWorkerToSupervisor(supervisor, worker);
                continue block0;
            }
        }
        supervisors = this.getResAvailSupervisors(supervisors);
        Collections.sort(supervisors, new Comparator<SupervisorInfo>(){

            @Override
            public int compare(SupervisorInfo o1, SupervisorInfo o2) {
                return -NumberUtils.compare((float)o1.getAvailableWorkerPorts().size(), (float)o2.getAvailableWorkerPorts().size());
            }
        });
        this.putWorkerToSupervisor(assignedWorkers, supervisors);
    }

    private void putWorkerToSupervisor(SupervisorInfo supervisor, ResourceWorkerSlot worker) {
        int port = worker.getPort();
        if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
            port = supervisor.getAvailableWorkerPorts().iterator().next();
        }
        worker.setPort(port);
        supervisor.getAvailableWorkerPorts().remove(port);
        worker.setNodeId(supervisor.getSupervisorId());
    }

    private void putWorkerToSupervisor(List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) {
        int allUsedPorts = 0;
        for (SupervisorInfo supervisor : supervisors) {
            int supervisorUsedPorts = supervisor.getWorkerPorts().size() - supervisor.getAvailableWorkerPorts().size();
            allUsedPorts += supervisorUsedPorts;
        }
        int theoryAveragePorts = (allUsedPorts + assignedWorkers.size()) / supervisors.size() + 1;
        ArrayList<SupervisorInfo> overLoadSupervisors = new ArrayList<SupervisorInfo>();
        int key = 0;
        Iterator<ResourceWorkerSlot> iterator = assignedWorkers.iterator();
        while (iterator.hasNext() && supervisors.size() != 0) {
            SupervisorInfo supervisor;
            int supervisorUsedPorts;
            if (key >= supervisors.size()) {
                key = 0;
            }
            if ((supervisorUsedPorts = (supervisor = supervisors.get(key)).getWorkerPorts().size() - supervisor.getAvailableWorkerPorts().size()) < theoryAveragePorts) {
                ResourceWorkerSlot worker = iterator.next();
                if (worker.getNodeId() != null) continue;
                worker.setHostname(supervisor.getHostName());
                worker.setNodeId(supervisor.getSupervisorId());
                worker.setPort(supervisor.getAvailableWorkerPorts().iterator().next());
                supervisor.getAvailableWorkerPorts().remove(worker.getPort());
                if (supervisor.getAvailableWorkerPorts().size() == 0) {
                    supervisors.remove(supervisor);
                }
                ++key;
                continue;
            }
            overLoadSupervisors.add(supervisor);
            supervisors.remove(supervisor);
        }
        Collections.sort(overLoadSupervisors, new Comparator<SupervisorInfo>(){

            @Override
            public int compare(SupervisorInfo o1, SupervisorInfo o2) {
                return -NumberUtils.compare((float)o1.getAvailableWorkerPorts().size(), (float)o2.getAvailableWorkerPorts().size());
            }
        });
        key = 0;
        while (iterator.hasNext() && overLoadSupervisors.size() != 0) {
            ResourceWorkerSlot worker;
            if (key >= overLoadSupervisors.size()) {
                key = 0;
            }
            if ((worker = iterator.next()).getNodeId() != null) continue;
            SupervisorInfo supervisor = (SupervisorInfo)overLoadSupervisors.get(key);
            worker.setHostname(supervisor.getHostName());
            worker.setNodeId(supervisor.getSupervisorId());
            worker.setPort(supervisor.getAvailableWorkerPorts().iterator().next());
            supervisor.getAvailableWorkerPorts().remove(worker.getPort());
            if (supervisor.getAvailableWorkerPorts().size() == 0) {
                overLoadSupervisors.remove(supervisor);
            }
            ++key;
        }
    }

    private void getRightWorkers(DefaultTopologyAssignContext context, Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers, int workersNum, Collection<ResourceWorkerSlot> workers) {
        HashSet<Integer> assigned = new HashSet<Integer>();
        ArrayList<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>();
        if (workers == null) {
            return;
        }
        for (ResourceWorkerSlot worker : workers) {
            boolean right = true;
            Set<Integer> tasks = worker.getTasks();
            if (tasks == null) continue;
            for (Integer task : tasks) {
                if (needAssign.contains(task) && !assigned.contains(task)) continue;
                right = false;
                break;
            }
            if (!right) continue;
            assigned.addAll(tasks);
            users.add(worker);
        }
        if (users.size() + assignedWorkers.size() > workersNum) {
            LOG.warn("There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}", new Object[]{users, assignedWorkers, workersNum});
            return;
        }
        assignedWorkers.addAll(users);
        needAssign.removeAll(assigned);
    }

    private int getAvailableWorkersNum(DefaultTopologyAssignContext context) {
        Map<String, SupervisorInfo> supervisors = context.getCluster();
        List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context);
        int slotNum = 0;
        if (isolationSupervisors.size() != 0) {
            for (SupervisorInfo superivsor : isolationSupervisors) {
                slotNum += superivsor.getAvailableWorkerPorts().size();
            }
        } else {
            for (Map.Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
                slotNum += entry.getValue().getAvailableWorkerPorts().size();
            }
        }
        return slotNum;
    }

    private List<ResourceWorkerSlot> getUserDefineWorkers(DefaultTopologyAssignContext context, List<WorkerAssignment> workers) {
        ArrayList<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>();
        if (workers == null) {
            return ret;
        }
        HashMap componentToTask = (HashMap)((HashMap)context.getComponentTasks()).clone();
        if (context.getAssignType() != 0) {
            this.checkUserDefineWorkers(context, workers, context.getTaskToComponent());
        }
        for (WorkerAssignment worker : workers) {
            ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker, componentToTask);
            if (workerSlot.getTasks().size() == 0) continue;
            ret.add(workerSlot);
        }
        return ret;
    }

    private void checkUserDefineWorkers(DefaultTopologyAssignContext context, List<WorkerAssignment> workers, Map<Integer, String> taskToComponent) {
        Set<ResourceWorkerSlot> unstoppedWorkers = context.getUnstoppedWorkers();
        ArrayList<WorkerAssignment> re = new ArrayList<WorkerAssignment>();
        for (WorkerAssignment worker : workers) {
            for (ResourceWorkerSlot unstopped : unstoppedWorkers) {
                if (!unstopped.compareToUserDefineWorker(worker, taskToComponent)) continue;
                re.add(worker);
            }
        }
        workers.removeAll(re);
    }

    private List<SupervisorInfo> getResAvailSupervisors(Map<String, SupervisorInfo> supervisors) {
        ArrayList<SupervisorInfo> availableSupervisors = new ArrayList<SupervisorInfo>();
        for (Map.Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
            SupervisorInfo supervisor = entry.getValue();
            if (supervisor.getAvailableWorkerPorts().size() <= 0) continue;
            availableSupervisors.add(entry.getValue());
        }
        return availableSupervisors;
    }

    private List<SupervisorInfo> getResAvailSupervisors(List<SupervisorInfo> supervisors) {
        ArrayList<SupervisorInfo> availableSupervisors = new ArrayList<SupervisorInfo>();
        for (SupervisorInfo supervisor : supervisors) {
            if (supervisor.getAvailableWorkerPorts().size() <= 0) continue;
            availableSupervisors.add(supervisor);
        }
        return availableSupervisors;
    }

    private List<SupervisorInfo> getIsolationSupervisors(DefaultTopologyAssignContext context) {
        List isolationHosts = (List)context.getStormConf().get("isolation.scheduler.machines");
        LOG.info("Isolation machines: " + isolationHosts);
        if (isolationHosts == null) {
            return new ArrayList<SupervisorInfo>();
        }
        ArrayList<SupervisorInfo> isolationSupervisors = new ArrayList<SupervisorInfo>();
        for (Map.Entry<String, SupervisorInfo> entry : context.getCluster().entrySet()) {
            if (!this.containTargetHost(isolationHosts, entry.getValue().getHostName())) continue;
            isolationSupervisors.add(entry.getValue());
        }
        return isolationSupervisors;
    }

    private boolean containTargetHost(Collection<String> hosts, String target) {
        for (String host : hosts) {
            if (!NetWorkUtils.equals(host, target)) continue;
            return true;
        }
        return false;
    }
}

