/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.service;

import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Fails over the workflow instances of master servers that are no longer registered (and of this
 * master's own address), marking their unfinished task instances for fault tolerance.
 */
@Service
public class MasterFailoverService {
    private static final Logger LOGGER = LoggerFactory.getLogger(MasterFailoverService.class);

    /** Registry path prefix of the per-master-host failover lock. */
    private static final String FAILOVER_LOCK_PATH_PREFIX = "/lock/failover/masters/";

    /**
     * Host value written to a workflow instance after it has been failed over; an instance whose
     * host already equals this value is skipped by {@link #checkProcessInstanceNeedFailover}.
     */
    private static final String FAILOVERED_HOST = "NULL";

    private final RegistryClient registryClient;
    private final MasterConfig masterConfig;
    private final ProcessService processService;
    private final String localAddress;
    private final NettyExecutorManager nettyExecutorManager;
    private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    /**
     * Creates the service; the local address is derived from the configured master listen port.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    public MasterFailoverService(@NonNull RegistryClient registryClient,
                                 @NonNull MasterConfig masterConfig,
                                 @NonNull ProcessService processService,
                                 @NonNull NettyExecutorManager nettyExecutorManager,
                                 @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) {
        if (registryClient == null) {
            throw new NullPointerException("registryClient is marked non-null but is null");
        }
        if (masterConfig == null) {
            throw new NullPointerException("masterConfig is marked non-null but is null");
        }
        if (processService == null) {
            throw new NullPointerException("processService is marked non-null but is null");
        }
        if (nettyExecutorManager == null) {
            throw new NullPointerException("nettyExecutorManager is marked non-null but is null");
        }
        if (processInstanceExecCacheManager == null) {
            throw new NullPointerException("processInstanceExecCacheManager is marked non-null but is null");
        }
        this.registryClient = registryClient;
        this.masterConfig = masterConfig;
        this.processService = processService;
        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
        this.nettyExecutorManager = nettyExecutorManager;
        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
    }

    /**
     * Finds the master hosts that own workflow instances needing failover and fails each one over.
     * A host is selected when it equals this master's address or is not registered as a master node.
     */
    @Counted(value = "ds.master.scheduler.failover.check.count")
    @Timed(value = "ds.master.scheduler.failover.check.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
    public void checkMasterFailover() {
        List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
                .stream()
                .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
                .distinct()
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
            return;
        }
        LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, needFailoverMasterHosts);
        for (String needFailoverMasterHost : needFailoverMasterHosts) {
            failoverMaster(needFailoverMasterHost);
        }
    }

    /**
     * Fails over one master host while holding the registry failover lock for that host.
     * Any exception is logged rather than propagated, so callers can continue with other hosts.
     *
     * @param masterHost the {@code host:port} of the master to fail over
     */
    public void failoverMaster(String masterHost) {
        String failoverPath = FAILOVER_LOCK_PATH_PREFIX + masterHost;
        boolean lockAcquired = false;
        try {
            registryClient.getLock(failoverPath);
            // Only reached when getLock returned normally; if it threw there is nothing to release.
            lockAcquired = true;
            doFailoverMaster(masterHost);
        } catch (Exception e) {
            LOGGER.error("Master server failover failed, host:{}", masterHost, e);
        } finally {
            if (lockAcquired) {
                registryClient.releaseLock(failoverPath);
            }
        }
    }

    /**
     * Fails over every workflow instance of {@code masterHost} that passes
     * {@link #checkProcessInstanceNeedFailover}: unfinished task instances are failed over, the
     * workflow host is set to the failovered marker, and the instance is handed back to the
     * process service.
     */
    private void doFailoverMaster(@NonNull String masterHost) {
        if (masterHost == null) {
            throw new NullPointerException("masterHost is marked non-null but is null");
        }
        StopWatch failoverTimeCost = StopWatch.createStarted();
        Optional<Date> masterStartupTimeOptional =
                getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
        List<ProcessInstance> needFailoverProcessInstanceList =
                processService.queryNeedFailoverProcessInstances(masterHost);
        if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
            return;
        }
        LOGGER.info(
                "Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
                masterHost,
                needFailoverProcessInstanceList.size(),
                needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            try {
                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                LOGGER.info("WorkflowInstance failover starting");
                if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
                    LOGGER.info("WorkflowInstance doesn't need to failover");
                    continue;
                }
                ProcessDefinition processDefinition = processService.findProcessDefinition(
                        processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
                processInstance.setProcessDefinition(processDefinition);
                int processInstanceId = processInstance.getId();
                List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
                for (TaskInstance taskInstance : taskInstanceList) {
                    try {
                        LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
                        LOGGER.info("TaskInstance failover starting");
                        if (!checkTaskInstanceNeedFailover(taskInstance)) {
                            LOGGER.info("The taskInstance doesn't need to failover");
                            continue;
                        }
                        failoverTaskInstance(processInstance, taskInstance);
                        LOGGER.info("TaskInstance failover finished");
                    } finally {
                        LoggerUtils.removeTaskInstanceIdMDC();
                    }
                }
                ProcessInstanceMetrics.incProcessInstanceByState("failover");
                // Mark the instance so later checks recognise it as already failed over.
                processInstance.setHost(FAILOVERED_HOST);
                processService.processNeedFailoverProcessInstances(processInstance);
                LOGGER.info("WorkflowInstance failover finished");
            } finally {
                LoggerUtils.removeWorkflowInstanceIdMDC();
            }
        }
        failoverTimeCost.stop();
        LOGGER.info("Master[{}] failover finished, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
    }

    /**
     * Returns the create time of the server whose {@code host:port} equals {@code host}, or empty
     * when the list is empty or no server matches.
     */
    private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
        if (CollectionUtils.isEmpty(servers)) {
            return Optional.empty();
        }
        for (Server server : servers) {
            if (host.equals(server.getHost() + ":" + server.getPort())) {
                return Optional.ofNullable(server.getCreateTime());
            }
        }
        return Optional.empty();
    }

    /**
     * Fails over a single task instance. For non-master tasks, the related yarn job is optionally
     * killed and a kill command is sent to the worker. In all cases the task is saved with state
     * {@code NEED_FAULT_TOLERANCE} and flag {@code NO}.
     */
    private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        TaskMetrics.incTaskInstanceByState("failover");
        boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
        taskInstance.setProcessInstance(processInstance);
        if (!isMasterTask) {
            LOGGER.info("The failover taskInstance is not master task");
            if (masterConfig.isKillYarnJobWhenTaskFailover()) {
                LOGGER.info("TaskInstance failover begin kill the task related yarn job");
                // The execution context is only consumed by killYarnJob, so build it only on this path.
                TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                        .buildTaskInstanceRelatedInfo(taskInstance)
                        .buildProcessInstanceRelatedInfo(processInstance)
                        .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                        .create();
                ProcessUtils.killYarnJob(taskExecutionContext);
            }
            sendKillCommandToWorker(taskInstance);
        } else {
            LOGGER.info("The failover taskInstance is a master task");
        }
        taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
        taskInstance.setFlag(Flag.NO);
        processService.saveTaskInstance(taskInstance);
    }

    /**
     * Sends a task-kill command to the worker recorded on the task instance; does nothing when no
     * host is recorded. An {@link ExecuteException} is logged, not rethrown.
     */
    private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) {
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        if (StringUtils.isEmpty(taskInstance.getHost())) {
            return;
        }
        try {
            TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId().intValue());
            Host workerHost = Host.of(taskInstance.getHost());
            nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command());
            LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost());
        } catch (ExecuteException e) {
            LOGGER.error("Kill task failed", e);
        }
    }

    /** A task instance needs failover when its state is unknown or not yet finished. */
    private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) {
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        return taskInstance.getState() == null || !taskInstance.getState().isFinished();
    }

    /**
     * Decides whether a workflow instance of the failed master should be failed over.
     *
     * @param beFailoveredMasterStartupTimeOptional registry create time of the master being failed
     *        over; empty when that master is not in the registry's master list
     * @param processInstance the candidate workflow instance
     * @return {@code false} if the instance is already marked as failed over, if it started or was
     *         restarted after the master's startup time, or if it is in this master's execution
     *         cache; {@code true} otherwise (always {@code true} when the startup time is unknown
     *         and the instance is not already marked)
     */
    private boolean checkProcessInstanceNeedFailover(Optional<Date> beFailoveredMasterStartupTimeOptional,
                                                     @NonNull ProcessInstance processInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (FAILOVERED_HOST.equals(processInstance.getHost())) {
            return false;
        }
        if (!beFailoveredMasterStartupTimeOptional.isPresent()) {
            return true;
        }
        Date beFailoveredMasterStartupTime = beFailoveredMasterStartupTimeOptional.get();
        // Guard against a missing start time: an NPE here would abort failover of every remaining
        // instance of this master. A null start time falls through to the remaining checks.
        Date startTime = processInstance.getStartTime();
        if (startTime != null && startTime.after(beFailoveredMasterStartupTime)) {
            return false;
        }
        Date restartTime = processInstance.getRestartTime();
        if (restartTime != null && restartTime.after(beFailoveredMasterStartupTime)) {
            return false;
        }
        return !processInstanceExecCacheManager.contains(processInstance.getId());
    }
}

