/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterExecThread;
import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MasterSchedulerService
extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
    @Autowired
    private ProcessService processService;
    @Autowired
    private ZKMasterClient zkMasterClient;
    @Autowired
    private MasterConfig masterConfig;
    private NettyRemotingClient nettyRemotingClient;
    private ThreadPoolExecutor masterExecService;

    @PostConstruct
    public void init() {
        this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor((String)"Master-Exec-Thread", (int)this.masterConfig.getMasterExecThreads());
        NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }

    @Override
    public void start() {
        super.setName("MasterSchedulerThread");
        super.start();
    }

    public void close() {
        this.masterExecService.shutdown();
        boolean terminated = false;
        try {
            terminated = this.masterExecService.awaitTermination(5L, TimeUnit.SECONDS);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (!terminated) {
            logger.warn("masterExecService shutdown without terminated, increase await time");
        }
        this.nettyRemotingClient.close();
        logger.info("master schedule service stopped...");
    }

    @Override
    public void run() {
        logger.info("master scheduler started");
        while (Stopper.isRunning()) {
            try {
                boolean runCheckFlag = OSUtils.checkResource((double)this.masterConfig.getMasterMaxCpuloadAvg(), (double)this.masterConfig.getMasterReservedMemory());
                if (!runCheckFlag) {
                    Thread.sleep(1000L);
                    continue;
                }
                if (this.zkMasterClient.getZkClient().getState() != CuratorFrameworkState.STARTED) continue;
                this.scheduleProcess();
            }
            catch (Exception e) {
                logger.error("master scheduler thread error", (Throwable)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void scheduleProcess() throws Exception {
        block7: {
            InterProcessMutex mutex = null;
            try {
                mutex = this.zkMasterClient.blockAcquireMutex();
                int activeCount = this.masterExecService.getActiveCount();
                Command command = this.processService.findOneCommand();
                if (command != null) {
                    logger.info("find one command: id: {}, type: {}", (Object)command.getId(), (Object)command.getCommandType());
                    try {
                        ProcessInstance processInstance = this.processService.handleCommand(logger, this.getLocalAddress(), this.masterConfig.getMasterExecThreads() - activeCount, command);
                        if (processInstance != null) {
                            logger.info("start master exec thread , split DAG ...");
                            this.masterExecService.execute(new MasterExecThread(processInstance, this.processService, this.nettyRemotingClient));
                        }
                        break block7;
                    }
                    catch (Exception e) {
                        logger.error("scan command error ", (Throwable)e);
                        this.processService.moveToErrorCommand(command, e.toString());
                    }
                    break block7;
                }
                Thread.sleep(1000L);
            }
            finally {
                this.zkMasterClient.releaseMutex(mutex);
            }
        }
    }

    private String getLocalAddress() {
        return NetUtils.getAddr((int)this.masterConfig.getListenPort());
    }
}

