/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.registry;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.registry.MasterInfoChangeListener;
import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener;
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ServerNodeManager
implements InitializingBean {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ServerNodeManager.class);

    // Read/write lock pair taken around accesses to workerGroupNodes.
    private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
    private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();

    // Read/write lock pair taken around accesses to workerNodeInfo (a plain HashMap).
    private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock();
    private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();

    // Mutual-exclusion lock taken while masterNodeInfo is rebuilt.
    private final ReentrantLock masterNodeInfoLock = new ReentrantLock();

    // Worker group name -> addresses of the group's workers that are present in workerNodeInfo.
    private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
    // Master address -> last heartbeat parsed from the registry.
    private final Map<String, MasterHeartBeat> masterNodeInfo = new HashMap<>();
    // Worker address -> last heartbeat parsed from the registry.
    private final Map<String, WorkerHeartBeat> workerNodeInfo = new HashMap<>();

    // Runs the periodic refresh task; created in afterPropertiesSet().
    private ScheduledExecutorService executorService;

    @Autowired
    private RegistryClient registryClient;
    @Autowired
    private WorkerGroupMapper workerGroupMapper;
    @Autowired
    private AlertDao alertDao;
    @Autowired
    private MasterConfig masterConfig;
    @Autowired
    private ListenerEventAlertManager listenerEventAlertManager;

    // Callbacks invoked after each refresh of the worker / master caches.
    private final List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList<>();
    private final List<MasterInfoChangeListener> masterInfoChangeListeners = new ArrayList<>();

    /**
     * Initialization hook of {@link InitializingBean}.
     *
     * <p>Loads the node and worker-group caches once, starts the single-threaded
     * scheduler that repeats that refresh with a fixed delay taken from
     * {@code masterConfig.getWorkerGroupRefreshInterval()}, and subscribes to the
     * master and worker registry paths.
     */
    @Override
    public void afterPropertiesSet() {
        // Prime the caches synchronously before anything is scheduled or subscribed.
        refreshNodesAndGroupMappings();

        executorService = Executors.newSingleThreadScheduledExecutor(
                ThreadUtils.newDaemonThreadFactory("ServerNodeManagerExecutor"));
        long refreshIntervalSeconds = masterConfig.getWorkerGroupRefreshInterval().getSeconds();
        executorService.scheduleWithFixedDelay(
                new WorkerNodeInfoAndGroupDbSyncTask(), 0L, refreshIntervalSeconds, TimeUnit.SECONDS);

        String masterRegistryPath = RegistryNodeType.MASTER.getRegistryPath();
        registryClient.subscribe(masterRegistryPath, new MasterDataListener());
        String workerRegistryPath = RegistryNodeType.WORKER.getRegistryPath();
        registryClient.subscribe(workerRegistryPath, new WorkerDataListener());
    }

    /**
     * Runs one full refresh cycle: worker heartbeats, worker-group mappings and
     * master heartbeats, notifying the matching listeners after each part.
     *
     * <p>The order matters: the group mappings are computed from the worker
     * heartbeats loaded just before them.
     */
    private void refreshNodesAndGroupMappings() {
        // Worker side.
        updateWorkerNodes();
        updateWorkerGroupMappings();
        notifyWorkerInfoChangeListeners();

        // Master side.
        updateMasterNodes();
        notifyMasterInfoChangeListeners();
    }

    /**
     * Replaces the cached master heartbeats with the current registry content.
     *
     * <p>The registry is read and parsed into a temporary map first; the cache is
     * only swapped once that succeeded. If reading or parsing fails, the error is
     * logged and the previous cache content is kept, instead of leaving the cache
     * empty or half-filled. The lock is held only for the in-memory swap, not
     * across the registry call.
     */
    private void updateMasterNodes() {
        Map<String, MasterHeartBeat> latestMasterNodes = new HashMap<>();
        try {
            Map<String, String> masterNodeMaps = this.registryClient.getServerMaps(RegistryNodeType.MASTER);
            for (Map.Entry<String, String> entry : masterNodeMaps.entrySet()) {
                latestMasterNodes.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), MasterHeartBeat.class));
            }
        } catch (Exception e) {
            log.error("update master nodes error", e);
            // Keep the last known state rather than publishing a partial view.
            return;
        }

        this.masterNodeInfoLock.lock();
        try {
            this.masterNodeInfo.clear();
            this.masterNodeInfo.putAll(latestMasterNodes);
        } finally {
            this.masterNodeInfoLock.unlock();
        }
    }

    /**
     * Loads every worker heartbeat currently in the registry into
     * {@code workerNodeInfo}.
     *
     * <p>Entries are added or overwritten only; this method does not remove
     * workers that are missing from the registry.
     *
     * <p>The map is mutated under {@code workerNodeInfoWriteLock}, the same lock
     * every other reader and writer of {@code workerNodeInfo} uses. The registry
     * read stays inside the locked section so that a concurrent single-node
     * update or removal cannot interleave between reading the registry and
     * applying the result.
     */
    private void updateWorkerNodes() {
        this.workerNodeInfoWriteLock.lock();
        try {
            Map<String, String> workerNodeMaps = this.registryClient.getServerMaps(RegistryNodeType.WORKER);
            for (Map.Entry<String, String> entry : workerNodeMaps.entrySet()) {
                this.workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
            }
        } finally {
            this.workerNodeInfoWriteLock.unlock();
        }
    }

    /**
     * Rebuilds {@code workerGroupNodes} from the worker groups returned by
     * {@code workerGroupMapper.queryAllWorkerGroup()}.
     *
     * <p>For each group, the comma-separated address list is reduced to the
     * addresses that are currently keys of {@code workerNodeInfo}. Groups whose
     * address list is {@code null} or splits to no elements are skipped. If no
     * group is named {@code "default"}, a {@code "default"} group containing all
     * known worker addresses is added.
     *
     * <p>The new mapping is computed under the node-info read lock and then
     * swapped in under the worker-group write lock.
     */
    private void updateWorkerGroupMappings() {
        List<WorkerGroup> workerGroups = this.workerGroupMapper.queryAllWorkerGroup();
        Map<String, Set<String>> tmpWorkerGroupMappings = new HashMap<>();

        // Acquire before entering try so the finally block never unlocks a lock that was not taken.
        this.workerNodeInfoReadLock.lock();
        try {
            for (WorkerGroup workerGroup : workerGroups) {
                String addrList = workerGroup.getAddrList();
                if (addrList == null) {
                    // No address list at all: treat like a group without addresses.
                    continue;
                }
                String[] workerAddresses = addrList.split(",");
                if (ArrayUtils.isEmpty(workerAddresses)) {
                    continue;
                }
                Set<String> activeWorkerNodes = Arrays.stream(workerAddresses)
                        .filter(this.workerNodeInfo::containsKey)
                        .collect(Collectors.toSet());
                tmpWorkerGroupMappings.put(workerGroup.getName(), activeWorkerNodes);
            }
            if (!tmpWorkerGroupMappings.containsKey("default")) {
                // Copy the keys: keySet() is a live view of the HashMap, and the stored set
                // is later read without holding the node-info lock.
                Set<String> allWorkerNodes = this.workerNodeInfo.keySet().stream().collect(Collectors.toSet());
                tmpWorkerGroupMappings.put("default", allWorkerNodes);
            }
        } finally {
            this.workerNodeInfoReadLock.unlock();
        }

        this.workerGroupWriteLock.lock();
        try {
            this.workerGroupNodes.clear();
            this.workerGroupNodes.putAll(tmpWorkerGroupMappings);
        } finally {
            this.workerGroupWriteLock.unlock();
        }
    }

    /**
     * Returns an unmodifiable snapshot of the worker-group to worker-address
     * mapping.
     *
     * <p>The snapshot is copied while the read lock is held, so the caller never
     * observes the mapping in the middle of a rebuild (which clears and refills
     * it under the write lock). The copy is shallow: the address sets are the
     * same instances stored in the cache.
     *
     * @return group name to worker addresses, never {@code null}
     */
    public Map<String, Set<String>> getWorkerGroupNodes() {
        this.workerGroupReadLock.lock();
        try {
            Map<String, Set<String>> snapshot = new HashMap<>(this.workerGroupNodes);
            return Collections.unmodifiableMap(snapshot);
        } finally {
            this.workerGroupReadLock.unlock();
        }
    }

    /**
     * Looks up the worker addresses of one worker group.
     *
     * @param workerGroup group name; an empty or {@code null} name selects {@code "default"}
     * @return an unmodifiable view of the group's addresses, or an empty set when the group has none
     * @throws WorkerGroupNotFoundException if the group is not present in the mapping
     */
    public Set<String> getWorkerGroupNodes(String workerGroup) throws WorkerGroupNotFoundException {
        final String groupName = StringUtils.isEmpty(workerGroup) ? "default" : workerGroup;

        this.workerGroupReadLock.lock();
        try {
            final Set<String> members = this.workerGroupNodes.get(groupName);
            if (members == null) {
                throw new WorkerGroupNotFoundException(String.format("WorkerGroup: %s is invalidated", groupName));
            }
            return CollectionUtils.isEmpty(members)
                    ? Collections.<String>emptySet()
                    : Collections.unmodifiableSet(members);
        } finally {
            this.workerGroupReadLock.unlock();
        }
    }

    /**
     * Returns an unmodifiable snapshot of all cached worker heartbeats.
     *
     * <p>{@code workerNodeInfo} is a plain {@code HashMap} that other threads
     * modify under the node-info write lock, so the snapshot is copied while
     * holding the read lock instead of handing out a live view of the map.
     *
     * @return worker address to heartbeat, never {@code null}
     */
    public Map<String, WorkerHeartBeat> getWorkerNodeInfo() {
        this.workerNodeInfoReadLock.lock();
        try {
            Map<String, WorkerHeartBeat> snapshot = new HashMap<>(this.workerNodeInfo);
            return Collections.unmodifiableMap(snapshot);
        } finally {
            this.workerNodeInfoReadLock.unlock();
        }
    }

    /**
     * Looks up the cached heartbeat of a single worker.
     *
     * @param workerServerAddress worker address used as the cache key
     * @return the heartbeat, or an empty {@code Optional} when nothing (or a
     *         {@code null} value) is cached for that address
     */
    public Optional<WorkerHeartBeat> getWorkerNodeInfo(String workerServerAddress) {
        this.workerNodeInfoReadLock.lock();
        try {
            WorkerHeartBeat heartBeat = this.workerNodeInfo.get(workerServerAddress);
            return Optional.ofNullable(heartBeat);
        } finally {
            this.workerNodeInfoReadLock.unlock();
        }
    }

    /**
     * Returns an unmodifiable snapshot of all cached master heartbeats.
     *
     * <p>{@code masterNodeInfo} is a plain {@code HashMap} that is rebuilt under
     * {@code masterNodeInfoLock}, so the snapshot is copied while holding that
     * lock instead of handing out a live view of the map.
     *
     * @return master address to heartbeat, never {@code null}
     */
    public Map<String, MasterHeartBeat> getMasterNodeInfo() {
        this.masterNodeInfoLock.lock();
        try {
            Map<String, MasterHeartBeat> snapshot = new HashMap<>(this.masterNodeInfo);
            return Collections.unmodifiableMap(snapshot);
        } finally {
            this.masterNodeInfoLock.unlock();
        }
    }

    /**
     * Registers a callback that is invoked after worker information is refreshed.
     * Registration is serialized on this instance's monitor.
     *
     * @param listener callback to append to the worker listener list
     */
    public synchronized void addWorkerInfoChangeListener(WorkerInfoChangeListener listener) {
        workerInfoChangeListeners.add(listener);
    }

    /**
     * Passes the current worker-group mapping and worker heartbeats to every
     * registered worker listener.
     *
     * <p>Listeners are registered through a method synchronized on this
     * instance, so the listener list is copied under the same monitor before
     * iterating. That prevents a concurrent registration from breaking the
     * iteration, and the callbacks themselves run outside the monitor.
     */
    private void notifyWorkerInfoChangeListeners() {
        Map<String, Set<String>> workerGroupNodeMap = this.getWorkerGroupNodes();
        Map<String, WorkerHeartBeat> workerNodeInfoMap = this.getWorkerNodeInfo();

        List<WorkerInfoChangeListener> listeners;
        synchronized (this) {
            listeners = new ArrayList<>(this.workerInfoChangeListeners);
        }
        for (WorkerInfoChangeListener listener : listeners) {
            listener.notify(workerGroupNodeMap, workerNodeInfoMap);
        }
    }

    /**
     * Registers a callback that is invoked after master information is refreshed.
     * Registration is serialized on this instance's monitor.
     *
     * @param listener callback to append to the master listener list
     */
    public synchronized void addMasterInfoChangeListener(MasterInfoChangeListener listener) {
        masterInfoChangeListeners.add(listener);
    }

    /**
     * Passes the current master heartbeats to every registered master listener.
     *
     * <p>Listeners are registered through a method synchronized on this
     * instance, so the listener list is copied under the same monitor before
     * iterating. That prevents a concurrent registration from breaking the
     * iteration, and the callbacks themselves run outside the monitor.
     */
    private void notifyMasterInfoChangeListeners() {
        Map<String, MasterHeartBeat> masterNodeInfoMap = this.getMasterNodeInfo();

        List<MasterInfoChangeListener> listeners;
        synchronized (this) {
            listeners = new ArrayList<>(this.masterInfoChangeListeners);
        }
        for (MasterInfoChangeListener listener : listeners) {
            listener.notify(masterNodeInfoMap);
        }
    }

    /**
     * Stops the periodic refresh task when the bean is destroyed.
     *
     * <p>The executor is only created at the end of a successful first refresh
     * in {@code afterPropertiesSet()}, so it can still be {@code null} here if
     * initialization failed earlier; in that case there is nothing to stop.
     */
    @PreDestroy
    public void destroy() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    class MasterDataListener
    implements SubscribeListener {
        MasterDataListener() {
        }

        public void notify(Event event) {
            String path = event.path();
            Event.Type type = event.type();
            if (ServerNodeManager.this.registryClient.isMasterPath(path)) {
                try {
                    if (type.equals((Object)Event.Type.ADD)) {
                        log.info("master node : {} added.", (Object)path);
                    } else if (type.equals((Object)Event.Type.REMOVE)) {
                        log.info("master node : {} down.", (Object)path);
                        ServerNodeManager.this.alertDao.sendServerStoppedAlert(1, path, "MASTER");
                        ServerNodeManager.this.listenerEventAlertManager.publishServerDownListenerEvent(path, "MASTER");
                    }
                }
                catch (Exception ex) {
                    log.error("MasterNodeListener capture data change and get data failed.", (Throwable)ex);
                }
            }
        }
    }

    class WorkerDataListener
    implements SubscribeListener {
        WorkerDataListener() {
        }

        public void notify(Event event) {
            String path = event.path();
            Event.Type type = event.type();
            String data = event.data();
            if (ServerNodeManager.this.registryClient.isWorkerPath(path)) {
                try {
                    String[] parts = path.split("/");
                    String workerAddress = parts[parts.length - 1];
                    log.debug("received subscribe event : {}", (Object)event);
                    if (type == Event.Type.ADD) {
                        log.info("Worker: {} added, currentNode : {}", (Object)path, (Object)workerAddress);
                    } else if (type == Event.Type.REMOVE) {
                        log.info("Worker node : {} down.", (Object)path);
                        this.removeSingleWorkerNode(workerAddress);
                        ServerNodeManager.this.alertDao.sendServerStoppedAlert(1, path, "WORKER");
                        ServerNodeManager.this.listenerEventAlertManager.publishServerDownListenerEvent(path, "WORKER");
                    } else if (type == Event.Type.UPDATE) {
                        this.syncSingleWorkerNodeInfo(workerAddress, (WorkerHeartBeat)JSONUtils.parseObject((String)data, WorkerHeartBeat.class));
                    }
                }
                catch (Exception ex) {
                    log.error("WorkerGroupListener capture data change and get data failed", (Throwable)ex);
                }
            }
        }

        private void syncSingleWorkerNodeInfo(String workerAddress, WorkerHeartBeat info) {
            ServerNodeManager.this.workerNodeInfoWriteLock.lock();
            try {
                ServerNodeManager.this.workerNodeInfo.put(workerAddress, info);
            }
            finally {
                ServerNodeManager.this.workerNodeInfoWriteLock.unlock();
            }
        }

        private void removeSingleWorkerNode(String workerAddress) {
            ServerNodeManager.this.workerNodeInfoWriteLock.lock();
            try {
                ServerNodeManager.this.workerNodeInfo.remove(workerAddress);
                log.info("remove worker node {} from workerNodeInfo when worker server down", (Object)workerAddress);
            }
            finally {
                ServerNodeManager.this.workerNodeInfoWriteLock.unlock();
            }
        }
    }

    class WorkerNodeInfoAndGroupDbSyncTask
    implements Runnable {
        WorkerNodeInfoAndGroupDbSyncTask() {
        }

        @Override
        public void run() {
            try {
                ServerNodeManager.this.refreshNodesAndGroupMappings();
            }
            catch (Exception e) {
                log.error("WorkerNodeInfoAndGroupDbSyncTask error:", (Throwable)e);
            }
        }
    }
}

