package org.apache.dolphinscheduler.server.master.service;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/service/FailoverService.class */
public class FailoverService {
    private static final Logger LOGGER = LoggerFactory.getLogger(FailoverService.class);
    private final RegistryClient registryClient;
    private final MasterConfig masterConfig;
    private final ProcessService processService;
    private final WorkflowExecuteThreadPool workflowExecuteThreadPool;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.dolphinscheduler.server.master.service.FailoverService$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/service/FailoverService$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$common$enums$NodeType = new int[NodeType.values().length];

        static {
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$NodeType[NodeType.MASTER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$NodeType[NodeType.WORKER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public FailoverService(RegistryClient registryClient, MasterConfig masterConfig, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool) {
        this.registryClient = registryClient;
        this.masterConfig = masterConfig;
        this.processService = processService;
        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
    }

    public void checkMasterFailover() {
        List<String> needFailoverMasterServers = getNeedFailoverMasterServers();
        if (CollectionUtils.isEmpty(needFailoverMasterServers)) {
            return;
        }
        LOGGER.info("need failover hosts:{}", needFailoverMasterServers);
        Iterator<String> it = needFailoverMasterServers.iterator();
        while (it.hasNext()) {
            failoverMasterWithLock(it.next());
        }
    }

    public void failoverServerWhenDown(String str, NodeType nodeType) {
        switch (AnonymousClass1.$SwitchMap$org$apache$dolphinscheduler$common$enums$NodeType[nodeType.ordinal()]) {
            case 1:
                failoverMasterWithLock(str);
                return;
            case 2:
                failoverWorker(str);
                return;
            default:
                return;
        }
    }

    private void failoverMasterWithLock(String str) {
        String failoverLockPath = getFailoverLockPath(NodeType.MASTER, str);
        try {
            try {
                this.registryClient.getLock(failoverLockPath);
                failoverMaster(str);
                this.registryClient.releaseLock(failoverLockPath);
            } catch (Exception e) {
                LOGGER.error("{} server failover failed, host:{}", new Object[]{NodeType.MASTER, str, e});
                this.registryClient.releaseLock(failoverLockPath);
            }
        } catch (Throwable th) {
            this.registryClient.releaseLock(failoverLockPath);
            throw th;
        }
    }

    private void failoverMaster(String str) {
        if (StringUtils.isEmpty(str)) {
            return;
        }
        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, str);
        long currentTimeMillis = System.currentTimeMillis();
        List<ProcessInstance> queryNeedFailoverProcessInstances = this.processService.queryNeedFailoverProcessInstances(str);
        LOGGER.info("start master[{}] failover, process list size:{}", str, Integer.valueOf(queryNeedFailoverProcessInstances.size()));
        List<Server> serverList = this.registryClient.getServerList(NodeType.WORKER);
        serverList.addAll(this.registryClient.getServerList(NodeType.MASTER));
        for (ProcessInstance processInstance : queryNeedFailoverProcessInstances) {
            if (!"NULL".equals(processInstance.getHost())) {
                for (TaskInstance taskInstance : this.processService.findValidTaskListByProcessId(Integer.valueOf(processInstance.getId()))) {
                    LOGGER.info("failover task instance id: {}, process instance id: {}", Integer.valueOf(taskInstance.getId()), Integer.valueOf(taskInstance.getProcessInstanceId()));
                    failoverTaskInstance(processInstance, taskInstance, serverList);
                }
                if (serverStartupTime == null || processInstance.getRestartTime() == null || !processInstance.getRestartTime().after(serverStartupTime)) {
                    LOGGER.info("failover process instance id: {}", Integer.valueOf(processInstance.getId()));
                    processInstance.setHost("NULL");
                    this.processService.processNeedFailoverProcessInstances(processInstance);
                }
            }
        }
        LOGGER.info("master[{}] failover end, useTime:{}ms", str, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    private void failoverWorker(String str) {
        if (StringUtils.isEmpty(str)) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        List<TaskInstance> queryNeedFailoverTaskInstances = this.processService.queryNeedFailoverTaskInstances(str);
        HashMap hashMap = new HashMap();
        LOGGER.info("start worker[{}] failover, task list size:{}", str, Integer.valueOf(queryNeedFailoverTaskInstances.size()));
        List<Server> serverList = this.registryClient.getServerList(NodeType.WORKER);
        for (TaskInstance taskInstance : queryNeedFailoverTaskInstances) {
            ProcessInstance processInstance = (ProcessInstance) hashMap.get(Integer.valueOf(taskInstance.getProcessInstanceId()));
            if (processInstance == null) {
                processInstance = this.processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
                if (processInstance == null) {
                    LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", Integer.valueOf(taskInstance.getProcessInstanceId()), Integer.valueOf(taskInstance.getId()));
                } else {
                    hashMap.put(Integer.valueOf(processInstance.getId()), processInstance);
                }
            }
            if (processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
                LOGGER.info("failover task instance id: {}, process instance id: {}", Integer.valueOf(taskInstance.getId()), Integer.valueOf(taskInstance.getProcessInstanceId()));
                failoverTaskInstance(processInstance, taskInstance, serverList);
            }
        }
        LOGGER.info("end worker[{}] failover, useTime:{}ms", str, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List<Server> list) {
        if (processInstance == null) {
            LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", Integer.valueOf(taskInstance.getProcessInstanceId()), Integer.valueOf(taskInstance.getId()));
            return;
        }
        if (checkTaskInstanceNeedFailover(list, taskInstance)) {
            boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
            taskInstance.setProcessInstance(processInstance);
            if (!isMasterTask) {
                TaskExecutionContext create = TaskExecutionContextBuilder.get().buildTaskInstanceRelatedInfo(taskInstance).buildProcessInstanceRelatedInfo(processInstance).create();
                if (this.masterConfig.isKillYarnJobWhenTaskFailover()) {
                    ProcessUtils.killYarnJob(create);
                }
            }
            taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
            this.processService.saveTaskInstance(taskInstance);
            StateEvent stateEvent = new StateEvent();
            stateEvent.setTaskInstanceId(taskInstance.getId());
            stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
            stateEvent.setProcessInstanceId(processInstance.getId());
            stateEvent.setExecutionStatus(taskInstance.getState());
            this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
        }
    }

    private List<String> getNeedFailoverMasterServers() {
        List<String> queryNeedFailoverProcessInstanceHost = this.processService.queryNeedFailoverProcessInstanceHost();
        Iterator<String> it = queryNeedFailoverProcessInstanceHost.iterator();
        while (it.hasNext()) {
            String next = it.next();
            if (this.registryClient.checkNodeExists(next, NodeType.MASTER) && !next.equals(getLocalAddress())) {
                it.remove();
            }
        }
        return queryNeedFailoverProcessInstanceHost;
    }

    private boolean checkTaskInstanceNeedFailover(List<Server> list, TaskInstance taskInstance) {
        boolean z = true;
        if (taskInstance == null) {
            LOGGER.error("failover task instance error, taskInstance is null");
            return false;
        }
        if ("NULL".equals(taskInstance.getHost())) {
            return false;
        }
        if ((taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) || taskInstance.getHost() == null) {
            return false;
        }
        if (checkTaskAfterServerStart(list, taskInstance)) {
            z = false;
        }
        return z;
    }

    private boolean checkTaskAfterServerStart(List<Server> list, TaskInstance taskInstance) {
        Date serverStartupTime;
        if (StringUtils.isEmpty(taskInstance.getHost()) || (serverStartupTime = getServerStartupTime(list, taskInstance.getHost())) == null) {
            return false;
        }
        return taskInstance.getStartTime() == null ? taskInstance.getSubmitTime().after(serverStartupTime) : taskInstance.getStartTime().after(serverStartupTime);
    }

    private String getFailoverLockPath(NodeType nodeType, String str) {
        switch (AnonymousClass1.$SwitchMap$org$apache$dolphinscheduler$common$enums$NodeType[nodeType.ordinal()]) {
            case 1:
                return "/lock/failover/masters/" + str;
            case 2:
                return "/lock/failover/workers/" + str;
            default:
                return "";
        }
    }

    private Date getServerStartupTime(NodeType nodeType, String str) {
        if (StringUtils.isEmpty(str)) {
            return null;
        }
        return getServerStartupTime(this.registryClient.getServerList(nodeType), str);
    }

    private Date getServerStartupTime(List<Server> list, String str) {
        if (CollectionUtils.isEmpty(list)) {
            return null;
        }
        Date date = null;
        Iterator<Server> it = list.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Server next = it.next();
            if (str.equals(next.getHost() + ":" + next.getPort())) {
                date = next.getCreateTime();
                break;
            }
        }
        return date;
    }

    String getLocalAddress() {
        return NetUtils.getAddr(this.masterConfig.getListenPort());
    }
}
