package org.apache.dolphinscheduler.server.worker.runner;

import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.class */
public class FetchTaskThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(FetchTaskThread.class);
    private final int taskNum;
    private final ZKWorkerClient zkWorkerClient;
    protected ITaskQueue taskQueue;
    private final ProcessDao processDao;
    private final ExecutorService workerExecService;
    private int workerExecNums;
    private Configuration conf;
    private TaskInstance taskInstance = null;
    Integer taskInstId;

    public FetchTaskThread(int i, ZKWorkerClient zKWorkerClient, ProcessDao processDao, Configuration configuration, ITaskQueue iTaskQueue) {
        this.taskNum = i;
        this.zkWorkerClient = zKWorkerClient;
        this.processDao = processDao;
        this.workerExecNums = configuration.getInt("worker.exec.threads", 10);
        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Fetch-Task-Thread", this.workerExecNums);
        this.conf = configuration;
        this.taskQueue = iTaskQueue;
    }

    private boolean checkWorkerGroup(TaskInstance taskInstance, String str) {
        int taskWorkerGroupId = this.processDao.getTaskWorkerGroupId(taskInstance);
        if (taskWorkerGroupId <= 0) {
            return true;
        }
        WorkerGroup queryWorkerGroupById = this.processDao.queryWorkerGroupById(taskWorkerGroupId);
        if (queryWorkerGroupById == null) {
            logger.info("task {} cannot find the worker group, use all worker instead.", Integer.valueOf(taskInstance.getId()));
            return true;
        }
        String ipList = queryWorkerGroupById.getIpList();
        if (StringUtils.isBlank(ipList)) {
            logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", Integer.valueOf(taskInstance.getId()), Integer.valueOf(queryWorkerGroupById.getId()));
        }
        return Arrays.asList(ipList.split(",")).contains(str);
    }

    @Override // java.lang.Runnable
    public void run() {
        while (Stopper.isRunning()) {
            try {
                try {
                    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) this.workerExecService;
                    boolean z = OSUtils.checkResource(this.conf, false).booleanValue() && checkThreadCount(threadPoolExecutor);
                    Thread.sleep(1000L);
                    if (!z) {
                        AbstractZKClient.releaseMutex((InterProcessMutex) null);
                    } else if (CollectionUtils.isEmpty(this.taskQueue.getAllTasks("tasks_queue"))) {
                        AbstractZKClient.releaseMutex((InterProcessMutex) null);
                    } else {
                        InterProcessMutex acquireZkLock = this.zkWorkerClient.acquireZkLock(this.zkWorkerClient.getZkClient(), this.zkWorkerClient.getWorkerLockPath());
                        for (String str : this.taskQueue.poll("tasks_queue", this.taskNum)) {
                            if (!StringUtils.isEmpty(str)) {
                                if (!checkThreadCount(threadPoolExecutor)) {
                                    break;
                                }
                                this.taskInstId = Integer.valueOf(getTaskInstanceId(str));
                                waitForTaskInstance();
                                this.taskInstance = this.processDao.getTaskInstanceDetailByTaskId(this.taskInstId.intValue());
                                if (verifyTaskInstanceIsNull(this.taskInstance)) {
                                    logger.warn("remove task queue : {} due to taskInstance is null", str);
                                    removeNodeFromTaskQueue(str);
                                } else {
                                    Tenant tenantForProcess = this.processDao.getTenantForProcess(this.taskInstance.getProcessInstance().getTenantId(), this.taskInstance.getProcessDefine().getUserId());
                                    if (verifyTenantIsNull(tenantForProcess)) {
                                        logger.warn("remove task queue : {} due to tenant is null", str);
                                        removeNodeFromTaskQueue(str);
                                    } else {
                                        String queryUserQueueByProcessInstanceId = this.processDao.queryUserQueueByProcessInstanceId(this.taskInstance.getProcessInstanceId());
                                        this.taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(queryUserQueueByProcessInstanceId) ? tenantForProcess.getQueue() : queryUserQueueByProcessInstanceId);
                                        this.taskInstance.getProcessInstance().setTenantCode(tenantForProcess.getTenantCode());
                                        logger.info("worker fetch taskId : {} from queue ", this.taskInstId);
                                        if (checkWorkerGroup(this.taskInstance, OSUtils.getHost())) {
                                            String execLocalPath = getExecLocalPath();
                                            logger.info("task instance  local execute path : {} ", execLocalPath);
                                            this.taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath);
                                            FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenantForProcess.getTenantCode(), logger);
                                            logger.info("task : {} ready to submit to task scheduler thread", this.taskInstId);
                                            this.workerExecService.submit(new TaskScheduleThread(this.taskInstance, this.processDao));
                                            removeNodeFromTaskQueue(str);
                                        }
                                    }
                                }
                            }
                        }
                        AbstractZKClient.releaseMutex(acquireZkLock);
                    }
                } catch (Exception e) {
                    logger.error("fetch task thread failure", e);
                    AbstractZKClient.releaseMutex((InterProcessMutex) null);
                }
            } catch (Throwable th) {
                AbstractZKClient.releaseMutex((InterProcessMutex) null);
                throw th;
            }
        }
    }

    private void removeNodeFromTaskQueue(String str) {
        this.taskQueue.removeNode("tasks_queue", str);
    }

    private boolean verifyTaskInstanceIsNull(TaskInstance taskInstance) {
        if (taskInstance != null) {
            return false;
        }
        logger.error("task instance is null. task id : {} ", this.taskInstId);
        return true;
    }

    private boolean verifyTenantIsNull(Tenant tenant) {
        if (tenant != null) {
            return false;
        }
        logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}", new Object[]{Integer.valueOf(this.taskInstance.getProcessDefine().getId()), Integer.valueOf(this.taskInstance.getProcessInstance().getId()), Integer.valueOf(this.taskInstance.getId())});
        return true;
    }

    private String getExecLocalPath() {
        return FileUtils.getProcessExecDir(this.taskInstance.getProcessDefine().getProjectId(), this.taskInstance.getProcessDefine().getId(), this.taskInstance.getProcessInstance().getId(), this.taskInstance.getId());
    }

    private boolean checkThreadCount(ThreadPoolExecutor threadPoolExecutor) {
        int activeCount = threadPoolExecutor.getActiveCount();
        if (activeCount < this.workerExecNums) {
            return true;
        }
        logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", new Object[]{Integer.valueOf(activeCount), Integer.valueOf(this.workerExecNums), 1000});
        return false;
    }

    private void waitForTaskInstance() throws Exception {
        for (int i = 30; this.taskInstance == null && i > 0; i--) {
            Thread.sleep(1000L);
            this.taskInstance = this.processDao.findTaskInstanceById(this.taskInstId);
        }
    }

    private int getTaskInstanceId(String str) {
        return Integer.parseInt(str.split("_")[3]);
    }
}
