/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.runner;

import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker-side loop that pulls task entries from the shared task queue and hands
 * them to a local executor.
 *
 * <p>Each pass checks local resources and executor capacity, sleeps for a fixed
 * interval, and, when the queue is not empty, takes the ZooKeeper worker lock,
 * polls up to {@code taskNum} entries and submits a {@link TaskScheduleThread}
 * for every entry this host is allowed to run.
 *
 * <p>The per-task state ({@link #taskInstance}, {@link #taskInstId}) is kept in
 * instance fields, so a single instance must only be driven by one thread.
 */
public class FetchTaskThread
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(FetchTaskThread.class);

    /** Key of the task queue that is inspected, polled and cleaned up. */
    private static final String TASKS_QUEUE = "tasks_queue";

    /** Pause between fetch passes and between task-instance lookups, in milliseconds. */
    private static final long SLEEP_TIME_MILLIS = 1000L;

    /** Number of lookups attempted while waiting for a task instance to become visible. */
    private static final int TASK_INSTANCE_RETRY_TIMES = 30;

    /** Maximum number of queue entries polled per pass. */
    private final int taskNum;
    private final ZKWorkerClient zkWorkerClient;
    protected ITaskQueue taskQueue;
    private final ProcessDao processDao;
    /** Executor that runs the submitted {@link TaskScheduleThread}s. */
    private final ExecutorService workerExecService;
    /** Size of {@link #workerExecService}, read from {@code worker.exec.threads} (default 10). */
    private final int workerExecNums;
    private final Configuration conf;
    /** Task instance currently being processed; reset for every queue entry. */
    private TaskInstance taskInstance;
    /** Id of the task instance currently being processed. */
    Integer taskInstId;

    /**
     * Creates the fetch loop and its backing executor.
     *
     * @param taskNum        maximum number of queue entries polled per pass
     * @param zkWorkerClient client used to obtain the worker lock
     * @param processDao     data access object used for task, tenant and worker group lookups
     * @param conf           configuration; {@code worker.exec.threads} sizes the executor
     * @param taskQueue      queue the task entries are read from
     */
    public FetchTaskThread(int taskNum, ZKWorkerClient zkWorkerClient, ProcessDao processDao, Configuration conf, ITaskQueue taskQueue) {
        this.taskNum = taskNum;
        this.zkWorkerClient = zkWorkerClient;
        this.processDao = processDao;
        this.workerExecNums = conf.getInt("worker.exec.threads", 10);
        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Fetch-Task-Thread", this.workerExecNums);
        this.conf = conf;
        this.taskQueue = taskQueue;
        this.taskInstance = null;
    }

    /**
     * Decides whether {@code host} may run the given task according to the
     * task's worker group.
     *
     * @param taskInstance task whose worker group is checked
     * @param host         host to look for in the group's ip list
     * @return {@code true} when the task has no worker group (id &lt;= 0), the
     *         group cannot be found, the group's ip list is blank, or the
     *         comma-separated ip list contains {@code host}; {@code false} otherwise
     */
    private boolean checkWorkerGroup(TaskInstance taskInstance, String host) {
        int taskWorkerGroupId = this.processDao.getTaskWorkerGroupId(taskInstance);
        if (taskWorkerGroupId <= 0) {
            return true;
        }
        WorkerGroup workerGroup = this.processDao.queryWorkerGroupById(taskWorkerGroupId);
        if (workerGroup == null) {
            logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
            return true;
        }
        String ips = workerGroup.getIpList();
        if (StringUtils.isBlank(ips)) {
            logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", taskInstance.getId(), workerGroup.getId());
            // Honour the logged fallback: a blank list must not reach split(),
            // which would throw on null and reject every host on "".
            return true;
        }
        List<String> ipList = Arrays.asList(ips.split(","));
        return ipList.contains(host);
    }

    /**
     * Runs the fetch loop until {@link Stopper#isRunning()} turns false or the
     * thread is interrupted.
     *
     * <p>Failures inside a pass are logged and the loop continues with the next
     * pass. The worker lock, when acquired, is released exactly once per pass
     * in the {@code finally} block.
     */
    @Override
    public void run() {
        while (Stopper.isRunning()) {
            InterProcessMutex mutex = null;
            try {
                ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) this.workerExecService;
                boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && this.checkThreadCount(poolExecutor);
                Thread.sleep(SLEEP_TIME_MILLIS);
                if (!runCheckFlag) {
                    continue;
                }
                List<String> tasksQueueList = this.taskQueue.getAllTasks(TASKS_QUEUE);
                if (CollectionUtils.isEmpty(tasksQueueList)) {
                    continue;
                }
                // Only take the lock once there is something to fetch.
                mutex = this.zkWorkerClient.acquireZkLock(this.zkWorkerClient.getZkClient(), this.zkWorkerClient.getWorkerLockPath());
                List<String> taskQueueStrArr = this.taskQueue.poll(TASKS_QUEUE, this.taskNum);
                for (String taskQueueStr : taskQueueStrArr) {
                    if (StringUtils.isEmpty(taskQueueStr)) {
                        continue;
                    }
                    // Stop handing out work as soon as the executor is saturated.
                    if (!this.checkThreadCount(poolExecutor)) {
                        break;
                    }
                    this.taskInstId = this.getTaskInstanceId(taskQueueStr);
                    this.waitForTaskInstance();
                    this.taskInstance = this.processDao.getTaskInstanceDetailByTaskId(this.taskInstId.intValue());
                    if (this.verifyTaskInstanceIsNull(this.taskInstance)) {
                        logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
                        this.removeNodeFromTaskQueue(taskQueueStr);
                        continue;
                    }
                    Tenant tenant = this.processDao.getTenantForProcess(this.taskInstance.getProcessInstance().getTenantId(), this.taskInstance.getProcessDefine().getUserId());
                    if (this.verifyTenantIsNull(tenant)) {
                        logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
                        this.removeNodeFromTaskQueue(taskQueueStr);
                        continue;
                    }
                    // A user-level queue, when present, takes precedence over the tenant queue.
                    String userQueue = this.processDao.queryUserQueueByProcessInstanceId(this.taskInstance.getProcessInstanceId());
                    this.taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
                    this.taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
                    logger.info("worker fetch taskId : {} from queue ", this.taskInstId);
                    // Entries this host may not run are left in the queue untouched.
                    if (!this.checkWorkerGroup(this.taskInstance, OSUtils.getHost())) {
                        continue;
                    }
                    String execLocalPath = this.getExecLocalPath();
                    logger.info("task instance  local execute path : {} ", execLocalPath);
                    this.taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath);
                    FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode(), logger);
                    logger.info("task : {} ready to submit to task scheduler thread", this.taskInstId);
                    this.workerExecService.submit(new TaskScheduleThread(this.taskInstance, this.processDao));
                    this.removeNodeFromTaskQueue(taskQueueStr);
                }
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop fetching;
                // the finally block below still releases the lock.
                Thread.currentThread().interrupt();
                logger.warn("fetch task thread interrupted, stop fetching tasks", e);
                return;
            }
            catch (Exception e) {
                logger.error("fetch task thread failure", e);
            }
            finally {
                // Single release point for the lock (mutex may still be null here).
                AbstractZKClient.releaseMutex(mutex);
            }
        }
    }

    /**
     * Removes the given entry from the task queue.
     *
     * @param taskQueueStr queue entry to remove
     */
    private void removeNodeFromTaskQueue(String taskQueueStr) {
        this.taskQueue.removeNode(TASKS_QUEUE, taskQueueStr);
    }

    /**
     * Reports (and logs) a missing task instance.
     *
     * @param taskInstance task instance to check
     * @return {@code true} when {@code taskInstance} is {@code null}
     */
    private boolean verifyTaskInstanceIsNull(TaskInstance taskInstance) {
        if (taskInstance == null) {
            logger.error("task instance is null. task id : {} ", this.taskInstId);
            return true;
        }
        return false;
    }

    /**
     * Reports (and logs) a missing tenant for the task currently held in
     * {@link #taskInstance}.
     *
     * @param tenant tenant to check
     * @return {@code true} when {@code tenant} is {@code null}
     */
    private boolean verifyTenantIsNull(Tenant tenant) {
        if (tenant == null) {
            logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}", this.taskInstance.getProcessDefine().getId(), this.taskInstance.getProcessInstance().getId(), this.taskInstance.getId());
            return true;
        }
        return false;
    }

    /**
     * Builds the local execution directory for the current task from its
     * project id, process definition id, process instance id and task id.
     *
     * @return the path returned by {@code FileUtils.getProcessExecDir}
     */
    private String getExecLocalPath() {
        return FileUtils.getProcessExecDir(this.taskInstance.getProcessDefine().getProjectId(), this.taskInstance.getProcessDefine().getId(), this.taskInstance.getProcessInstance().getId(), this.taskInstance.getId());
    }

    /**
     * Checks whether the executor still has a free thread.
     *
     * @param poolExecutor executor whose active thread count is inspected
     * @return {@code false} when the active count has reached
     *         {@link #workerExecNums}, {@code true} otherwise
     */
    private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
        int activeCount = poolExecutor.getActiveCount();
        if (activeCount >= this.workerExecNums) {
            logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, this.workerExecNums, SLEEP_TIME_MILLIS);
            return false;
        }
        return true;
    }

    /**
     * Waits until the task instance identified by {@link #taskInstId} can be
     * loaded, trying up to {@value #TASK_INSTANCE_RETRY_TIMES} times with a
     * pause of {@value #SLEEP_TIME_MILLIS} ms after each unsuccessful lookup.
     *
     * <p>The {@link #taskInstance} field is cleared first so that a task left
     * over from the previous queue entry cannot short-circuit the wait. On
     * return the field holds the loaded instance, or {@code null} if every
     * lookup failed.
     *
     * @throws Exception if the lookup fails or the thread is interrupted while sleeping
     */
    private void waitForTaskInstance() throws Exception {
        this.taskInstance = null;
        for (int retryTimes = TASK_INSTANCE_RETRY_TIMES; retryTimes > 0; --retryTimes) {
            // Look first, sleep only when the instance is not there yet.
            this.taskInstance = this.processDao.findTaskInstanceById(this.taskInstId);
            if (this.taskInstance != null) {
                return;
            }
            Thread.sleep(SLEEP_TIME_MILLIS);
        }
    }

    /**
     * Extracts the task instance id from a queue entry.
     *
     * @param taskQueueStr queue entry; the id is the fourth {@code _}-separated field
     * @return the parsed task instance id
     */
    private int getTaskInstanceId(String taskQueueStr) {
        return Integer.parseInt(taskQueueStr.split("_")[3]);
    }
}

