/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.dispatch.host;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.PostConstruct;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.dispatch.host.CommonHostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Host manager that selects a worker for a worker group with a {@link LowerWeightRoundRobin}
 * selector, using {@link HostWeight} entries derived from worker heartbeats.
 *
 * <p>The per-group host weights are rebuilt by an internal {@link WorkerInfoChangeListener}
 * every time it is notified, and the result replaces the content of the shared map under the
 * write lock. Lookups take the read lock, so a lookup never observes the map between the
 * {@code clear()} and {@code putAll()} of a refresh.
 */
public class LowerWeightHostManager extends CommonHostManager {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(LowerWeightHostManager.class);

    /** Selector used to pick one entry out of a group's host weights; created in {@link #init()}. */
    private LowerWeightRoundRobin selector;

    /** Worker group name -> host weights of the currently usable workers; created in {@link #init()}. */
    private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeightsMap;

    private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = this.workerGroupLock.readLock();
    private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = this.workerGroupLock.writeLock();

    /**
     * Creates the selector and the (initially empty) host-weight map, then registers the
     * listener that refreshes the map on worker info changes.
     */
    @PostConstruct
    public void init() {
        this.selector = new LowerWeightRoundRobin();
        this.workerHostWeightsMap = new ConcurrentHashMap<>();
        this.serverNodeManager.addWorkerInfoChangeListener(new WorkerWeightListener());
    }

    /**
     * Selects a host from the given worker group.
     *
     * @param workerGroup name of the worker group to select from
     * @return the host chosen by the selector, or an empty {@code Optional} when the group's
     *         host-weight set is empty or the chosen entry carries no host
     * @throws WorkerGroupNotFoundException if the group has no entry in the host-weight map;
     *         note that the refresh omits groups for which no host weight could be built, so
     *         such groups are reported this way as well
     */
    @Override
    public Optional<Host> select(String workerGroup) throws WorkerGroupNotFoundException {
        Set<HostWeight> workerHostWeights = this.getWorkerHostWeights(workerGroup);
        if (CollectionUtils.isEmpty(workerHostWeights)) {
            return Optional.empty();
        }
        return Optional.ofNullable(this.selector.select(workerHostWeights).getHost());
    }

    /**
     * Not supported by this manager; selection is only available by worker group name.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public HostWorker select(Collection<HostWorker> nodes) {
        throw new UnsupportedOperationException("not support");
    }

    /**
     * Builds the {@link HostWeight} of a worker from its latest heartbeat.
     *
     * @param workerAddress address of the worker
     * @param workerGroup   worker group the worker is being evaluated for
     * @param heartBeat     latest heartbeat of the worker, may be {@code null}
     * @return the host weight, or an empty {@code Optional} (after logging a warning) when no
     *         heartbeat is available or the heartbeat reports {@link ServerStatus#BUSY}
     */
    public Optional<HostWeight> getHostWeight(String workerAddress, String workerGroup, WorkerHeartBeat heartBeat) {
        if (heartBeat == null) {
            log.warn("Worker {} in WorkerGroup {} have not received the heartbeat", workerAddress, workerGroup);
            return Optional.empty();
        }
        if (ServerStatus.BUSY == heartBeat.getServerStatus()) {
            log.warn("Worker {} in workerGroup {} is Busy, heartbeat is {}", workerAddress, workerGroup, heartBeat);
            return Optional.empty();
        }
        HostWorker hostWorker = HostWorker.of(workerAddress, heartBeat.getWorkerHostWeight(), workerGroup);
        return Optional.of(new HostWeight(
                hostWorker,
                heartBeat.getCpuUsage(),
                heartBeat.getMemoryUsage(),
                heartBeat.getDiskUsage(),
                heartBeat.getThreadPoolUsage(),
                heartBeat.getStartupTime()));
    }

    /**
     * Looks up the host weights of a worker group while holding the read lock.
     *
     * <p>The returned set is the instance stored in the map; this class does not modify a set
     * after storing it (a refresh stores newly created sets instead).
     *
     * @param workerGroup name of the worker group
     * @return the host weights registered for the group, never {@code null}
     * @throws WorkerGroupNotFoundException if the map has no entry for the group
     */
    private Set<HostWeight> getWorkerHostWeights(String workerGroup) throws WorkerGroupNotFoundException {
        this.workerGroupReadLock.lock();
        try {
            Set<HostWeight> hostWeights = this.workerHostWeightsMap.get(workerGroup);
            if (hostWeights == null) {
                throw new WorkerGroupNotFoundException("Can not find worker group " + workerGroup);
            }
            return hostWeights;
        } finally {
            this.workerGroupReadLock.unlock();
        }
    }

    /**
     * Listener that rebuilds the host-weight map of the enclosing manager whenever it is
     * notified about the current worker groups and heartbeats.
     */
    private class WorkerWeightListener implements WorkerInfoChangeListener {

        private WorkerWeightListener() {
        }

        /**
         * @param workerGroups   worker group name -> addresses of the workers in that group
         * @param workerNodeInfo worker address -> latest heartbeat of that worker
         */
        @Override
        public void notify(Map<String, Set<String>> workerGroups, Map<String, WorkerHeartBeat> workerNodeInfo) {
            this.syncWorkerResources(workerGroups, workerNodeInfo);
        }

        /**
         * Computes the host weights of every group and publishes them.
         *
         * <p>Groups for which no host weight could be built (no nodes, or every node is missing
         * a heartbeat or is busy) are left out of the published map. Any failure is logged and
         * not propagated to the notifier.
         */
        private void syncWorkerResources(Map<String, Set<String>> workerGroupNodes,
                                         Map<String, WorkerHeartBeat> workerNodeInfoMap) {
            try {
                Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
                for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
                    String workerGroup = entry.getKey();
                    Set<String> nodes = entry.getValue();
                    // A group without nodes (null or empty) cannot contribute any host weight;
                    // skipping it here keeps one malformed entry from aborting the whole refresh.
                    if (CollectionUtils.isEmpty(nodes)) {
                        continue;
                    }
                    Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
                    for (String node : nodes) {
                        WorkerHeartBeat heartbeat = workerNodeInfoMap.get(node);
                        LowerWeightHostManager.this.getHostWeight(node, workerGroup, heartbeat)
                                .ifPresent(hostWeights::add);
                    }
                    if (!hostWeights.isEmpty()) {
                        workerHostWeights.put(workerGroup, hostWeights);
                    }
                }
                this.syncWorkerHostWeight(workerHostWeights);
            } catch (Throwable ex) {
                // Best effort: log and swallow so a failed refresh does not reach the notifier.
                log.error("Sync worker resource error", ex);
            }
        }

        /**
         * Replaces the content of the shared host-weight map with the given mapping while
         * holding the write lock, so readers holding the read lock never see the intermediate
         * cleared state.
         */
        private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
            LowerWeightHostManager.this.workerGroupWriteLock.lock();
            try {
                LowerWeightHostManager.this.workerHostWeightsMap.clear();
                LowerWeightHostManager.this.workerHostWeightsMap.putAll(workerHostWeights);
            } finally {
                LowerWeightHostManager.this.workerGroupWriteLock.unlock();
            }
        }
    }
}

