/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.service;

import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class MasterFailoverService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MasterFailoverService.class);
    private final RegistryClient registryClient;
    private final MasterConfig masterConfig;
    private final ProcessService processService;
    private final String localAddress;
    private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    public MasterFailoverService(@NonNull RegistryClient registryClient, @NonNull MasterConfig masterConfig, @NonNull ProcessService processService, @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) {
        if (registryClient == null) {
            throw new NullPointerException("registryClient is marked non-null but is null");
        }
        if (masterConfig == null) {
            throw new NullPointerException("masterConfig is marked non-null but is null");
        }
        if (processService == null) {
            throw new NullPointerException("processService is marked non-null but is null");
        }
        if (processInstanceExecCacheManager == null) {
            throw new NullPointerException("processInstanceExecCacheManager is marked non-null but is null");
        }
        this.registryClient = registryClient;
        this.masterConfig = masterConfig;
        this.processService = processService;
        this.localAddress = masterConfig.getMasterAddress();
        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
    }

    @Counted(value="ds.master.scheduler.failover.check.count")
    @Timed(value="ds.master.scheduler.failover.check.time", percentiles={0.5, 0.75, 0.95, 0.99}, histogram=true)
    public void checkMasterFailover() {
        List needFailoverMasterHosts = this.processService.queryNeedFailoverProcessInstanceHost().stream().filter(host -> this.localAddress.equals(host) || !this.registryClient.checkNodeExists(host, RegistryNodeType.MASTER)).distinct().collect(Collectors.toList());
        if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
            return;
        }
        log.info("Master failover service {} begin to failover hosts:{}", (Object)this.localAddress, needFailoverMasterHosts);
        for (String needFailoverMasterHost : needFailoverMasterHosts) {
            this.failoverMaster(needFailoverMasterHost);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void failoverMaster(String masterHost) {
        String failoverPath = RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" + masterHost;
        try {
            this.registryClient.getLock(failoverPath);
            this.doFailoverMaster(masterHost);
        }
        catch (Exception e) {
            log.error("Master server failover failed, host:{}", (Object)masterHost, (Object)e);
        }
        finally {
            this.registryClient.releaseLock(failoverPath);
        }
    }

    private void doFailoverMaster(@NonNull String masterHost) {
        if (masterHost == null) {
            throw new NullPointerException("masterHost is marked non-null but is null");
        }
        StopWatch failoverTimeCost = StopWatch.createStarted();
        Optional<Date> masterStartupTimeOptional = this.getServerStartupTime(this.registryClient.getServerList(RegistryNodeType.MASTER), masterHost);
        List needFailoverProcessInstanceList = this.processService.queryNeedFailoverProcessInstances(masterHost);
        if (CollectionUtils.isEmpty((Collection)needFailoverProcessInstanceList)) {
            return;
        }
        log.info("Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}", new Object[]{masterHost, needFailoverProcessInstanceList.size(), needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList())});
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowInstanceIdMDC((Integer)processInstance.getId());
            Throwable throwable = null;
            try {
                log.info("WorkflowInstance failover starting");
                if (!this.checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) continue;
                ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("failover", processInstance.getProcessDefinitionCode().toString());
                this.processService.processNeedFailoverProcessInstances(processInstance);
                log.info("WorkflowInstance failover finished");
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (mdcAutoClosableContext == null) continue;
                if (throwable != null) {
                    try {
                        mdcAutoClosableContext.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                mdcAutoClosableContext.close();
            }
        }
        failoverTimeCost.stop();
        log.info("Master[{}] failover finished, useTime:{}ms", (Object)masterHost, (Object)failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
    }

    private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
        if (CollectionUtils.isEmpty(servers)) {
            return Optional.empty();
        }
        Date serverStartupTime = null;
        for (Server server : servers) {
            if (!host.equals(server.getHost() + ":" + server.getPort())) continue;
            serverStartupTime = server.getCreateTime();
            break;
        }
        return Optional.ofNullable(serverStartupTime);
    }

    private boolean checkProcessInstanceNeedFailover(Optional<Date> beFailoveredMasterStartupTimeOptional, @NonNull ProcessInstance processInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if ("NULL".equals(processInstance.getHost())) {
            log.info("The workflowInstance's  host is NULL, no need to failover");
            return false;
        }
        if (!beFailoveredMasterStartupTimeOptional.isPresent()) {
            return true;
        }
        Date beFailoveredMasterStartupTime = beFailoveredMasterStartupTimeOptional.get();
        if (processInstance.getStartTime().after(beFailoveredMasterStartupTime)) {
            log.info("The workflowInstance is newly created, no need to failover");
            return false;
        }
        if (processInstance.getRestartTime() != null && processInstance.getRestartTime().after(beFailoveredMasterStartupTime)) {
            log.info("The workflowInstance's restartTime is after the dead master startup time, no need to failover");
            return false;
        }
        if (this.processInstanceExecCacheManager.contains(processInstance.getId())) {
            log.info("The workflowInstance is running in the current master, no need to failover");
            return false;
        }
        return true;
    }
}

