package org.apache.dolphinscheduler.server.zk;

import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/zk/ZKMasterClient.class */
public class ZKMasterClient extends AbstractZKClient {
    /** Logger shared by this class and its anonymous listeners. */
    private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
    /** Thread factory passed to both {@link PathChildrenCache} instances; created with the name "Master-Main-Thread". */
    private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory("Master-Main-Thread");
    /** Shared instance, created on the first call to {@link #getZKMasterClient(ProcessDao)}. */
    private static ZKMasterClient zkMasterClient = null;

    /** Value returned by {@code registerServer(ZKNodeType.MASTER)}; stays {@code null} until {@link #registerMaster()} assigns it. */
    private String masterZNode = null;
    /** Alert DAO obtained from {@link DaoFactory} in {@link #initDao()}. */
    private AlertDao alertDao = null;
    /** Process DAO supplied by the caller of {@link #getZKMasterClient(ProcessDao)}. */
    private ProcessDao processDao;
    /** Cache over the children of the MASTER parent znode; created in {@link #listenerMaster()}. */
    private PathChildrenCache masterPathChildrenCache;
    /** Cache over the children of the WORKER parent znode; created in {@link #listenerWorker()}. */
    private PathChildrenCache workerPathChildrenCache;

    /**
     * Lookup tables used by the {@code switch} statements in this file.
     *
     * <p>Each table is indexed by an enum constant's {@code ordinal()} and holds the
     * {@code case} label that the corresponding switch uses. Slots that are never
     * assigned keep the array default of 0, which matches no {@code case} label and
     * therefore reaches the {@code default} branch of those switches.
     *
     * <p>Decompiler notes (JADX): access modifiers were changed to package-private,
     * and the class was renamed from the invalid class name
     * {@code org.apache.dolphinscheduler.server.zk.ZKMasterClient$3}
     * (loaded from {@code org/apache/dolphinscheduler/server/zk/ZKMasterClient$3.class}).
     */
    public static /* synthetic */ class AnonymousClass3 {
        /** Case labels for {@link PathChildrenCacheEvent.Type}: CHILD_ADDED = 1, CHILD_REMOVED = 2, CHILD_UPDATED = 3. */
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type;
        /** Case labels for {@link ZKNodeType}: MASTER = 1, WORKER = 2. */
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$common$enums$ZKNodeType = new int[ZKNodeType.values().length];

        static {
            // Every assignment is individually guarded: a NoSuchFieldError is ignored and
            // the affected slot simply keeps its default value of 0.
            // NOTE(review): presumably this guards against an enum constant missing at
            // run time; it has the shape of compiler-generated code — confirm before editing.
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$ZKNodeType[ZKNodeType.MASTER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$ZKNodeType[ZKNodeType.WORKER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            // The event-type table is allocated here rather than in a field initializer.
            $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_UPDATED.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /**
     * Creates the client, stores the given process DAO and immediately runs {@link #init()}.
     *
     * <p>The DAO is assigned before {@code init()} because the start-up failover
     * performed there reads {@code this.processDao}.
     *
     * @param processDao DAO used by the failover routines
     */
    private ZKMasterClient(ProcessDao processDao) {
        this.processDao = processDao;
        init();
    }

    /**
     * No-argument constructor with an empty body: it neither sets a process DAO
     * nor calls {@link #init()}. No caller is visible in this file.
     */
    private ZKMasterClient() {
    }

    /**
     * Returns the shared {@code ZKMasterClient}, creating it on the first call.
     *
     * <p>The method is {@code synchronized} on the class. On every call, including
     * the one that creates the instance, the instance's process DAO is replaced by
     * the supplied argument.
     *
     * @param processDao DAO to install on the shared instance
     * @return the shared instance
     */
    public static synchronized ZKMasterClient getZKMasterClient(ProcessDao processDao) {
        ZKMasterClient shared = zkMasterClient;
        if (shared == null) {
            shared = new ZKMasterClient(processDao);
            zkMasterClient = shared;
        }
        shared.processDao = processDao;
        return shared;
    }

    /**
     * Performs master start-up under the start-up lock.
     *
     * <p>Steps, in order: initialise the alert DAO, acquire an {@link InterProcessMutex}
     * on {@code getMasterStartUpLockPath()}, call {@code initSystemZNode()}, attach the
     * master and worker listeners, register this master, and, when
     * {@code getActiveMasterNum()} reports exactly one master, run
     * {@code failoverWorker(null, true)} followed by {@code failoverMaster(null)}.
     *
     * <p>Any {@link Exception} raised by these steps is logged and not rethrown.
     * The mutex is handed to {@code releaseMutex} exactly once, in a {@code finally}
     * block, whatever the outcome; it may still be {@code null} there if constructing
     * the mutex failed.
     */
    public void init() {
        initDao();
        InterProcessMutex interProcessMutex = null;
        try {
            interProcessMutex = new InterProcessMutex(this.zkClient, getMasterStartUpLockPath());
            interProcessMutex.acquire();
            initSystemZNode();
            listenerMaster();
            listenerWorker();
            registerMaster();
            // Only a lone master performs the start-up failover sweep.
            if (getActiveMasterNum() == 1) {
                failoverWorker(null, true);
                failoverMaster(null);
            }
        } catch (Exception e) {
            logger.error("master start up  exception : " + e.getMessage(), e);
        } finally {
            // Single release point: a failure while releasing is no longer mistaken
            // for a start-up failure and retried.
            releaseMutex(interProcessMutex);
        }
    }

    /**
     * Closes the master cache, the worker cache and then the parent client.
     *
     * <p>Each resource is closed independently, so a failure closing one does not
     * prevent the others from being closed. Failures are logged and never propagate
     * to the caller.
     */
    @Override
    public void close() {
        closeChildrenCache(this.masterPathChildrenCache, "master");
        closeChildrenCache(this.workerPathChildrenCache, "worker");
        try {
            super.close();
        } catch (Exception e) {
            logger.warn("close zookeeper client failed : " + e.getMessage(), e);
        }
    }

    /**
     * Closes one children cache if it was created, logging any failure.
     *
     * @param cache cache to close; may be {@code null} when its listener was never set up
     * @param label short name used in the log message
     */
    private void closeChildrenCache(PathChildrenCache cache, String label) {
        if (cache == null) {
            return;
        }
        try {
            cache.close();
        } catch (Exception e) {
            logger.warn("close " + label + " path children cache failed : " + e.getMessage(), e);
        }
    }

    /**
     * Obtains the {@link AlertDao} from {@link DaoFactory} and stores it in {@code alertDao}.
     */
    public void initDao() {
        this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
    }

    /**
     * Returns the alert DAO.
     *
     * @return the DAO set by {@link #initDao()}, or {@code null} if that has not run
     */
    public AlertDao getAlertDao() {
        return this.alertDao;
    }

    /**
     * Registers this process as a master and remembers the returned znode path.
     *
     * <p>Calls {@code registerServer(ZKNodeType.MASTER)}. If that returns an empty
     * value or throws, the failure is logged and the JVM is terminated with
     * {@code System.exit(-1)}.
     */
    public void registerMaster() {
        try {
            String registerServer = registerServer(ZKNodeType.MASTER);
            if (StringUtils.isEmpty(registerServer)) {
                // Leave a trace before exiting; otherwise the process vanishes without explanation.
                logger.error("register master failure : registered znode path is empty");
                System.exit(-1);
            }
            this.masterZNode = registerServer;
        } catch (Exception e) {
            logger.error("register master failure : " + e.getMessage(), e);
            System.exit(-1);
        }
    }

    /**
     * Creates and starts the cache that watches the children of the MASTER parent znode.
     *
     * <p>Reaction per event type:
     * <ul>
     *   <li>{@code CHILD_ADDED}: logs the new node path.</li>
     *   <li>{@code CHILD_REMOVED}: unless {@code checkServerSelfDead} reports true for the
     *       host derived from the path, calls
     *       {@code removeZKNodePath(path, ZKNodeType.MASTER, true)}.</li>
     *   <li>anything else: ignored.</li>
     * </ul>
     *
     * <p>The listener is attached before the cache is started so that no event
     * dispatched during start-up can be missed. Failures while attaching or starting
     * are logged and not rethrown.
     */
    public void listenerMaster() {
        this.masterPathChildrenCache = new PathChildrenCache(this.zkClient, getZNodeParentPath(ZKNodeType.MASTER), true, defaultThreadFactory);
        try {
            this.masterPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    switch (pathChildrenCacheEvent.getType()) {
                        case CHILD_ADDED:
                            ZKMasterClient.logger.info("master node added : {}", pathChildrenCacheEvent.getData().getPath());
                            break;
                        case CHILD_REMOVED:
                            String path = pathChildrenCacheEvent.getData().getPath();
                            // Skip the failover path when checkServerSelfDead reports true for this host.
                            if (!ZKMasterClient.this.checkServerSelfDead(ZKMasterClient.this.getHostByEventDataPath(path), ZKNodeType.MASTER)) {
                                ZKMasterClient.this.removeZKNodePath(path, ZKNodeType.MASTER, true);
                            }
                            break;
                        default:
                            break;
                    }
                }
            });
            this.masterPathChildrenCache.start();
        } catch (Exception e) {
            logger.error("monitor master failed : " + e.getMessage(), e);
        }
    }

    /**
     * Handles the disappearance of a server znode under the failover lock for its type.
     *
     * <p>While holding an {@link InterProcessMutex} on {@code getFailoverLockPath(zKNodeType)}:
     * derives the host from the path, calls {@code handleDeadServer(str, zKNodeType, "add")},
     * raises the server-down alerts and, when {@code z} is true, runs the failover for
     * that host. Any {@link Exception} is logged and not rethrown. The mutex is handed
     * to {@code releaseMutex} exactly once, in a {@code finally} block.
     *
     * @param str        path of the removed znode
     * @param zKNodeType type of the server the znode belonged to
     * @param z          whether to run failover for the host after alerting
     */
    public void removeZKNodePath(String str, ZKNodeType zKNodeType, boolean z) {
        logger.info("{} node deleted : {}", zKNodeType.toString(), str);
        InterProcessMutex interProcessMutex = null;
        try {
            interProcessMutex = new InterProcessMutex(getZkClient(), getFailoverLockPath(zKNodeType));
            interProcessMutex.acquire();
            String hostByEventDataPath = getHostByEventDataPath(str);
            handleDeadServer(str, zKNodeType, "add");
            alertServerDown(hostByEventDataPath, zKNodeType);
            if (z) {
                failoverServerWhenDown(hostByEventDataPath, zKNodeType);
            }
        } catch (Exception e) {
            logger.error("{} server failover failed.", zKNodeType.toString());
            logger.error("failover exception : " + e.getMessage(), e);
        } finally {
            // Single release point: a failure while releasing is no longer logged as a
            // failover failure and retried.
            releaseMutex(interProcessMutex);
        }
    }

    /**
     * Dispatches to the failover routine that matches the type of the server that went down.
     *
     * <p>Does nothing when the host is empty. MASTER calls {@code failoverMaster(str)};
     * WORKER calls {@code failoverWorker(str, true)}; other types are ignored.
     *
     * @param str        host of the server that went down
     * @param zKNodeType type of that server
     * @throws Exception propagated from the worker failover routine
     */
    private void failoverServerWhenDown(String str, ZKNodeType zKNodeType) throws Exception {
        if (!StringUtils.isEmpty(str)) {
            switch (zKNodeType) {
                case MASTER:
                    failoverMaster(str);
                    break;
                case WORKER:
                    failoverWorker(str, true);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Resolves the failover lock path for a server type.
     *
     * @param zKNodeType server type
     * @return {@code getMasterFailoverLockPath()} for MASTER,
     *         {@code getWorkerFailoverLockPath()} for WORKER, and the empty string otherwise
     */
    private String getFailoverLockPath(ZKNodeType zKNodeType) {
        String lockPath;
        switch (zKNodeType) {
            case MASTER:
                lockPath = getMasterFailoverLockPath();
                break;
            case WORKER:
                lockPath = getWorkerFailoverLockPath();
                break;
            default:
                lockPath = "";
                break;
        }
        return lockPath;
    }

    /**
     * Sends the "server stopped" alert for the given host three times.
     *
     * <p>Each call is {@code alertDao.sendServerStopedAlert(1, str, type)}, where
     * {@code type} is {@code zKNodeType.toString()}.
     * NOTE(review): the meaning of the leading {@code 1} is not visible here — confirm
     * against {@code AlertDao}.
     *
     * @param str        host of the server that went down
     * @param zKNodeType type of that server
     */
    private void alertServerDown(String str, ZKNodeType zKNodeType) {
        final int repeatCount = 3;
        final String serverType = zKNodeType.toString();
        int sent = 0;
        while (sent < repeatCount) {
            this.alertDao.sendServerStopedAlert(1, str, serverType);
            sent++;
        }
    }

    /**
     * Creates and starts the cache that watches the children of the WORKER parent znode.
     *
     * <p>Reaction per event type:
     * <ul>
     *   <li>{@code CHILD_ADDED}: logs the new node path.</li>
     *   <li>{@code CHILD_REMOVED}: logs the path and calls
     *       {@code removeZKNodePath(path, ZKNodeType.WORKER, true)}.</li>
     *   <li>anything else: ignored.</li>
     * </ul>
     *
     * <p>The listener is attached before the cache is started so that no event
     * dispatched during start-up can be missed. Failures while attaching or starting
     * are logged and not rethrown.
     */
    public void listenerWorker() {
        this.workerPathChildrenCache = new PathChildrenCache(this.zkClient, getZNodeParentPath(ZKNodeType.WORKER), true, defaultThreadFactory);
        try {
            this.workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) {
                    switch (pathChildrenCacheEvent.getType()) {
                        case CHILD_ADDED:
                            ZKMasterClient.logger.info("node added : {}", pathChildrenCacheEvent.getData().getPath());
                            break;
                        case CHILD_REMOVED:
                            String path = pathChildrenCacheEvent.getData().getPath();
                            ZKMasterClient.logger.info("node deleted : {}", path);
                            ZKMasterClient.this.removeZKNodePath(path, ZKNodeType.WORKER, true);
                            break;
                        default:
                            break;
                    }
                }
            });
            this.workerPathChildrenCache.start();
        } catch (Exception e) {
            logger.error("listener worker failed : " + e.getMessage(), e);
        }
    }

    /**
     * Returns the znode path recorded by {@link #registerMaster()}.
     *
     * @return the registered path, or {@code null} if registration has not assigned one
     */
    public String getMasterZNode() {
        return this.masterZNode;
    }

    /**
     * Decides whether a task instance must be failed over.
     *
     * <p>A task without a host is never failed over. Otherwise failover is needed
     * unless the task's worker znode exists and the task started after that worker did.
     *
     * @param taskInstance task to examine
     * @return {@code true} when the task should be failed over
     * @throws Exception propagated from the znode existence check
     */
    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
        if (taskInstance.getHost() == null) {
            return false;
        }
        // Short-circuit order matters: the start-time check only runs when the znode exists.
        return !(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)
                && checkTaskAfterWorkerStart(taskInstance));
    }

    /**
     * Tells whether the task started after its worker's registration time.
     *
     * <p>Looks up the first server in {@code getServersList(ZKNodeType.WORKER)} whose
     * host equals the task's host and compares the task start time with that server's
     * create time.
     *
     * @param taskInstance task to examine
     * @return {@code true} only when a matching worker with a create time exists and the
     *         task has a start time strictly after it; {@code false} when the task has
     *         no host, no start time, or no matching worker is listed
     */
    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
        String taskHost = taskInstance.getHost();
        if (StringUtils.isEmpty(taskHost)) {
            return false;
        }
        Date workerStartTime = null;
        // The list's element type is not visible from this file; the cast mirrors the original code.
        for (Object entry : getServersList(ZKNodeType.WORKER)) {
            Server server = (Server) entry;
            // Compare from the known non-empty side so a server without a host cannot cause an NPE.
            if (taskHost.equals(server.getHost())) {
                workerStartTime = server.getCreateTime();
                break;
            }
        }
        if (workerStartTime == null) {
            return false;
        }
        // A task without a start time cannot be shown to have started after the worker;
        // returning false avoids an NPE that would abort the caller's failover loop.
        return taskInstance.getStartTime() != null
                && taskInstance.getStartTime().after(workerStartTime);
    }

    /**
     * Marks the task instances returned by {@code queryNeedFailoverTaskInstances(str)} as
     * needing fault tolerance.
     *
     * <p>When {@code z} is true, a task is skipped unless
     * {@code checkTaskInstanceNeedFailover} approves it. For every handled task: its
     * process instance is attached when found, {@code ProcessUtils.killYarnJob} is
     * invoked, the state becomes {@code NEED_FAULT_TOLERANCE} and the task is saved.
     *
     * @param str worker host passed to the query; callers in this file also pass {@code null}
     * @param z   whether to filter tasks through {@code checkTaskInstanceNeedFailover}
     * @throws Exception propagated from the check or from the calls made per task
     */
    private void failoverWorker(String str, boolean z) throws Exception {
        logger.info("start worker[{}] failover ...", str);
        for (TaskInstance candidate : this.processDao.queryNeedFailoverTaskInstances(str)) {
            if (z && !checkTaskInstanceNeedFailover(candidate)) {
                continue;
            }
            ProcessInstance owner = this.processDao.findProcessInstanceDetailById(candidate.getProcessInstanceId());
            if (owner != null) {
                candidate.setProcessInstance(owner);
            }
            ProcessUtils.killYarnJob(candidate);
            candidate.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
            this.processDao.saveTaskInstance(candidate);
        }
        logger.info("end worker[{}] failover ...", str);
    }

    /**
     * Passes every process instance returned by
     * {@code queryNeedFailoverProcessInstances(str)} to
     * {@code processNeedFailoverProcessInstances}.
     *
     * @param str master host passed to the query; callers in this file also pass {@code null}
     */
    private void failoverMaster(String str) {
        logger.info("start master failover ...");
        // The list's element type is not visible from this file; the cast mirrors the original code.
        for (Object pending : this.processDao.queryNeedFailoverProcessInstances(str)) {
            this.processDao.processNeedFailoverProcessInstances((ProcessInstance) pending);
        }
        logger.info("master failover end");
    }
}
