/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.master.runner.MasterExecThread;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scheduler loop of a master server.
 *
 * <p>While {@link Stopper#isRunning()} is true, each pass checks local resources and the
 * ZooKeeper client state, acquires the master lock, fetches at most one {@link Command}
 * through the {@link ProcessDao}, turns it into a {@link ProcessInstance} and submits a
 * {@link MasterExecThread} for it to the internal executor.
 */
public class MasterSchedulerThread
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerThread.class);

    /** Executor that runs one {@link MasterExecThread} per created process instance. */
    private final ExecutorService masterExecService;
    private final ProcessDao processDao;
    private final ZKMasterClient zkMasterClient;

    /** Size of the fixed executor pool; also used to compute the free-slot count per pass. */
    private final int masterExecThreadNum;
    private final Configuration conf;

    /**
     * Creates the scheduler and its fixed-size daemon executor named "Master-Exec-Thread".
     *
     * @param zkClient            client used to obtain the ZooKeeper handle and the master lock path
     * @param processDao          DAO used to fetch commands and create process instances
     * @param conf                configuration passed to the resource check
     * @param masterExecThreadNum number of threads in the executor pool
     */
    public MasterSchedulerThread(ZKMasterClient zkClient, ProcessDao processDao, Configuration conf, int masterExecThreadNum) {
        this.processDao = processDao;
        this.zkMasterClient = zkClient;
        this.conf = conf;
        this.masterExecThreadNum = masterExecThreadNum;
        this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor((String)"Master-Exec-Thread", (int)masterExecThreadNum);
    }

    /**
     * Runs the scheduling loop until {@link Stopper#isRunning()} returns false or the
     * thread is interrupted.
     *
     * <p>An interrupt received while sleeping or while waiting for the lock ends the loop;
     * the thread's interrupt status is restored before returning so that the owner of the
     * thread can observe it. Any other exception is logged and the loop continues.
     */
    @Override
    public void run() {
        while (Stopper.isRunning()) {
            InterProcessMutex mutex = null;
            boolean interrupted = false;
            try {
                if (OSUtils.checkResource((Configuration)this.conf, (Boolean)true).booleanValue() && this.zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
                    String znodeLock = this.zkMasterClient.getMasterLockPath();
                    mutex = new InterProcessMutex(this.zkMasterClient.getZkClient(), znodeLock);
                    mutex.acquire();
                    ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor)this.masterExecService;
                    int activeCount = poolExecutor.getActiveCount();
                    Command command = this.processDao.findOneCommand();
                    if (command != null) {
                        // Parameterized logging: same rendered text, but no eager formatting and
                        // no explicit toString() call that could throw before the command is handled.
                        logger.info("find one command: id: {}, type: {}", command.getId(), command.getCommandType());
                        try {
                            // Third argument is the number of executor threads not currently active.
                            ProcessInstance processInstance = this.processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
                            if (processInstance != null) {
                                logger.info("start master exec thread , split DAG ...");
                                this.masterExecService.execute(new MasterExecThread(processInstance, this.processDao));
                            }
                        }
                        catch (Exception e) {
                            logger.error("scan command error ", (Throwable)e);
                            this.processDao.moveToErrorCommand(command, e.toString());
                        }
                    }
                }
                // NOTE(review): this pause happens before the finally block releases the mutex,
                // so the lock (when acquired) is held for the whole sleep - confirm this is intended.
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // Do not treat an interrupt as an ordinary error: remember it and stop looping.
                logger.warn("master scheduler thread interrupted, stopping scheduler loop", (Throwable)e);
                interrupted = true;
            }
            catch (Exception e) {
                logger.error("master scheduler thread exception : " + e.getMessage(), (Throwable)e);
            }
            finally {
                AbstractZKClient.releaseMutex(mutex);
            }
            if (interrupted) {
                // Restore the flag only after the mutex release above, so the release itself
                // does not run with the interrupt status set.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

