package org.apache.dolphinscheduler.server.master.registry;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.class */
public class ServerNodeManager implements InitializingBean {
    private ScheduledExecutorService executorService;

    @Autowired
    private RegistryClient registryClient;

    @Autowired
    private WorkerGroupMapper workerGroupMapper;

    @Autowired
    private AlertDao alertDao;

    @Autowired
    private MasterConfig masterConfig;
    private final Logger logger = LoggerFactory.getLogger(ServerNodeManager.class);
    private final Lock masterLock = new ReentrantLock();
    private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = this.workerGroupLock.readLock();
    private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = this.workerGroupLock.writeLock();
    private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = this.workerNodeInfoLock.readLock();
    private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = this.workerNodeInfoLock.writeLock();
    private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
    private final Set<String> masterNodes = new HashSet();
    private final Map<String, WorkerHeartBeat> workerNodeInfo = new HashMap();
    private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
    private final List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList();
    private volatile int currentSlot = 0;
    private volatile int totalSlot = 0;

    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/registry/ServerNodeManager$MasterDataListener.class */
    class MasterDataListener implements SubscribeListener {
        MasterDataListener() {
        }

        public void notify(Event event) {
            String path = event.path();
            Event.Type type = event.type();
            if (ServerNodeManager.this.registryClient.isMasterPath(path)) {
                try {
                    if (type.equals(Event.Type.ADD)) {
                        ServerNodeManager.this.logger.info("master node : {} added.", path);
                        ServerNodeManager.this.updateMasterNodes();
                    }
                    if (type.equals(Event.Type.REMOVE)) {
                        ServerNodeManager.this.logger.info("master node : {} down.", path);
                        ServerNodeManager.this.updateMasterNodes();
                        ServerNodeManager.this.alertDao.sendServerStoppedAlert(1, path, "MASTER");
                    }
                } catch (Exception e) {
                    ServerNodeManager.this.logger.error("MasterNodeListener capture data change and get data failed.", e);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/registry/ServerNodeManager$WorkerDataListener.class */
    class WorkerDataListener implements SubscribeListener {
        WorkerDataListener() {
        }

        public void notify(Event event) {
            String path = event.path();
            Event.Type type = event.type();
            String data = event.data();
            if (ServerNodeManager.this.registryClient.isWorkerPath(path)) {
                try {
                    String[] split = path.split("/");
                    String str = split[split.length - 1];
                    ServerNodeManager.this.logger.debug("received subscribe event : {}", event);
                    if (type == Event.Type.ADD) {
                        ServerNodeManager.this.logger.info("Worker: {} added, currentNode : {}", path, str);
                    } else if (type == Event.Type.REMOVE) {
                        ServerNodeManager.this.logger.info("Worker node : {} down.", path);
                        ServerNodeManager.this.alertDao.sendServerStoppedAlert(1, path, "WORKER");
                    } else if (type == Event.Type.UPDATE) {
                        ServerNodeManager.this.syncSingleWorkerNodeInfo(str, (WorkerHeartBeat) JSONUtils.parseObject(data, WorkerHeartBeat.class));
                    }
                } catch (Exception e) {
                    ServerNodeManager.this.logger.error("WorkerGroupListener capture data change and get data failed", e);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/registry/ServerNodeManager$WorkerNodeInfoAndGroupDbSyncTask.class */
    class WorkerNodeInfoAndGroupDbSyncTask implements Runnable {
        WorkerNodeInfoAndGroupDbSyncTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                ServerNodeManager.this.updateWorkerNodes();
                ServerNodeManager.this.updateWorkerGroupMappings();
                ServerNodeManager.this.notifyWorkerInfoChangeListeners();
            } catch (Exception e) {
                ServerNodeManager.this.logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
            }
        }
    }

    public int getSlot() {
        return this.currentSlot;
    }

    public int getMasterSize() {
        return this.totalSlot;
    }

    public void afterPropertiesSet() {
        updateMasterNodes();
        updateWorkerNodes();
        updateWorkerGroupMappings();
        this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
        this.executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0L, this.masterConfig.getWorkerGroupRefreshInterval().getSeconds(), TimeUnit.SECONDS);
        this.registryClient.subscribe("/nodes/master", new MasterDataListener());
        this.registryClient.subscribe("/nodes/worker", new WorkerDataListener());
    }

    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> map, WorkerGroup workerGroup) {
        HashSet hashSet = new HashSet();
        for (String str : workerGroup.getAddrList().split(",")) {
            if (map.containsKey(str)) {
                hashSet.add(str);
            }
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateMasterNodes() {
        this.currentSlot = 0;
        this.totalSlot = 0;
        this.masterNodes.clear();
        try {
            try {
                this.registryClient.getLock("/lock/masters");
                syncMasterNodes(this.registryClient.getMasterNodesDirectly(), this.registryClient.getServerList(NodeType.MASTER));
                this.registryClient.releaseLock("/lock/masters");
            } catch (Exception e) {
                this.logger.error("update master nodes error", e);
                this.registryClient.releaseLock("/lock/masters");
            }
        } catch (Throwable th) {
            this.registryClient.releaseLock("/lock/masters");
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateWorkerNodes() {
        this.workerGroupWriteLock.lock();
        try {
            for (Map.Entry entry : this.registryClient.getServerMaps(NodeType.WORKER).entrySet()) {
                this.workerNodeInfo.put((String) entry.getKey(), (WorkerHeartBeat) JSONUtils.parseObject((String) entry.getValue(), WorkerHeartBeat.class));
            }
        } finally {
            this.workerGroupWriteLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateWorkerGroupMappings() {
        List<WorkerGroup> queryAllWorkerGroup = this.workerGroupMapper.queryAllWorkerGroup();
        HashMap hashMap = new HashMap();
        try {
            this.workerNodeInfoReadLock.lock();
            for (WorkerGroup workerGroup : queryAllWorkerGroup) {
                String name = workerGroup.getName();
                String[] split = workerGroup.getAddrList().split(",");
                if (!ArrayUtils.isEmpty(split)) {
                    Stream stream = Arrays.stream(split);
                    Map<String, WorkerHeartBeat> map = this.workerNodeInfo;
                    Objects.requireNonNull(map);
                    hashMap.put(name, (Set) stream.filter((v1) -> {
                        return r1.containsKey(v1);
                    }).collect(Collectors.toSet()));
                }
            }
            if (!hashMap.containsKey("default")) {
                hashMap.put("default", this.workerNodeInfo.keySet());
            }
            this.workerGroupWriteLock.lock();
            try {
                this.workerGroupNodes.clear();
                this.workerGroupNodes.putAll(hashMap);
                notifyWorkerInfoChangeListeners();
                this.workerGroupWriteLock.unlock();
            } catch (Throwable th) {
                this.workerGroupWriteLock.unlock();
                throw th;
            }
        } finally {
            this.workerNodeInfoReadLock.unlock();
        }
    }

    private void syncMasterNodes(Collection<String> collection, List<Server> list) {
        this.masterLock.lock();
        try {
            this.masterNodes.addAll(collection);
            this.masterPriorityQueue.clear();
            this.masterPriorityQueue.putList(list);
            int index = this.masterPriorityQueue.getIndex(this.masterConfig.getMasterAddress());
            if (index >= 0) {
                this.totalSlot = collection.size();
                this.currentSlot = index;
            } else {
                this.logger.warn("Current master is not in active master list");
            }
            this.logger.info("Update master nodes, total master size: {}, current slot: {}", Integer.valueOf(this.totalSlot), Integer.valueOf(this.currentSlot));
            this.masterLock.unlock();
        } catch (Throwable th) {
            this.masterLock.unlock();
            throw th;
        }
    }

    public Map<String, Set<String>> getWorkerGroupNodes() {
        this.workerGroupReadLock.lock();
        try {
            return Collections.unmodifiableMap(this.workerGroupNodes);
        } finally {
            this.workerGroupReadLock.unlock();
        }
    }

    public Set<String> getWorkerGroupNodes(String str) {
        this.workerGroupReadLock.lock();
        try {
            if (StringUtils.isEmpty(str)) {
                str = "default";
            }
            Set<String> set = this.workerGroupNodes.get(str);
            if (CollectionUtils.isEmpty(set)) {
                Set<String> emptySet = Collections.emptySet();
                this.workerGroupReadLock.unlock();
                return emptySet;
            }
            Set<String> unmodifiableSet = Collections.unmodifiableSet(set);
            this.workerGroupReadLock.unlock();
            return unmodifiableSet;
        } catch (Throwable th) {
            this.workerGroupReadLock.unlock();
            throw th;
        }
    }

    public Map<String, WorkerHeartBeat> getWorkerNodeInfo() {
        return Collections.unmodifiableMap(this.workerNodeInfo);
    }

    public Optional<WorkerHeartBeat> getWorkerNodeInfo(String str) {
        this.workerNodeInfoReadLock.lock();
        try {
            return Optional.ofNullable(this.workerNodeInfo.getOrDefault(str, null));
        } finally {
            this.workerNodeInfoReadLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void syncSingleWorkerNodeInfo(String str, WorkerHeartBeat workerHeartBeat) {
        this.workerNodeInfoWriteLock.lock();
        try {
            this.workerNodeInfo.put(str, workerHeartBeat);
        } finally {
            this.workerNodeInfoWriteLock.unlock();
        }
    }

    public synchronized void addWorkerInfoChangeListener(WorkerInfoChangeListener workerInfoChangeListener) {
        this.workerInfoChangeListeners.add(workerInfoChangeListener);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyWorkerInfoChangeListeners() {
        Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
        Map<String, WorkerHeartBeat> workerNodeInfo = getWorkerNodeInfo();
        Iterator<WorkerInfoChangeListener> it = this.workerInfoChangeListeners.iterator();
        while (it.hasNext()) {
            it.next().notify(workerGroupNodes, workerNodeInfo);
        }
    }

    @PreDestroy
    public void destroy() {
        this.executorService.shutdownNow();
    }
}
