/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.registry;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ZookeeperNodeManager
implements InitializingBean {
    private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class);
    private final Lock masterLock = new ReentrantLock();
    private final Lock workerGroupLock = new ReentrantLock();
    private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap();
    private final Set<String> masterNodes = new HashSet<String>();
    @Autowired
    private ZookeeperRegistryCenter registryCenter;
    @Autowired
    private AlertDao alertDao;

    public void afterPropertiesSet() throws Exception {
        this.load();
        this.registryCenter.getZookeeperCachedOperator().addListener((TreeCacheListener)new MasterNodeListener());
        this.registryCenter.getZookeeperCachedOperator().addListener((TreeCacheListener)new WorkerGroupNodeListener());
    }

    private void load() {
        Set<String> masterNodes = this.registryCenter.getMasterNodesDirectly();
        this.syncMasterNodes(masterNodes);
        Set<String> workerGroups = this.registryCenter.getWorkerGroupDirectly();
        for (String workerGroup : workerGroups) {
            this.syncWorkerGroupNodes(workerGroup, this.registryCenter.getWorkerGroupNodesDirectly(workerGroup));
        }
    }

    public Set<String> getMasterNodes() {
        this.masterLock.lock();
        try {
            Set<String> set = Collections.unmodifiableSet(this.masterNodes);
            return set;
        }
        finally {
            this.masterLock.unlock();
        }
    }

    private void syncMasterNodes(Set<String> nodes) {
        this.masterLock.lock();
        try {
            this.masterNodes.clear();
            this.masterNodes.addAll(nodes);
        }
        finally {
            this.masterLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes) {
        this.workerGroupLock.lock();
        try {
            workerGroup = workerGroup.toLowerCase();
            Set workerNodes = this.workerGroupNodes.getOrDefault(workerGroup, new HashSet());
            workerNodes.clear();
            workerNodes.addAll(nodes);
            this.workerGroupNodes.put(workerGroup, workerNodes);
        }
        finally {
            this.workerGroupLock.unlock();
        }
    }

    public Map<String, Set<String>> getWorkerGroupNodes() {
        return Collections.unmodifiableMap(this.workerGroupNodes);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Set<String> getWorkerGroupNodes(String workerGroup) {
        this.workerGroupLock.lock();
        try {
            Set<String> nodes;
            if (StringUtils.isEmpty((CharSequence)workerGroup)) {
                workerGroup = "default";
            }
            if (CollectionUtils.isNotEmpty(nodes = this.workerGroupNodes.get(workerGroup = workerGroup.toLowerCase()))) {
                Set<String> set = Collections.unmodifiableSet(nodes);
                return set;
            }
            Set<String> set = nodes;
            return set;
        }
        finally {
            this.workerGroupLock.unlock();
        }
    }

    public void close() {
        this.registryCenter.close();
    }

    class MasterNodeListener
    extends AbstractListener {
        MasterNodeListener() {
        }

        protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
            if (ZookeeperNodeManager.this.registryCenter.isMasterPath(path)) {
                try {
                    if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
                        ZookeeperNodeManager.this.logger.info("master node : {} added.", (Object)path);
                        HashSet previousNodes = new HashSet(ZookeeperNodeManager.this.masterNodes);
                        Set<String> currentNodes = ZookeeperNodeManager.this.registryCenter.getMasterNodesDirectly();
                        ZookeeperNodeManager.this.syncMasterNodes(currentNodes);
                    } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
                        ZookeeperNodeManager.this.logger.info("master node : {} down.", (Object)path);
                        HashSet previousNodes = new HashSet(ZookeeperNodeManager.this.masterNodes);
                        Set<String> currentNodes = ZookeeperNodeManager.this.registryCenter.getMasterNodesDirectly();
                        ZookeeperNodeManager.this.syncMasterNodes(currentNodes);
                        ZookeeperNodeManager.this.alertDao.sendServerStopedAlert(1, path, "MASTER");
                    }
                }
                catch (Exception ex) {
                    ZookeeperNodeManager.this.logger.error("MasterNodeListener capture data change and get data failed.", (Throwable)ex);
                }
            }
        }
    }

    class WorkerGroupNodeListener
    extends AbstractListener {
        WorkerGroupNodeListener() {
        }

        protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
            if (ZookeeperNodeManager.this.registryCenter.isWorkerPath(path)) {
                try {
                    if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
                        ZookeeperNodeManager.this.logger.info("worker group node : {} added.", (Object)path);
                        String group = this.parseGroup(path);
                        Set workerNodes = ZookeeperNodeManager.this.workerGroupNodes.getOrDefault(group, new HashSet());
                        HashSet previousNodes = new HashSet(workerNodes);
                        Set<String> currentNodes = ZookeeperNodeManager.this.registryCenter.getWorkerGroupNodesDirectly(group);
                        ZookeeperNodeManager.this.logger.info("currentNodes : {}", currentNodes);
                        ZookeeperNodeManager.this.syncWorkerGroupNodes(group, currentNodes);
                    } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
                        ZookeeperNodeManager.this.logger.info("worker group node : {} down.", (Object)path);
                        String group = this.parseGroup(path);
                        Set workerNodes = ZookeeperNodeManager.this.workerGroupNodes.getOrDefault(group, new HashSet());
                        HashSet previousNodes = new HashSet(workerNodes);
                        Set<String> currentNodes = ZookeeperNodeManager.this.registryCenter.getWorkerGroupNodesDirectly(group);
                        ZookeeperNodeManager.this.syncWorkerGroupNodes(group, currentNodes);
                        ZookeeperNodeManager.this.alertDao.sendServerStopedAlert(1, path, "WORKER");
                    }
                }
                catch (IllegalArgumentException ignore) {
                    ZookeeperNodeManager.this.logger.warn(ignore.getMessage());
                }
                catch (Exception ex) {
                    ZookeeperNodeManager.this.logger.error("WorkerGroupListener capture data change and get data failed", (Throwable)ex);
                }
            }
        }

        private String parseGroup(String path) {
            String[] parts = path.split("\\/");
            if (parts.length != 6) {
                throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
            }
            String group = parts[4];
            return group;
        }
    }
}

