package org.apache.dolphinscheduler.server.master.registry;

import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Registry-side client of a master server.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>registering this master under {@code /nodes/master/<address>} and scheduling its heartbeat
 *       ({@link #registry()});</li>
 *   <li>subscribing to {@code /nodes} changes ({@link #start()});</li>
 *   <li>failing over workflow and task instances owned by a master or worker node that disappeared
 *       from the registry ({@link #removeMasterNodePath}, {@link #removeWorkerNodePath});</li>
 *   <li>reacting to registry connection-state changes ({@link #handleConnectionState}).</li>
 * </ul>
 *
 * <p>Expected call order: {@link #init()} before {@link #start()}/{@link #registry()}, because
 * {@code init} creates the heartbeat executor those methods use.
 */
@Component
public class MasterRegistryClient {
    private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class);

    /** Lower bound (ms) of the startup-age window used by {@link #isNeedToHandleDeadServer}. */
    private static final long MIN_DEAD_SERVER_WINDOW_MILLIS = 10000L;

    /** Polling interval (ms) used while waiting for our own node to appear in the registry. */
    private static final long REGISTRY_POLL_INTERVAL_MILLIS = 1000L;

    /** Host placeholder meaning "not bound to any host"; such instances are skipped during failover. */
    private static final String NULL_HOST = "NULL";

    /** Operation name passed to {@code handleDeadServer} to add a path to the dead-server set. */
    private static final String DEAD_SERVER_ADD_OP = "add";

    /** Operation name passed to {@code handleDeadServer} to remove a path from the dead-server set. */
    private static final String DEAD_SERVER_DELETE_OP = "delete";

    @Autowired
    private ProcessService processService;

    @Autowired
    private RegistryClient registryClient;

    @Autowired
    private MasterConfig masterConfig;

    /** Single-threaded scheduler running the heartbeat; created in {@link #init()}. */
    private ScheduledExecutorService heartBeatExecutor;

    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;

    /** Wall-clock time (ms) recorded in {@link #init()}, reported by the heartbeat. */
    private long startupTime;

    /** Registry path of this master, set by {@link #registry()}. */
    private String localNodePath;

    /** Records the startup time and creates the heartbeat executor. Must run before {@link #start()}. */
    public void init() {
        this.startupTime = System.currentTimeMillis();
        this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
    }

    /**
     * Registers this master and subscribes to changes under {@code /nodes}.
     *
     * @throws RuntimeException wrapping any failure during registration or subscription
     */
    public void start() {
        try {
            registry();
            this.registryClient.subscribe("/nodes", new MasterRegistryDataListener());
        } catch (Exception e) {
            logger.error("master start up exception", e);
            throw new RuntimeException("master start up error", e);
        }
    }

    /** Installs the component the registry client stops when the connection is lost. */
    public void setRegistryStoppable(IStoppable iStoppable) {
        this.registryClient.setStoppable(iStoppable);
    }

    /** Deregisters this master; see {@link #deregister()}. */
    public void closeRegistry() {
        deregister();
    }

    /**
     * Handles the removal of a master node from the registry.
     *
     * <p>While holding the per-host failover lock, marks the path as dead if it no longer exists and,
     * if requested, fails over the work owned by that host. Failures are logged, not rethrown. The
     * lock is released exactly once, whether failover succeeds or not.
     *
     * @param path     registry path of the removed node
     * @param nodeType type of the removed node
     * @param failover whether to fail over the node's work
     */
    public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) {
        logger.info("{} node deleted : {}", nodeType, path);
        if (StringUtils.isEmpty(path)) {
            logger.error("server down error: empty path: {}, nodeType:{}", path, nodeType);
            return;
        }
        String serverHost = this.registryClient.getHostByEventDataPath(path);
        if (StringUtils.isEmpty(serverHost)) {
            logger.error("server down error: unknown path: {}, nodeType:{}", path, nodeType);
            return;
        }
        String failoverLockPath = getFailoverLockPath(nodeType, serverHost);
        try {
            this.registryClient.getLock(failoverLockPath);
            if (!this.registryClient.exists(path)) {
                logger.info("path: {} not exists", path);
                this.registryClient.handleDeadServer(Collections.singleton(path), nodeType, DEAD_SERVER_ADD_OP);
            }
            if (failover) {
                failoverServerWhenDown(serverHost, nodeType);
            }
        } catch (Exception e) {
            logger.error("{} server failover failed, host:{}", nodeType, serverHost, e);
        } finally {
            // Single release point. The catch block must not also release, or the lock is freed twice.
            this.registryClient.releaseLock(failoverLockPath);
        }
    }

    /**
     * Handles the removal of a worker node from the registry (no failover lock is taken here).
     *
     * <p>If the path is non-empty and maps to a host, marks it as dead when it no longer exists.
     * If {@code failover} is set, fails over that host's tasks. An empty path leads to a no-op failover.
     * Failures are logged, not rethrown.
     *
     * @param path     registry path of the removed node
     * @param nodeType type of the removed node
     * @param failover whether to fail over the node's work
     */
    public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
        logger.info("{} node deleted : {}", nodeType, path);
        try {
            String serverHost = null;
            if (!StringUtils.isEmpty(path)) {
                serverHost = this.registryClient.getHostByEventDataPath(path);
                if (StringUtils.isEmpty(serverHost)) {
                    logger.error("server down error: unknown path: {}", path);
                    return;
                }
                if (!this.registryClient.exists(path)) {
                    logger.info("path: {} not exists", path);
                    this.registryClient.handleDeadServer(Collections.singleton(path), nodeType, DEAD_SERVER_ADD_OP);
                }
            }
            if (failover) {
                failoverServerWhenDown(serverHost, nodeType);
            }
        } catch (Exception e) {
            logger.error("{} server failover failed", nodeType, e);
        }
    }

    /**
     * Decides whether a dead server should be handled. This is the case when no servers of the type
     * are registered, when the host is not among them, or when the host has been registered longer
     * than {@code max(10s, timeout)}.
     *
     * <p>NOTE(review): this method has no caller in this class; confirm whether it is still needed.
     */
    private boolean isNeedToHandleDeadServer(String host, NodeType nodeType, Duration timeout) {
        long windowMillis = Math.max(MIN_DEAD_SERVER_WINDOW_MILLIS, timeout.toMillis());
        List<Server> servers = this.registryClient.getServerList(nodeType);
        if (CollectionUtils.isEmpty(servers)) {
            return true;
        }
        Date serverStartupTime = getServerStartupTime(servers, host);
        if (serverStartupTime == null) {
            return true;
        }
        return System.currentTimeMillis() - serverStartupTime.getTime() > windowMillis;
    }

    /** Dispatches failover of {@code serverHost} by node type; other node types are ignored. */
    private void failoverServerWhenDown(String serverHost, NodeType nodeType) {
        switch (nodeType) {
            case MASTER:
                failoverMaster(serverHost);
                break;
            case WORKER:
                failoverWorker(serverHost);
                break;
            default:
                break;
        }
    }

    /**
     * Returns the registry lock path that guards failover of {@code host}.
     *
     * @return the master or worker lock path, or {@code ""} for any other node type
     */
    public String getFailoverLockPath(NodeType nodeType, String host) {
        switch (nodeType) {
            case MASTER:
                return "/lock/failover/masters/" + host;
            case WORKER:
                return "/lock/failover/workers/" + host;
            default:
                return "";
        }
    }

    /**
     * A task needs failover when it has a host and was not started (or submitted) after its
     * worker's current registration time.
     */
    private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, TaskInstance taskInstance) {
        if (taskInstance.getHost() == null) {
            // No host was ever assigned, so there is nothing running to fail over.
            return false;
        }
        return !checkTaskAfterWorkerStart(workerServers, taskInstance);
    }

    /**
     * Returns true if the task's start time, or its submit time when not started, is after the
     * registration time of the worker it is assigned to. Returns false if the host is empty, the
     * worker is not registered, or neither time is recorded.
     */
    private boolean checkTaskAfterWorkerStart(List<Server> workerServers, TaskInstance taskInstance) {
        if (StringUtils.isEmpty(taskInstance.getHost())) {
            return false;
        }
        Date workerStartupTime = getServerStartupTime(workerServers, taskInstance.getHost());
        if (workerStartupTime == null) {
            return false;
        }
        Date taskTime = taskInstance.getStartTime() != null ? taskInstance.getStartTime() : taskInstance.getSubmitTime();
        return taskTime != null && taskTime.after(workerStartupTime);
    }

    /**
     * Finds the create time of the server whose {@code host:port} equals {@code host}.
     *
     * @return the create time, or null if the list is empty, the host is null, or there is no match
     */
    private Date getServerStartupTime(List<Server> servers, String host) {
        if (CollectionUtils.isEmpty(servers)) {
            return null;
        }
        for (Server server : servers) {
            // Compare from the non-null side so that a null host simply does not match.
            if ((server.getHost() + ":" + server.getPort()).equals(host)) {
                return server.getCreateTime();
            }
        }
        return null;
    }

    /** Looks up {@code host}'s create time among the currently registered servers of {@code nodeType}. */
    private Date getServerStartupTime(NodeType nodeType, String host) {
        if (StringUtils.isEmpty(host)) {
            return null;
        }
        return getServerStartupTime(this.registryClient.getServerList(nodeType), host);
    }

    /**
     * Fails over the tasks that were running on a lost worker, limited to tasks whose process
     * instance is hosted by this master. Tasks whose process instance cannot be loaded are logged
     * and skipped.
     */
    private void failoverWorker(String workerHost) {
        if (StringUtils.isEmpty(workerHost)) {
            return;
        }
        List<Server> workerServers = this.registryClient.getServerList(NodeType.WORKER);
        long startMillis = System.currentTimeMillis();
        List<TaskInstance> needFailoverTaskInstances = this.processService.queryNeedFailoverTaskInstances(workerHost);
        // Several tasks usually share a process instance; cache lookups for the duration of this pass.
        Map<Integer, ProcessInstance> processInstanceCache = new HashMap<>();
        String localAddress = getLocalAddress();
        logger.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstances.size());
        for (TaskInstance taskInstance : needFailoverTaskInstances) {
            ProcessInstance processInstance = processInstanceCache.get(taskInstance.getProcessInstanceId());
            if (processInstance == null) {
                processInstance = this.processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
                if (processInstance == null) {
                    logger.error("failover task instance error, processInstance {} of taskInstance {} is null",
                            taskInstance.getProcessInstanceId(), taskInstance.getId());
                    continue;
                }
                processInstanceCache.put(processInstance.getId(), processInstance);
            }
            // Only fail over tasks whose workflow is owned by this master.
            if (checkTaskInstanceNeedFailover(workerServers, taskInstance)
                    && localAddress.equalsIgnoreCase(processInstance.getHost())) {
                logger.info("failover task instance id: {}, process instance id: {}",
                        taskInstance.getId(), taskInstance.getProcessInstanceId());
                failoverTaskInstance(processInstance, taskInstance);
            }
        }
        logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startMillis);
    }

    /**
     * Fails over the workflows owned by a lost master.
     *
     * <p>For each process instance with a real host, first fails over its unfinished tasks that need
     * it. The process instance itself is then handed to
     * {@code processNeedFailoverProcessInstances}, unless it was restarted after the master's current
     * registration time.
     */
    public void failoverMaster(String masterHost) {
        if (StringUtils.isEmpty(masterHost)) {
            return;
        }
        Date masterStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
        List<Server> workerServers = this.registryClient.getServerList(NodeType.WORKER);
        long startMillis = System.currentTimeMillis();
        List<ProcessInstance> needFailoverProcessInstances = this.processService.queryNeedFailoverProcessInstances(masterHost);
        logger.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstances.size());
        for (ProcessInstance processInstance : needFailoverProcessInstances) {
            if (NULL_HOST.equals(processInstance.getHost())) {
                continue;
            }
            for (TaskInstance taskInstance : this.processService.findValidTaskListByProcessId(processInstance.getId())) {
                if (NULL_HOST.equals(taskInstance.getHost())
                        || taskInstance.getState().typeIsFinished()
                        || !checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
                    continue;
                }
                logger.info("failover task instance id: {}, process instance id: {}",
                        taskInstance.getId(), taskInstance.getProcessInstanceId());
                failoverTaskInstance(processInstance, taskInstance);
            }
            boolean restartedAfterMasterStart = masterStartupTime != null
                    && processInstance.getRestartTime() != null
                    && processInstance.getRestartTime().after(masterStartupTime);
            if (!restartedAfterMasterStart) {
                logger.info("failover process instance id: {}", processInstance.getId());
                this.processService.processNeedFailoverProcessInstances(processInstance);
            }
        }
        logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startMillis);
    }

    /**
     * Marks one task instance as {@code NEED_FAULT_TOLERANCE}, persists it, and submits a
     * {@code TASK_STATE_CHANGE} event for its workflow. If configured, kills the task's YARN job first.
     * Null arguments are logged and ignored.
     */
    private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) {
        if (taskInstance == null) {
            logger.error("failover task instance error, taskInstance is null");
            return;
        }
        if (processInstance == null) {
            logger.error("failover task instance error, processInstance {} of taskInstance {} is null",
                    taskInstance.getProcessInstanceId(), taskInstance.getId());
            return;
        }
        taskInstance.setProcessInstance(processInstance);
        TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                .buildTaskInstanceRelatedInfo(taskInstance)
                .buildProcessInstanceRelatedInfo(processInstance)
                .create();
        if (this.masterConfig.isKillYarnJobWhenTaskFailover()) {
            ProcessUtils.killYarnJob(taskExecutionContext);
        }
        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
        this.processService.saveTaskInstance(taskInstance);

        StateEvent stateEvent = new StateEvent();
        stateEvent.setTaskInstanceId(taskInstance.getId());
        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
        stateEvent.setProcessInstanceId(processInstance.getId());
        stateEvent.setExecutionStatus(taskInstance.getState());
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /**
     * Registers this master as an ephemeral node carrying heartbeat info, then starts the heartbeat.
     *
     * <p>Steps: replaces any stale node at the local path, then blocks, polling every second, until
     * the registry reports this host as a master. It sleeps one more second, removes the local path
     * from the dead-server set, installs the connection-state listener, and schedules the heartbeat
     * at the configured interval (seconds).
     */
    public void registry() {
        String address = NetUtils.getAddr(this.masterConfig.getListenPort());
        this.localNodePath = getMasterPath();
        int heartbeatInterval = this.masterConfig.getHeartbeatInterval();
        // Declared as HeartBeatTask (not Runnable) because getHeartBeatInfo() is needed below.
        HeartBeatTask heartBeatTask = new HeartBeatTask(this.startupTime,
                this.masterConfig.getMaxCpuLoadAvg(),
                this.masterConfig.getReservedMemory(),
                Sets.newHashSet(this.localNodePath),
                "master",
                this.registryClient);

        this.registryClient.remove(this.localNodePath);
        this.registryClient.persistEphemeral(this.localNodePath, heartBeatTask.getHeartBeatInfo());
        while (!this.registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
            ThreadUtils.sleep(REGISTRY_POLL_INTERVAL_MILLIS);
        }
        ThreadUtils.sleep(REGISTRY_POLL_INTERVAL_MILLIS);

        this.registryClient.handleDeadServer(Collections.singleton(this.localNodePath), NodeType.MASTER, DEAD_SERVER_DELETE_OP);
        this.registryClient.addConnectionStateListener(this::handleConnectionState);
        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);
        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartbeatInterval);
    }

    /**
     * Reacts to registry connection-state changes. On RECONNECTED, rewrites the local node with empty
     * content. On DISCONNECTED, asks the registered stoppable to stop this server. Other states are
     * only logged.
     */
    public void handleConnectionState(ConnectionState connectionState) {
        switch (connectionState) {
            case CONNECTED:
                logger.debug("registry connection state is {}", connectionState);
                break;
            case SUSPENDED:
                logger.warn("registry connection state is {}, ready to retry connection", connectionState);
                break;
            case RECONNECTED:
                logger.debug("registry connection state is {}, clean the node info", connectionState);
                this.registryClient.persistEphemeral(this.localNodePath, "");
                break;
            case DISCONNECTED:
                logger.warn("registry connection state is {}, ready to stop myself", connectionState);
                this.registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
                break;
            default:
                break;
        }
    }

    /**
     * Removes this master's node, shuts down the heartbeat executor (if it was created), and closes
     * the registry client. Exceptions are logged, not rethrown.
     */
    public void deregister() {
        try {
            String localAddress = getLocalAddress();
            this.registryClient.remove(getMasterPath());
            logger.info("master node : {} unRegistry to register center.", localAddress);
            if (this.heartBeatExecutor != null) {
                this.heartBeatExecutor.shutdown();
            }
            logger.info("heartbeat executor shutdown");
            this.registryClient.close();
        } catch (Exception e) {
            logger.error("remove registry path exception ", e);
        }
    }

    /** Returns this master's registry path, {@code /nodes/master/<host:port>}. */
    public String getMasterPath() {
        return "/nodes/master/" + getLocalAddress();
    }

    /** Returns this master's {@code host:port} address as produced by {@code NetUtils.getAddr}. */
    public String getLocalAddress() {
        return NetUtils.getAddr(this.masterConfig.getListenPort());
    }
}
