/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.dispatch.host;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.host.CommonHostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class LowerWeightHostManager
extends CommonHostManager {
    private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class);
    @Autowired
    private ZookeeperRegistryCenter registryCenter;
    private RoundRobinHostManager roundRobinHostManager;
    private LowerWeightRoundRobin selector;
    private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeightsMap;
    private Lock lock;
    private ScheduledExecutorService executorService;

    @PostConstruct
    public void init() {
        this.selector = new LowerWeightRoundRobin();
        this.workerHostWeightsMap = new ConcurrentHashMap();
        this.lock = new ReentrantLock();
        this.executorService = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory("LowerWeightHostManagerExecutor"));
        this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0L, 5L, TimeUnit.SECONDS);
        this.roundRobinHostManager = new RoundRobinHostManager();
        this.roundRobinHostManager.setZookeeperNodeManager(this.getZookeeperNodeManager());
    }

    @PreDestroy
    public void close() {
        this.executorService.shutdownNow();
    }

    @Override
    public Host select(ExecutionContext context) {
        Set<HostWeight> workerHostWeights = this.getWorkerHostWeights(context.getWorkerGroup());
        if (CollectionUtils.isNotEmpty(workerHostWeights)) {
            return ((HostWeight)this.selector.select(workerHostWeights)).getHost();
        }
        return new Host();
    }

    @Override
    public Host select(Collection<Host> nodes) {
        throw new UnsupportedOperationException("not support");
    }

    private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
        this.lock.lock();
        try {
            this.workerHostWeightsMap.clear();
            this.workerHostWeightsMap.putAll(workerHostWeights);
        }
        finally {
            this.lock.unlock();
        }
    }

    private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
        this.lock.lock();
        try {
            Set<HostWeight> set = this.workerHostWeightsMap.get(workerGroup);
            return set;
        }
        finally {
            this.lock.unlock();
        }
    }

    class RefreshResourceTask
    implements Runnable {
        RefreshResourceTask() {
        }

        @Override
        public void run() {
            try {
                Map<String, Set<String>> workerGroupNodes = LowerWeightHostManager.this.zookeeperNodeManager.getWorkerGroupNodes();
                Set<Map.Entry<String, Set<String>>> entries = workerGroupNodes.entrySet();
                HashMap workerHostWeights = new HashMap();
                for (Map.Entry<String, Set<String>> entry : entries) {
                    String workerGroup = entry.getKey();
                    Set<String> nodes = entry.getValue();
                    String workerGroupPath = LowerWeightHostManager.this.registryCenter.getWorkerGroupPath(workerGroup);
                    HashSet<HostWeight> hostWeights = new HashSet<HostWeight>(nodes.size());
                    for (String node : nodes) {
                        String heartbeat = LowerWeightHostManager.this.registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
                        if (!StringUtils.isNotEmpty((CharSequence)heartbeat) || heartbeat.split(",").length != 10) continue;
                        String[] parts = heartbeat.split(",");
                        int status = Integer.parseInt(parts[8]);
                        if (status == 1) {
                            LowerWeightHostManager.this.logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", (Object)Double.parseDouble(parts[3]), (Object)Double.parseDouble(parts[2]));
                            continue;
                        }
                        double cpu = Double.parseDouble(parts[0]);
                        double memory = Double.parseDouble(parts[1]);
                        double loadAverage = Double.parseDouble(parts[2]);
                        HostWeight hostWeight = new HostWeight(Host.of((String)node), cpu, memory, loadAverage);
                        hostWeights.add(hostWeight);
                    }
                    workerHostWeights.put(workerGroup, hostWeights);
                }
                LowerWeightHostManager.this.syncWorkerHostWeight(workerHostWeights);
            }
            catch (Throwable ex) {
                LowerWeightHostManager.this.logger.error("RefreshResourceTask error", ex);
            }
        }
    }
}

