package org.apache.dolphinscheduler.server.master.service;

import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Fails over the workflow instances owned by master servers that need failover.
 *
 * <p>A master host is processed when the process service reports it still owns workflow instances needing
 * failover and it is either this master itself or no longer registered as a {@link NodeType#MASTER} node.
 * Each host is processed while holding the registry lock {@code /lock/failover/masters/<host>}.
 */
@Service
public class MasterFailoverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MasterFailoverService.class);

    /** Registry lock path prefix; the master host being failed over is appended to it. */
    private static final String FAILOVER_MASTER_LOCK_PREFIX = "/lock/failover/masters/";

    /**
     * Host value assigned to a workflow instance once it has been failed over. An instance whose host already
     * equals this value is skipped by {@link #checkProcessInstanceNeedFailover}.
     */
    private static final String FAILED_OVER_HOST = "NULL";

    private final RegistryClient registryClient;
    private final MasterConfig masterConfig;
    private final ProcessService processService;
    /** This master's own address, derived from the configured listen port. */
    private final String localAddress;
    private final NettyExecutorManager nettyExecutorManager;

    /**
     * Creates the service.
     *
     * @param registryClient       registry used for node liveness checks, server listing and failover locks
     * @param masterConfig         master configuration (listen port, yarn-kill-on-failover flag)
     * @param processService       access to workflow/task instance persistence
     * @param nettyExecutorManager used to send kill commands to workers
     * @throws NullPointerException if any argument is {@code null}
     */
    public MasterFailoverService(@NonNull RegistryClient registryClient, @NonNull MasterConfig masterConfig, @NonNull ProcessService processService, @NonNull NettyExecutorManager nettyExecutorManager) {
        if (registryClient == null) {
            throw new NullPointerException("registryClient is marked non-null but is null");
        }
        if (masterConfig == null) {
            throw new NullPointerException("masterConfig is marked non-null but is null");
        }
        if (processService == null) {
            throw new NullPointerException("processService is marked non-null but is null");
        }
        if (nettyExecutorManager == null) {
            throw new NullPointerException("nettyExecutorManager is marked non-null but is null");
        }
        this.registryClient = registryClient;
        this.masterConfig = masterConfig;
        this.processService = processService;
        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
        this.nettyExecutorManager = nettyExecutorManager;
    }

    /**
     * Finds every master host that needs failover and fails each one over in turn.
     *
     * <p>A host qualifies when it is this master itself or it is not registered as a MASTER node.
     * Duplicate hosts are collapsed.
     */
    @Counted("ds.master.scheduler.failover.check.count")
    @Timed(value = "ds.master.scheduler.failover.check.time", percentiles = {0.5d, 0.75d, 0.95d, 0.99d}, histogram = true)
    public void checkMasterFailover() {
        List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
            .stream()
            .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
            .distinct()
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
            return;
        }
        LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, needFailoverMasterHosts);
        for (String masterHost : needFailoverMasterHosts) {
            failoverMaster(masterHost);
        }
    }

    /**
     * Fails over a single master host while holding its registry failover lock.
     *
     * <p>Any {@link Exception} raised while acquiring the lock or during failover is logged, not propagated.
     * The lock is released exactly once, and only if it was acquired.
     *
     * @param masterHost the master host ({@code host:port}) to fail over
     */
    public void failoverMaster(String masterHost) {
        String failoverPath = FAILOVER_MASTER_LOCK_PREFIX + masterHost;
        boolean lockAcquired = false;
        try {
            registryClient.getLock(failoverPath);
            lockAcquired = true;
            doFailoverMaster(masterHost);
        } catch (Exception e) {
            LOGGER.error("Master server failover failed, host:{}", masterHost, e);
        } finally {
            // A single release point. The flattened original could release twice, or release a lock that
            // getLock failed to acquire.
            if (lockAcquired) {
                registryClient.releaseLock(failoverPath);
            }
        }
    }

    /**
     * Fails over every workflow instance owned by {@code masterHost} that still needs it.
     *
     * @param masterHost the master host ({@code host:port}) whose workflow instances are failed over
     * @throws NullPointerException if {@code masterHost} is {@code null}
     */
    private void doFailoverMaster(@NonNull String masterHost) {
        if (masterHost == null) {
            throw new NullPointerException("masterHost is marked non-null but is null");
        }
        StopWatch failoverTimeCost = StopWatch.createStarted();
        // Empty when the master is not (or no longer) present in the registry's server list.
        Optional<Date> masterStartupTime =
            getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
        List<ProcessInstance> needFailoverProcessInstances =
            processService.queryNeedFailoverProcessInstances(masterHost);
        if (CollectionUtils.isEmpty(needFailoverProcessInstances)) {
            return;
        }
        LOGGER.info("Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
            masterHost,
            needFailoverProcessInstances.size(),
            needFailoverProcessInstances.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
        for (ProcessInstance processInstance : needFailoverProcessInstances) {
            failoverProcessInstanceIfNeeded(masterStartupTime, processInstance);
        }
        failoverTimeCost.stop();
        LOGGER.info("Master[{}] failover finished, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
    }

    /**
     * Fails over one workflow instance and its unfinished tasks, if the instance needs failover.
     *
     * <p>The workflow instance id is kept in the logging MDC only for the duration of this call.
     */
    private void failoverProcessInstanceIfNeeded(Optional<Date> masterStartupTime, ProcessInstance processInstance) {
        try {
            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
            LOGGER.info("WorkflowInstance failover starting");
            if (!checkProcessInstanceNeedFailover(masterStartupTime, processInstance)) {
                LOGGER.info("WorkflowInstance doesn't need to failover");
                return;
            }
            processInstance.setProcessDefinition(processService.findProcessDefinition(
                processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()));
            for (TaskInstance taskInstance : processService.findValidTaskListByProcessId(Integer.valueOf(processInstance.getId()))) {
                failoverTaskInstanceIfNeeded(processInstance, taskInstance);
            }
            ProcessInstanceMetrics.incProcessInstanceFailover();
            // Mark the instance as failed over; checkProcessInstanceNeedFailover skips instances with this host.
            processInstance.setHost(FAILED_OVER_HOST);
            processService.processNeedFailoverProcessInstances(processInstance);
            LOGGER.info("WorkflowInstance failover finished");
        } finally {
            LoggerUtils.removeWorkflowInstanceIdMDC();
        }
    }

    /**
     * Fails over one task instance of {@code processInstance}, if the task has not finished.
     *
     * <p>The task instance id is kept in the logging MDC only for the duration of this call, including when
     * the failover throws.
     */
    private void failoverTaskInstanceIfNeeded(ProcessInstance processInstance, TaskInstance taskInstance) {
        try {
            LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
            LOGGER.info("TaskInstance failover starting");
            if (!checkTaskInstanceNeedFailover(taskInstance)) {
                LOGGER.info("The taskInstance doesn't need to failover");
                return;
            }
            failoverTaskInstance(processInstance, taskInstance);
            LOGGER.info("TaskInstance failover finished");
        } finally {
            LoggerUtils.removeTaskInstanceIdMDC();
        }
    }

    /**
     * Looks up the registration time of {@code host} in the given server list.
     *
     * @param servers registered servers; may be {@code null} or empty
     * @param host    the host to look up, in {@code host:port} form
     * @return the create time of the first server whose {@code host:port} equals {@code host}. Returns empty
     *     when no server matches, the list is empty, or the matching create time is {@code null}.
     */
    private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
        if (CollectionUtils.isEmpty(servers)) {
            return Optional.empty();
        }
        return servers.stream()
            .filter(server -> host.equals(server.getHost() + ":" + server.getPort()))
            .findFirst()
            .map(Server::getCreateTime);
    }

    /**
     * Marks a task instance as needing fault tolerance and persists it.
     *
     * <p>For tasks that are not master tasks, this first optionally kills the related yarn job (when configured)
     * and sends a kill command to the worker recorded on the task.
     *
     * @throws NullPointerException if either argument is {@code null}
     */
    private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        TaskMetrics.incTaskFailover();
        boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
        taskInstance.setProcessInstance(processInstance);
        if (isMasterTask) {
            LOGGER.info("The failover taskInstance is a master task");
        } else {
            LOGGER.info("The failover taskInstance is not master task");
            if (masterConfig.isKillYarnJobWhenTaskFailover()) {
                // The execution context is only needed to kill the yarn job, so build it only in that case.
                TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                    .buildTaskInstanceRelatedInfo(taskInstance)
                    .buildProcessInstanceRelatedInfo(processInstance)
                    .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                    .create();
                LOGGER.info("TaskInstance failover begin kill the task related yarn job");
                ProcessUtils.killYarnJob(taskExecutionContext);
            }
            sendKillCommandToWorker(taskInstance);
        }
        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
        taskInstance.setFlag(Flag.NO);
        processService.saveTaskInstance(taskInstance);
    }

    /**
     * Sends a kill command for {@code taskInstance} to the worker host recorded on it.
     *
     * <p>Does nothing when the task has no host. Send failures ({@link ExecuteException}) are logged and not
     * rethrown.
     *
     * @throws NullPointerException if {@code taskInstance} is {@code null}
     */
    private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) {
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        if (StringUtils.isEmpty(taskInstance.getHost())) {
            return;
        }
        try {
            TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId());
            nettyExecutorManager.doExecute(Host.of(taskInstance.getHost()), killCommand.convert2Command());
            LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost());
        } catch (ExecuteException e) {
            LOGGER.error("Kill task failed", e);
        }
    }

    /**
     * Determines whether a task instance should be failed over.
     *
     * @return {@code true} when the task has no state or its state is not a finished state
     * @throws NullPointerException if {@code taskInstance} is {@code null}
     */
    private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) {
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        return taskInstance.getState() == null || !taskInstance.getState().typeIsFinished();
    }

    /**
     * Determines whether a workflow instance should be failed over.
     *
     * @param masterStartupTime registration time of the master being failed over, if it is still registered
     * @param processInstance   the candidate workflow instance
     * @return {@code false} if the instance is already marked failed over (its host equals {@code "NULL"}).
     *     If the master's startup time is known, returns {@code true} only when the instance started no later
     *     than that time. Otherwise returns {@code true}.
     * @throws NullPointerException if {@code processInstance} is {@code null}. Also thrown when the startup time
     *     is present and the instance's start time is {@code null}.
     */
    private boolean checkProcessInstanceNeedFailover(Optional<Date> masterStartupTime, @NonNull ProcessInstance processInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (FAILED_OVER_HOST.equals(processInstance.getHost())) {
            return false;
        }
        if (!masterStartupTime.isPresent()) {
            return true;
        }
        // Instances started after the master (re)started belong to its current run and are not failed over.
        return !processInstance.getStartTime().after(masterStartupTime.get());
    }
}
