package org.apache.dolphinscheduler.server.master.runner;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
import org.apache.dolphinscheduler.server.master.exception.MasterException;
import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Daemon thread that repeatedly fetches pending {@link Command}s for this master's slot, turns each
 * one into a {@link WorkflowExecuteRunnable}, caches it and publishes a
 * {@link WorkflowEventType#START_WORKFLOW} event for it.
 *
 * <p>{@link #start()} also starts the {@link WorkflowEventLooper} and the
 * {@link MasterTaskExecutorBootstrap}; {@link #close()} closes both of them.
 */
@Service
public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MasterSchedulerBootstrap.class);

    /** Back-off applied whenever the loop has nothing to do or cannot make progress. */
    private static final long SLEEP_TIME_MILLIS = 1000L;

    @Autowired
    private CommandService commandService;

    @Autowired
    private MasterConfig masterConfig;

    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    @Autowired
    private WorkflowExecuteRunnableFactory workflowExecuteRunnableFactory;

    @Autowired
    private WorkflowEventQueue workflowEventQueue;

    @Autowired
    private WorkflowEventLooper workflowEventLooper;

    @Autowired
    private MasterSlotManager masterSlotManager;

    @Autowired
    private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap;

    protected MasterSchedulerBootstrap() {
        super("MasterCommandLoopThread");
    }

    /**
     * Starts this command loop thread, then the workflow event looper and the master task executor.
     */
    public synchronized void start() {
        log.info("MasterSchedulerBootstrap starting..");
        super.start();
        this.workflowEventLooper.start();
        this.masterTaskExecutorBootstrap.start();
        log.info("MasterSchedulerBootstrap started...");
    }

    /**
     * Closes the master task executor and then the workflow event looper.
     *
     * <p>Resources are closed in reverse declaration order by try-with-resources, a {@code null}
     * resource is skipped, and if both {@code close()} calls fail the second failure is attached to
     * the first as a suppressed exception instead of replacing it.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        log.info("MasterSchedulerBootstrap stopping...");
        try (
                WorkflowEventLooper eventLooper = this.workflowEventLooper;
                MasterTaskExecutorBootstrap taskExecutorBootstrap = this.masterTaskExecutorBootstrap) {
            // Nothing to do here: the try-with-resources statement exists only to close both resources.
        }
        log.info("MasterSchedulerBootstrap stopped...");
    }

    /**
     * Main loop: runs until {@link ServerLifeCycleManager#isStopped()} reports true or the thread
     * is interrupted.
     *
     * <p>An iteration is skipped (after a short sleep) when the server is not in running status,
     * when the host is overloaded, or when no command was fetched.
     */
    public void run() {
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                if (!ServerLifeCycleManager.isRunning()) {
                    log.warn("The current server is not at running status, cannot consumes commands.");
                    Thread.sleep(SLEEP_TIME_MILLIS);
                    // Skip this iteration; without this the loop would still consume commands.
                    continue;
                }
                if (OSUtils.isOverload(this.masterConfig.getMaxCpuLoadAvg(), this.masterConfig.getReservedMemory()).booleanValue()) {
                    log.warn("The current server is overload, cannot consumes commands.");
                    MasterServerMetrics.incMasterOverload();
                    Thread.sleep(SLEEP_TIME_MILLIS);
                    continue;
                }
                List<Command> commands = findCommands();
                if (CollectionUtils.isEmpty(commands)) {
                    Thread.sleep(SLEEP_TIME_MILLIS);
                    continue;
                }
                commands.parallelStream().forEach(this::handleCommand);
                MasterServerMetrics.incMasterConsumeCommand(commands.size());
            } catch (InterruptedException e) {
                log.warn("Master schedule bootstrap interrupted, close the loop", e);
                // Restore the interrupt flag so the owner of this thread can observe it.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                log.error("Master schedule workflow error", e);
                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
            }
        }
    }

    /**
     * Creates the workflow runnable for one command, caches it and enqueues its start event.
     *
     * <p>A {@link WorkflowCreateException} is logged and the command is moved to the error
     * commands; any other exception propagates to the caller.
     *
     * @param command the command to handle
     */
    private void handleCommand(Command command) {
        try {
            Optional<WorkflowExecuteRunnable> runnableOptional =
                    this.workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command);
            if (!runnableOptional.isPresent()) {
                log.warn("The command execute success, will not trigger a WorkflowExecuteRunnable, this workflowInstance might be in serial mode");
                return;
            }
            WorkflowExecuteRunnable workflowExecuteRunnable = runnableOptional.get();
            ProcessInstance workflowInstance = workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance();
            int workflowInstanceId = workflowInstance.getId().intValue();
            if (this.processInstanceExecCacheManager.contains(workflowInstanceId)) {
                // Only reported: the cache call below is still made for this id.
                log.error("The workflow instance is already been cached, this case shouldn't be happened");
            }
            this.processInstanceExecCacheManager.cache(workflowInstanceId, workflowExecuteRunnable);
            this.workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, workflowInstanceId));
        } catch (WorkflowCreateException e) {
            log.error("Master handle command {} error ", command.getId(), e);
            this.commandService.moveToErrorCommand(command, e.toString());
        }
    }

    /**
     * Fetches one page of commands for the slot currently reported by the slot manager.
     *
     * @return the fetched commands, or an empty list when the reported master count is not positive
     * @throws MasterException if reading the slot information or querying the commands fails
     */
    private List<Command> findCommands() throws MasterException {
        try {
            long startTimeMillis = System.currentTimeMillis();
            int slot = this.masterSlotManager.getSlot();
            int masterSize = this.masterSlotManager.getMasterSize();
            if (masterSize <= 0) {
                log.warn("Master count: {} is invalid, the current slot: {}", masterSize, slot);
                return Collections.emptyList();
            }
            List<Command> commands =
                    this.commandService.findCommandPageBySlot(this.masterConfig.getFetchCommandNum(), masterSize, slot);
            if (CollectionUtils.isNotEmpty(commands)) {
                long costMillis = System.currentTimeMillis() - startTimeMillis;
                log.info("Master schedule bootstrap loop command success, fetch command size: {}, cost: {}ms, current slot: {}, total slot size: {}",
                        commands.size(), costMillis, slot, masterSize);
                ProcessInstanceMetrics.recordCommandQueryTime(costMillis);
            }
            return commands;
        } catch (Exception e) {
            throw new MasterException("Master loop command from database error", e);
        }
    }
}
