/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.zk;

import java.util.Date;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ZKMasterClient
extends AbstractZKClient {
    private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
    @Autowired
    private ProcessService processService;

    public void start() {
        InterProcessMutex mutex = null;
        try {
            String znodeLock = this.getMasterStartUpLockPath();
            mutex = new InterProcessMutex(this.getZkClient(), znodeLock);
            mutex.acquire();
            this.initSystemZNode();
            while (!this.checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)) {
                ThreadUtils.sleep((long)1000L);
            }
            if (this.getActiveMasterNum() == 1) {
                this.removeZKNodePath(null, ZKNodeType.MASTER, true);
                this.removeZKNodePath(null, ZKNodeType.WORKER, true);
            }
            this.registerListener();
        }
        catch (Exception e) {
            try {
                logger.error("master start up exception", (Throwable)e);
            }
            catch (Throwable throwable) {
                this.releaseMutex(mutex);
                throw throwable;
            }
            this.releaseMutex(mutex);
        }
        this.releaseMutex(mutex);
    }

    public void close() {
        super.close();
    }

    protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
        if (path.startsWith(this.getZNodeParentPath(ZKNodeType.MASTER) + "/")) {
            this.handleMasterEvent(event, path);
        } else if (path.startsWith(this.getZNodeParentPath(ZKNodeType.WORKER) + "/")) {
            this.handleWorkerEvent(event, path);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
        block6: {
            block5: {
                ZKMasterClient.logger.info("{} node deleted : {}", (Object)zkNodeType.toString(), (Object)path);
                mutex = null;
                failoverPath = this.getFailoverLockPath(zkNodeType);
                mutex = new InterProcessMutex(this.getZkClient(), failoverPath);
                mutex.acquire();
                serverHost = null;
                if (!StringUtils.isNotEmpty((String)path)) ** GOTO lbl17
                serverHost = this.getHostByEventDataPath(path);
                if (!StringUtils.isEmpty((String)serverHost)) break block5;
                ZKMasterClient.logger.error("server down error: unknown path: {}", (Object)path);
                this.releaseMutex(mutex);
                return;
            }
            try {
                this.handleDeadServer(path, zkNodeType, "add");
lbl17:
                // 2 sources

                if (!failover) break block6;
                this.failoverServerWhenDown(serverHost, zkNodeType);
            }
            catch (Exception e) {
                try {
                    ZKMasterClient.logger.error("{} server failover failed.", (Object)zkNodeType.toString());
                    ZKMasterClient.logger.error("failover exception ", (Throwable)e);
                }
                catch (Throwable var7_8) {
                    this.releaseMutex(mutex);
                    throw var7_8;
                }
                this.releaseMutex(mutex);
            }
        }
        this.releaseMutex(mutex);
    }

    private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
        switch (zkNodeType) {
            case MASTER: {
                this.failoverMaster(serverHost);
                break;
            }
            case WORKER: {
                this.failoverWorker(serverHost, true);
            }
        }
    }

    private String getFailoverLockPath(ZKNodeType zkNodeType) {
        switch (zkNodeType) {
            case MASTER: {
                return this.getMasterFailoverLockPath();
            }
            case WORKER: {
                return this.getWorkerFailoverLockPath();
            }
        }
        return "";
    }

    public void handleMasterEvent(TreeCacheEvent event, String path) {
        switch (event.getType()) {
            case NODE_ADDED: {
                logger.info("master node added : {}", (Object)path);
                break;
            }
            case NODE_REMOVED: {
                this.removeZKNodePath(path, ZKNodeType.MASTER, true);
                break;
            }
        }
    }

    public void handleWorkerEvent(TreeCacheEvent event, String path) {
        switch (event.getType()) {
            case NODE_ADDED: {
                logger.info("worker node added : {}", (Object)path);
                break;
            }
            case NODE_REMOVED: {
                logger.info("worker node deleted : {}", (Object)path);
                this.removeZKNodePath(path, ZKNodeType.WORKER, true);
                break;
            }
        }
    }

    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
        boolean taskNeedFailover = true;
        if (taskInstance.getHost() == null) {
            return false;
        }
        if (this.checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER) && this.checkTaskAfterWorkerStart(taskInstance)) {
            taskNeedFailover = false;
        }
        return taskNeedFailover;
    }

    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
        if (StringUtils.isEmpty((String)taskInstance.getHost())) {
            return false;
        }
        Date workerServerStartDate = null;
        List workerServers = this.getServersList(ZKNodeType.WORKER);
        for (Server workerServer : workerServers) {
            if (!taskInstance.getHost().equals(workerServer.getHost() + ":" + workerServer.getPort())) continue;
            workerServerStartDate = workerServer.getCreateTime();
            break;
        }
        if (workerServerStartDate != null) {
            return taskInstance.getStartTime().after(workerServerStartDate);
        }
        return false;
    }

    private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
        logger.info("start worker[{}] failover ...", (Object)workerHost);
        List needFailoverTaskInstanceList = this.processService.queryNeedFailoverTaskInstances(workerHost);
        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
            if (needCheckWorkerAlive && !this.checkTaskInstanceNeedFailover(taskInstance)) continue;
            ProcessInstance processInstance = this.processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
            if (processInstance != null) {
                taskInstance.setProcessInstance(processInstance);
            }
            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get().buildTaskInstanceRelatedInfo(taskInstance).buildProcessInstanceRelatedInfo(processInstance).create();
            ProcessUtils.killYarnJob(taskExecutionContext);
            taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
            this.processService.saveTaskInstance(taskInstance);
        }
        logger.info("end worker[{}] failover ...", (Object)workerHost);
    }

    private void failoverMaster(String masterHost) {
        logger.info("start master failover ...");
        List needFailoverProcessInstanceList = this.processService.queryNeedFailoverProcessInstances(masterHost);
        logger.info("failover process list size:{} ", (Object)needFailoverProcessInstanceList.size());
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            logger.info("failover process instance id: {} host:{}", (Object)processInstance.getId(), (Object)processInstance.getHost());
            if ("NULL".equals(processInstance.getHost())) continue;
            this.processService.processNeedFailoverProcessInstances(processInstance);
        }
        logger.info("master failover end");
    }

    public InterProcessMutex blockAcquireMutex() throws Exception {
        InterProcessMutex mutex = new InterProcessMutex(this.getZkClient(), this.getMasterLockPath());
        mutex.acquire();
        return mutex;
    }
}

