package org.apache.dolphinscheduler.server.master.zk;

import java.util.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.class */
public class ZKMasterClient extends AbstractZKClient {
    private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);

    @Autowired
    private ProcessService processService;

    @Autowired
    private MasterRegistry masterRegistry;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.dolphinscheduler.server.master.zk.ZKMasterClient$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/zk/ZKMasterClient$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$common$enums$ZKNodeType;
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$TreeCacheEvent$Type = new int[TreeCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$TreeCacheEvent$Type[TreeCacheEvent.Type.NODE_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$TreeCacheEvent$Type[TreeCacheEvent.Type.NODE_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$org$apache$dolphinscheduler$common$enums$ZKNodeType = new int[ZKNodeType.values().length];
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$ZKNodeType[ZKNodeType.MASTER.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$ZKNodeType[ZKNodeType.WORKER.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/zk/ZKMasterClient$NodeChangeListener.class */
    class NodeChangeListener extends AbstractListener {
        public NodeChangeListener(int i) {
            super(i);
        }

        protected void dataChanged(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent, String str) {
            if (str.startsWith(ZKMasterClient.this.getZNodeParentPath(ZKNodeType.MASTER) + "/")) {
                ZKMasterClient.this.handleMasterEvent(treeCacheEvent, str);
            } else if (str.startsWith(ZKMasterClient.this.getZNodeParentPath(ZKNodeType.WORKER) + "/")) {
                ZKMasterClient.this.handleWorkerEvent(treeCacheEvent, str);
            }
        }
    }

    public void start() {
        InterProcessMutex interProcessMutex = null;
        try {
            try {
                interProcessMutex = new InterProcessMutex(getZkClient(), getMasterStartUpLockPath());
                interProcessMutex.acquire();
                this.masterRegistry.registry();
                this.masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(this.masterRegistry.getMasterPath(), ZKNodeType.MASTER, "delete");
                initSystemZNode();
                while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)) {
                    ThreadUtils.sleep(1000L);
                }
                if (getActiveMasterNum() == 1) {
                    removeZKNodePath(null, ZKNodeType.MASTER, true);
                    removeZKNodePath(null, ZKNodeType.WORKER, true);
                }
                registerListener(new NodeChangeListener(Integer.MIN_VALUE));
                releaseMutex(interProcessMutex);
            } catch (Exception e) {
                logger.error("master start up exception", e);
                releaseMutex(interProcessMutex);
            }
        } catch (Throwable th) {
            releaseMutex(interProcessMutex);
            throw th;
        }
    }

    public void setStoppable(IStoppable iStoppable) {
        this.masterRegistry.getZookeeperRegistryCenter().setStoppable(iStoppable);
    }

    public void close() {
        this.masterRegistry.unRegistry();
        super.close();
    }

    private void removeZKNodePath(String str, ZKNodeType zKNodeType, boolean z) {
        logger.info("{} node deleted : {}", zKNodeType, str);
        try {
            try {
                InterProcessMutex interProcessMutex = new InterProcessMutex(getZkClient(), getFailoverLockPath(zKNodeType));
                interProcessMutex.acquire();
                String str2 = null;
                if (StringUtils.isNotEmpty(str)) {
                    str2 = getHostByEventDataPath(str);
                    if (StringUtils.isEmpty(str2)) {
                        logger.error("server down error: unknown path: {}", str);
                        releaseMutex(interProcessMutex);
                        return;
                    }
                    handleDeadServer(str, zKNodeType, "add");
                }
                if (z) {
                    failoverServerWhenDown(str2, zKNodeType);
                }
                releaseMutex(interProcessMutex);
            } catch (Exception e) {
                logger.error("{} server failover failed.", zKNodeType);
                logger.error("failover exception ", e);
                releaseMutex(null);
            }
        } catch (Throwable th) {
            releaseMutex(null);
            throw th;
        }
    }

    private void failoverServerWhenDown(String str, ZKNodeType zKNodeType) {
        switch (AnonymousClass1.$SwitchMap$org$apache$dolphinscheduler$common$enums$ZKNodeType[zKNodeType.ordinal()]) {
            case 1:
                failoverMaster(str);
                return;
            case 2:
                failoverWorker(str, true);
                return;
            default:
                return;
        }
    }

    private String getFailoverLockPath(ZKNodeType zKNodeType) {
        switch (AnonymousClass1.$SwitchMap$org$apache$dolphinscheduler$common$enums$ZKNodeType[zKNodeType.ordinal()]) {
            case 1:
                return getMasterFailoverLockPath();
            case 2:
                return getWorkerFailoverLockPath();
            default:
                return "";
        }
    }

    public void handleMasterEvent(TreeCacheEvent treeCacheEvent, String str) {
        switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$TreeCacheEvent$Type[treeCacheEvent.getType().ordinal()]) {
            case 1:
                logger.info("master node added : {}", str);
                return;
            case 2:
                removeZKNodePath(str, ZKNodeType.MASTER, true);
                return;
            default:
                return;
        }
    }

    public void handleWorkerEvent(TreeCacheEvent treeCacheEvent, String str) {
        switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$TreeCacheEvent$Type[treeCacheEvent.getType().ordinal()]) {
            case 1:
                logger.info("worker node added : {}", str);
                return;
            case 2:
                logger.info("worker node deleted : {}", str);
                removeZKNodePath(str, ZKNodeType.WORKER, true);
                return;
            default:
                return;
        }
    }

    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) {
        boolean z = true;
        if (taskInstance.getHost() == null) {
            return false;
        }
        if (checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER) && checkTaskAfterWorkerStart(taskInstance)) {
            z = false;
        }
        return z;
    }

    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
        if (StringUtils.isEmpty(taskInstance.getHost())) {
            return false;
        }
        Date date = null;
        Iterator it = getServerList(ZKNodeType.WORKER).iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Server server = (Server) it.next();
            if (taskInstance.getHost().equals(server.getHost() + ":" + server.getPort())) {
                date = server.getCreateTime();
                break;
            }
        }
        if (date != null) {
            return taskInstance.getStartTime().after(date);
        }
        return false;
    }

    private void failoverWorker(String str, boolean z) {
        logger.info("start worker[{}] failover ...", str);
        for (TaskInstance taskInstance : this.processService.queryNeedFailoverTaskInstances(str)) {
            if (!z || checkTaskInstanceNeedFailover(taskInstance)) {
                ProcessInstance findProcessInstanceDetailById = this.processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
                if (findProcessInstanceDetailById != null) {
                    taskInstance.setProcessInstance(findProcessInstanceDetailById);
                }
                ProcessUtils.killYarnJob(TaskExecutionContextBuilder.get().buildTaskInstanceRelatedInfo(taskInstance).buildProcessInstanceRelatedInfo(findProcessInstanceDetailById).create());
                taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
                this.processService.saveTaskInstance(taskInstance);
            }
        }
        logger.info("end worker[{}] failover ...", str);
    }

    private void failoverMaster(String str) {
        logger.info("start master failover ...");
        List<ProcessInstance> queryNeedFailoverProcessInstances = this.processService.queryNeedFailoverProcessInstances(str);
        logger.info("failover process list size:{} ", Integer.valueOf(queryNeedFailoverProcessInstances.size()));
        for (ProcessInstance processInstance : queryNeedFailoverProcessInstances) {
            logger.info("failover process instance id: {} host:{}", Integer.valueOf(processInstance.getId()), processInstance.getHost());
            if (!"NULL".equals(processInstance.getHost())) {
                this.processService.processNeedFailoverProcessInstances(processInstance);
            }
        }
        logger.info("master failover end");
    }

    public InterProcessMutex blockAcquireMutex() throws Exception {
        InterProcessMutex interProcessMutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
        interProcessMutex.acquire();
        return interProcessMutex;
    }
}
