/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.zk;

import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ZKMasterClient
extends AbstractZKClient {
    private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
    private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory((String)"Master-Main-Thread");
    private String masterZNode = null;
    private AlertDao alertDao = null;
    @Autowired
    private ProcessDao processDao;

    private ZKMasterClient() {
    }

    public void init() {
        InterProcessMutex mutex;
        block4: {
            logger.info("initialize master client...");
            this.initDao();
            mutex = null;
            try {
                String znodeLock = this.getMasterStartUpLockPath();
                mutex = new InterProcessMutex(this.zkClient, znodeLock);
                mutex.acquire();
                this.initSystemZNode();
                this.registerMaster();
                if (this.getActiveMasterNum() != 1) break block4;
                this.failoverWorker(null, true);
                this.failoverMaster(null);
            }
            catch (Exception e) {
                try {
                    logger.error("master start up  exception : " + e.getMessage(), (Throwable)e);
                }
                catch (Throwable throwable) {
                    ZKMasterClient.releaseMutex(mutex);
                    throw throwable;
                }
                ZKMasterClient.releaseMutex((InterProcessMutex)mutex);
            }
        }
        ZKMasterClient.releaseMutex((InterProcessMutex)mutex);
    }

    public void initDao() {
        this.alertDao = (AlertDao)DaoFactory.getDaoInstance(AlertDao.class);
    }

    public AlertDao getAlertDao() {
        return this.alertDao;
    }

    public void registerMaster() {
        try {
            String serverPath = this.registerServer(ZKNodeType.MASTER);
            if (StringUtils.isEmpty((String)serverPath)) {
                System.exit(-1);
            }
            this.masterZNode = serverPath;
        }
        catch (Exception e) {
            logger.error("register master failure : " + e.getMessage(), (Throwable)e);
            System.exit(-1);
        }
    }

    protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
        if (path.startsWith(this.getZNodeParentPath(ZKNodeType.MASTER) + "/")) {
            this.handleMasterEvent(event, path);
        } else if (path.startsWith(this.getZNodeParentPath(ZKNodeType.WORKER) + "/")) {
            this.handleWorkerEvent(event, path);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
        InterProcessMutex mutex;
        block4: {
            logger.info("{} node deleted : {}", (Object)zkNodeType.toString(), (Object)path);
            mutex = null;
            try {
                String failoverPath = this.getFailoverLockPath(zkNodeType);
                mutex = new InterProcessMutex(this.getZkClient(), failoverPath);
                mutex.acquire();
                String serverHost = this.getHostByEventDataPath(path);
                this.handleDeadServer(path, zkNodeType, "add");
                this.alertServerDown(serverHost, zkNodeType);
                if (!failover) break block4;
                this.failoverServerWhenDown(serverHost, zkNodeType);
            }
            catch (Exception e) {
                try {
                    logger.error("{} server failover failed.", (Object)zkNodeType.toString());
                    logger.error("failover exception : " + e.getMessage(), (Throwable)e);
                }
                catch (Throwable throwable) {
                    ZKMasterClient.releaseMutex(mutex);
                    throw throwable;
                }
                ZKMasterClient.releaseMutex((InterProcessMutex)mutex);
            }
        }
        ZKMasterClient.releaseMutex((InterProcessMutex)mutex);
    }

    private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
        if (StringUtils.isEmpty((String)serverHost)) {
            return;
        }
        switch (zkNodeType) {
            case MASTER: {
                this.failoverMaster(serverHost);
                break;
            }
            case WORKER: {
                this.failoverWorker(serverHost, true);
            }
        }
    }

    private String getFailoverLockPath(ZKNodeType zkNodeType) {
        switch (zkNodeType) {
            case MASTER: {
                return this.getMasterFailoverLockPath();
            }
            case WORKER: {
                return this.getWorkerFailoverLockPath();
            }
        }
        return "";
    }

    private void alertServerDown(String serverHost, ZKNodeType zkNodeType) {
        String serverType = zkNodeType.toString();
        this.alertDao.sendServerStopedAlert(1, serverHost, serverType);
    }

    public void handleMasterEvent(TreeCacheEvent event, String path) {
        switch (event.getType()) {
            case NODE_ADDED: {
                logger.info("master node added : {}", (Object)path);
                break;
            }
            case NODE_REMOVED: {
                String serverHost = this.getHostByEventDataPath(path);
                if (this.checkServerSelfDead(serverHost, ZKNodeType.MASTER)) {
                    return;
                }
                this.removeZKNodePath(path, ZKNodeType.MASTER, true);
                break;
            }
        }
    }

    public void handleWorkerEvent(TreeCacheEvent event, String path) {
        switch (event.getType()) {
            case NODE_ADDED: {
                logger.info("worker node added : {}", (Object)path);
                break;
            }
            case NODE_REMOVED: {
                logger.info("worker node deleted : {}", (Object)path);
                this.removeZKNodePath(path, ZKNodeType.WORKER, true);
                break;
            }
        }
    }

    public String getMasterZNode() {
        return this.masterZNode;
    }

    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
        boolean taskNeedFailover = true;
        if (taskInstance.getHost() == null) {
            return false;
        }
        if (this.checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER) && this.checkTaskAfterWorkerStart(taskInstance)) {
            taskNeedFailover = false;
        }
        return taskNeedFailover;
    }

    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
        if (StringUtils.isEmpty((String)taskInstance.getHost())) {
            return false;
        }
        Date workerServerStartDate = null;
        List workerServers = this.getServersList(ZKNodeType.WORKER);
        for (Server workerServer : workerServers) {
            if (!workerServer.getHost().equals(taskInstance.getHost())) continue;
            workerServerStartDate = workerServer.getCreateTime();
            break;
        }
        if (workerServerStartDate != null) {
            return taskInstance.getStartTime().after(workerServerStartDate);
        }
        return false;
    }

    private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
        logger.info("start worker[{}] failover ...", (Object)workerHost);
        List needFailoverTaskInstanceList = this.processDao.queryNeedFailoverTaskInstances(workerHost);
        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
            if (needCheckWorkerAlive && !this.checkTaskInstanceNeedFailover(taskInstance)) continue;
            ProcessInstance instance = this.processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
            if (instance != null) {
                taskInstance.setProcessInstance(instance);
            }
            ProcessUtils.killYarnJob(taskInstance);
            taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
            this.processDao.saveTaskInstance(taskInstance);
        }
        logger.info("end worker[{}] failover ...", (Object)workerHost);
    }

    private void failoverMaster(String masterHost) {
        logger.info("start master failover ...");
        List needFailoverProcessInstanceList = this.processDao.queryNeedFailoverProcessInstances(masterHost);
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            this.processDao.processNeedFailoverProcessInstances(processInstance);
        }
        logger.info("master failover end");
    }
}

