/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
import org.apache.dolphinscheduler.server.master.exception.MasterException;
import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import org.apache.dolphinscheduler.server.master.runner.MasterTaskExecutorBootstrap;
import org.apache.dolphinscheduler.server.master.runner.WorkflowEventLooper;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnableFactory;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Thread (named {@code MasterCommandLoopThread}) that repeatedly fetches {@link Command}s for this
 * master's slot and converts each one into a {@link WorkflowExecuteRunnable}.
 *
 * <p>For every command that yields a runnable, the runnable is stored in the
 * {@link ProcessInstanceExecCacheManager} and a {@link WorkflowEventType#START_WORKFLOW} event is
 * pushed onto the {@link WorkflowEventQueue}. Commands whose runnable cannot be created are handed
 * to {@link CommandService#moveToErrorCommand}.
 */
@Service
public class MasterSchedulerBootstrap
extends BaseDaemonThread
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MasterSchedulerBootstrap.class);

    /** Pause, in milliseconds, used whenever the loop has nothing to do or must back off. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    @Autowired
    private CommandService commandService;
    @Autowired
    private MasterConfig masterConfig;
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private WorkflowExecuteRunnableFactory workflowExecuteRunnableFactory;
    @Autowired
    private WorkflowEventQueue workflowEventQueue;
    @Autowired
    private WorkflowEventLooper workflowEventLooper;
    @Autowired
    private MasterSlotManager masterSlotManager;
    @Autowired
    private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap;
    @Autowired
    private MetricsProvider metricsProvider;

    protected MasterSchedulerBootstrap() {
        super("MasterCommandLoopThread");
    }

    /**
     * Starts this thread, then the workflow event looper, then the master task executor bootstrap.
     */
    public synchronized void start() {
        log.info("MasterSchedulerBootstrap starting..");
        super.start();
        this.workflowEventLooper.start();
        this.masterTaskExecutorBootstrap.start();
        log.info("MasterSchedulerBootstrap started...");
    }

    /**
     * Closes the master task executor bootstrap first and the workflow event looper second.
     *
     * <p>The looper is managed by try-with-resources, so it is still closed when closing the
     * executor bootstrap throws; a failure while closing the looper is then attached to the
     * primary exception as a suppressed exception.
     *
     * @throws Exception if closing either collaborator fails
     */
    @Override
    public void close() throws Exception {
        log.info("MasterSchedulerBootstrap stopping...");
        try (WorkflowEventLooper looper = this.workflowEventLooper) {
            MasterTaskExecutorBootstrap taskExecutorBootstrap = this.masterTaskExecutorBootstrap;
            if (taskExecutorBootstrap != null) {
                taskExecutorBootstrap.close();
            }
        }
        log.info("MasterSchedulerBootstrap stopped...");
    }

    /**
     * Command-consuming loop; runs until {@link ServerLifeCycleManager#isStopped()} is true or the
     * thread is interrupted.
     *
     * <p>Each iteration backs off for {@link #IDLE_SLEEP_MILLIS} and retries when the server is not
     * running, when load protection reports overload, or when no commands were fetched. Otherwise
     * the fetched commands are handled via a parallel stream and the consumed-command metric is
     * incremented by the batch size.
     */
    public void run() {
        MasterServerLoadProtection serverLoadProtection = this.masterConfig.getServerLoadProtection();
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                if (!ServerLifeCycleManager.isRunning()) {
                    log.warn("The current server is not at running status, cannot consumes commands.");
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    // Re-evaluate the server state instead of falling through and consuming
                    // commands while the server is not running.
                    continue;
                }
                SystemMetrics systemMetrics = this.metricsProvider.getSystemMetrics();
                if (serverLoadProtection.isOverload(systemMetrics)) {
                    log.warn("The current server is overload, cannot consumes commands.");
                    MasterServerMetrics.incMasterOverload();
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }
                List<Command> commands = this.findCommands();
                if (CollectionUtils.isEmpty(commands)) {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }
                commands.parallelStream().forEach(this::handleCommand);
                MasterServerMetrics.incMasterConsumeCommand(commands.size());
            }
            catch (InterruptedException interruptedException) {
                log.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                break;
            }
            catch (Exception e) {
                log.error("Master schedule workflow error", e);
                ThreadUtils.sleep(IDLE_SLEEP_MILLIS);
            }
        }
    }

    /**
     * Turns one command into a cached {@link WorkflowExecuteRunnable} and enqueues its
     * {@code START_WORKFLOW} event.
     *
     * <p>If the factory returns an empty {@link Optional}, nothing is cached or enqueued. A
     * {@link WorkflowCreateException} is logged and the command is moved to the error commands;
     * any other exception propagates to the caller.
     *
     * @param command the command to handle
     */
    private void handleCommand(Command command) {
        try {
            Optional<WorkflowExecuteRunnable> workflowExecuteRunnableOptional =
                    this.workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command);
            if (!workflowExecuteRunnableOptional.isPresent()) {
                log.warn("The command execute success, will not trigger a WorkflowExecuteRunnable, this workflowInstance might be in serial mode");
                return;
            }
            WorkflowExecuteRunnable workflowExecuteRunnable = workflowExecuteRunnableOptional.get();
            ProcessInstance processInstance = workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance();
            if (this.processInstanceExecCacheManager.contains(processInstance.getId())) {
                // Only reported; the runnable is still cached and started below.
                log.error("The workflow instance is already been cached, this case shouldn't be happened");
            }
            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
            this.workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId()));
        }
        catch (WorkflowCreateException workflowCreateException) {
            log.error("Master handle command {} error ", command.getId(), workflowCreateException);
            this.commandService.moveToErrorCommand(command, workflowCreateException.toString());
        }
    }

    /**
     * Fetches one page of commands for this master's slot.
     *
     * <p>The page size comes from {@link MasterConfig#getFetchCommandNum()}. When the reported
     * master count is not positive, a warning is logged and an empty list is returned without
     * querying. When the query returns commands, the elapsed time is logged and recorded via
     * {@link ProcessInstanceMetrics#recordCommandQueryTime}.
     *
     * @return the commands returned by the command service, or an empty list if the master count
     *         is invalid
     * @throws MasterException wrapping any exception raised while resolving the slot or querying
     */
    private List<Command> findCommands() throws MasterException {
        try {
            long scheduleStartTime = System.currentTimeMillis();
            int thisMasterSlot = this.masterSlotManager.getSlot();
            int masterCount = this.masterSlotManager.getMasterSize();
            if (masterCount <= 0) {
                log.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
                return Collections.emptyList();
            }
            int pageSize = this.masterConfig.getFetchCommandNum();
            List<Command> result = this.commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot);
            if (CollectionUtils.isNotEmpty(result)) {
                long cost = System.currentTimeMillis() - scheduleStartTime;
                log.info("Master schedule bootstrap loop command success, fetch command size: {}, cost: {}ms, current slot: {}, total slot size: {}",
                        result.size(), cost, thisMasterSlot, masterCount);
                ProcessInstanceMetrics.recordCommandQueryTime(cost);
            }
            return result;
        }
        catch (Exception ex) {
            throw new MasterException("Master loop command from database error", ex);
        }
    }
}

