/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.common.zk;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractZKClient {
    private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
    protected static Configuration conf;
    protected CuratorFramework zkClient = null;
    protected IStoppable stoppable = null;

    public AbstractZKClient() {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(conf.getInt("zookeeper.retry.sleep"), conf.getInt("zookeeper.retry.maxtime"));
        try {
            this.zkClient = CuratorFrameworkFactory.builder().connectString(AbstractZKClient.getZookeeperQuorum()).retryPolicy((RetryPolicy)retryPolicy).sessionTimeoutMs(1000 * conf.getInt("zookeeper.session.timeout")).connectionTimeoutMs(1000 * conf.getInt("zookeeper.connection.timeout")).build();
            this.zkClient.start();
            this.initStateLister();
        }
        catch (Exception e) {
            logger.error("create zookeeper connect failed : " + e.getMessage(), (Throwable)e);
            System.exit(-1);
        }
    }

    public void initStateLister() {
        if (this.zkClient == null) {
            return;
        }
        ConnectionStateListener csLister = new ConnectionStateListener(){

            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                logger.info("state changed , current state : " + newState.name());
                if (newState == ConnectionState.LOST) {
                    logger.info("current zookeepr connection state : connection lost ");
                }
            }
        };
        this.zkClient.getConnectionStateListenable().addListener((Object)csLister);
    }

    public void start() {
        this.zkClient.start();
        logger.info("zookeeper start ...");
    }

    public void close() {
        this.zkClient.getZookeeperClient().close();
        this.zkClient.close();
        logger.info("zookeeper close ...");
    }

    public void heartBeatForZk(String znode, String serverType) {
        try {
            if (this.zkClient.getState() == CuratorFrameworkState.STOPPED || this.checkIsDeadServer(znode, serverType)) {
                this.stoppable.stop("i was judged to death, release resources and stop myself");
                return;
            }
            byte[] bytes = (byte[])this.zkClient.getData().forPath(znode);
            String resInfoStr = new String(bytes);
            String[] splits = resInfoStr.split(",");
            if (splits.length != 7) {
                return;
            }
            String str = splits[0] + "," + splits[1] + "," + OSUtils.cpuUsage() + "," + OSUtils.memoryUsage() + "," + OSUtils.loadAverage() + "," + splits[5] + "," + DateUtils.dateToString(new Date());
            this.zkClient.setData().forPath(znode, str.getBytes());
        }
        catch (Exception e) {
            logger.error("heartbeat for zk failed : " + e.getMessage(), (Throwable)e);
            this.stoppable.stop("heartbeat for zk exception, release resources and stop myself");
        }
    }

    protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception {
        String[] zNodesPath = zNode.split("\\/");
        String ipSeqNo = zNodesPath[zNodesPath.length - 1];
        String type = serverType.equals("master") ? "master" : "worker";
        String deadServerPath = this.getDeadZNodeParentPath() + "/" + type + "_" + ipSeqNo;
        return this.zkClient.checkExists().forPath(zNode) == null || this.zkClient.checkExists().forPath(deadServerPath) != null;
    }

    public void removeDeadServerByHost(String host, String serverType) throws Exception {
        List deadServers = (List)this.zkClient.getChildren().forPath(this.getDeadZNodeParentPath());
        for (String serverPath : deadServers) {
            if (!serverPath.startsWith(serverType + "_" + host)) continue;
            String server = this.getDeadZNodeParentPath() + "/" + serverPath;
            if (this.zkClient.checkExists().forPath(server) == null) continue;
            this.zkClient.delete().forPath(server);
            logger.info("{} server {} deleted from zk dead server path success", (Object)serverType, (Object)host);
        }
    }

    private String createZNodePath(ZKNodeType zkNodeType) throws Exception {
        String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
        String parentPath = this.getZNodeParentPath(zkNodeType);
        String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
        String registerPath = (String)((ACLBackgroundPathAndBytesable)this.zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(serverPathPrefix + "_", heartbeatZKInfo.getBytes());
        logger.info("register {} node {} success", (Object)zkNodeType.toString(), (Object)registerPath);
        return registerPath;
    }

    public String registerServer(ZKNodeType zkNodeType) throws Exception {
        String registerPath = null;
        String host = OSUtils.getHost();
        if (this.checkZKNodeExists(host, zkNodeType)) {
            logger.error("register failure , {} server already started on host : {}", (Object)zkNodeType.toString(), (Object)host);
            return registerPath;
        }
        registerPath = this.createZNodePath(zkNodeType);
        this.handleDeadServer(registerPath, zkNodeType, "delete");
        return registerPath;
    }

    public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
        String type;
        String[] zNodesPath = zNode.split("\\/");
        String ipSeqNo = zNodesPath[zNodesPath.length - 1];
        String string = type = zkNodeType == ZKNodeType.MASTER ? "master" : "worker";
        if (opType.equals("delete")) {
            String[] ipAndSeqNo = ipSeqNo.split("_");
            String ip = ipAndSeqNo[0];
            this.removeDeadServerByHost(ip, type);
        } else if (opType.equals("add")) {
            String deadServerPath = this.getDeadZNodeParentPath() + "/" + type + "_" + ipSeqNo;
            if (this.zkClient.checkExists().forPath(deadServerPath) == null) {
                this.zkClient.create().forPath(deadServerPath, (type + "_" + ipSeqNo).getBytes());
                logger.info("{} server dead , and {} added to zk dead server path success", (Object)zkNodeType.toString(), (Object)zNode);
            }
        }
    }

    public void setStoppable(IStoppable serverStoppable) {
        this.stoppable = serverStoppable;
    }

    public int getActiveMasterNum() {
        List childrenList = new ArrayList();
        try {
            if (this.zkClient.checkExists().forPath(this.getZNodeParentPath(ZKNodeType.MASTER)) != null) {
                childrenList = (List)this.zkClient.getChildren().forPath(this.getZNodeParentPath(ZKNodeType.MASTER));
                return (int)childrenList;
            }
        }
        catch (Exception e) {
            if (e.getMessage().contains("java.lang.IllegalStateException: instance must be started")) {
                logger.error("zookeeper service not started", (Throwable)e);
            }
            logger.error(e.getMessage(), (Throwable)e);
        }
        finally {
            return childrenList.size();
        }
    }

    public static String getZookeeperQuorum() {
        String[] zookeeperParamslist;
        StringBuilder sb = new StringBuilder();
        for (String param : zookeeperParamslist = conf.getStringArray("zookeeper.quorum")) {
            sb.append(param).append(",");
        }
        if (sb.length() > 0) {
            sb.deleteCharAt(sb.length() - 1);
        }
        return sb.toString();
    }

    public List<Server> getServersList(ZKNodeType zkNodeType) {
        Map<String, String> masterMap = this.getServerMaps(zkNodeType);
        String parentPath = this.getZNodeParentPath(zkNodeType);
        ArrayList<Server> masterServers = new ArrayList<Server>();
        int i = 0;
        for (Map.Entry<String, String> entry : masterMap.entrySet()) {
            Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
            masterServer.setZkDirectory(parentPath + "/" + entry.getKey());
            masterServer.setId(i);
            ++i;
            masterServers.add(masterServer);
        }
        return masterServers;
    }

    public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
        HashMap<String, String> masterMap = new HashMap<String, String>();
        try {
            String path = this.getZNodeParentPath(zkNodeType);
            List serverList = (List)this.getZkClient().getChildren().forPath(path);
            for (String server : serverList) {
                byte[] bytes = (byte[])this.getZkClient().getData().forPath(path + "/" + server);
                masterMap.putIfAbsent(server, new String(bytes));
            }
        }
        catch (Exception e) {
            logger.error("get server list failed : " + e.getMessage(), (Throwable)e);
        }
        return masterMap;
    }

    public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
        String path = this.getZNodeParentPath(zkNodeType);
        if (StringUtils.isEmpty((CharSequence)path)) {
            logger.error("check zk node exists error, host:{}, zk node type:{}", (Object)host, (Object)zkNodeType.toString());
            return false;
        }
        Map<String, String> serverMaps = this.getServerMaps(zkNodeType);
        for (String hostKey : serverMaps.keySet()) {
            if (!hostKey.startsWith(host)) continue;
            return true;
        }
        return false;
    }

    public CuratorFramework getZkClient() {
        return this.zkClient;
    }

    protected String getWorkerZNodeParentPath() {
        return conf.getString("zookeeper.dolphinscheduler.workers");
    }

    protected String getMasterZNodeParentPath() {
        return conf.getString("zookeeper.dolphinscheduler.masters");
    }

    public String getMasterLockPath() {
        return conf.getString("zookeeper.dolphinscheduler.lock.masters");
    }

    public String getZNodeParentPath(ZKNodeType zkNodeType) {
        String path = "";
        switch (zkNodeType) {
            case MASTER: {
                return this.getMasterZNodeParentPath();
            }
            case WORKER: {
                return this.getWorkerZNodeParentPath();
            }
            case DEAD_SERVER: {
                return this.getDeadZNodeParentPath();
            }
        }
        return path;
    }

    protected String getDeadZNodeParentPath() {
        return conf.getString("zookeeper.dolphinscheduler.dead.servers");
    }

    public String getMasterStartUpLockPath() {
        return conf.getString("zookeeper.dolphinscheduler.lock.failover.startup.masters");
    }

    public String getMasterFailoverLockPath() {
        return conf.getString("zookeeper.dolphinscheduler.lock.failover.masters");
    }

    public String getWorkerFailoverLockPath() {
        return conf.getString("zookeeper.dolphinscheduler.lock.failover.workers");
    }

    public static void releaseMutex(InterProcessMutex mutex) {
        if (mutex != null) {
            try {
                mutex.release();
            }
            catch (Exception e) {
                if (e.getMessage().equals("instance must be started before calling this method")) {
                    logger.warn("lock release");
                }
                logger.error("lock release failed : " + e.getMessage(), (Throwable)e);
            }
        }
    }

    protected void initSystemZNode() {
        try {
            this.createNodePath(this.getMasterZNodeParentPath());
            this.createNodePath(this.getWorkerZNodeParentPath());
            this.createNodePath(this.getDeadZNodeParentPath());
        }
        catch (Exception e) {
            logger.error("init system znode failed : " + e.getMessage(), (Throwable)e);
        }
    }

    private void createNodePath(String zNodeParentPath) throws Exception {
        if (null == this.zkClient.checkExists().forPath(zNodeParentPath)) {
            ((ACLBackgroundPathAndBytesable)this.zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(zNodeParentPath);
        }
    }

    protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
        if (serverHost.equals(OSUtils.getHost())) {
            logger.error("{} server({}) of myself dead , stopping...", (Object)zkNodeType.toString(), (Object)serverHost);
            this.stoppable.stop(String.format(" {} server {} of myself dead , stopping...", zkNodeType.toString(), serverHost));
            return true;
        }
        return false;
    }

    protected String getHostByEventDataPath(String path) {
        int endIndex;
        int startIndex = path.lastIndexOf("/") + 1;
        if (startIndex >= (endIndex = path.lastIndexOf("_"))) {
            logger.error("parse ip error");
            return "";
        }
        return path.substring(startIndex, endIndex);
    }

    public InterProcessMutex acquireZkLock(CuratorFramework zkClient, String zNodeLockPath) throws Exception {
        InterProcessMutex mutex = new InterProcessMutex(zkClient, zNodeLockPath);
        mutex.acquire();
        return mutex;
    }

    public String toString() {
        return "AbstractZKClient{zkClient=" + this.zkClient + ", deadServerZNodeParentPath='" + this.getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' + ", masterZNodeParentPath='" + this.getZNodeParentPath(ZKNodeType.MASTER) + '\'' + ", workerZNodeParentPath='" + this.getZNodeParentPath(ZKNodeType.WORKER) + '\'' + ", stoppable=" + this.stoppable + '}';
    }

    static {
        try {
            conf = new PropertiesConfiguration("zookeeper.properties");
        }
        catch (ConfigurationException e) {
            logger.error("load configuration failed : " + e.getMessage(), (Throwable)e);
            System.exit(1);
        }
    }
}

