/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Background thread that drains the task priority queue and hands each task to the
 * {@link ExecutorDispatcher}.
 *
 * <p>The thread is started from {@link #init()} and loops while {@link Stopper#isRunning()}
 * returns {@code true}. Each loop iteration polls up to
 * {@code masterConfig.getMasterDispatchTaskNumber()} tasks, attempts to dispatch each one, and
 * puts every task whose dispatch failed back onto the queue so it is retried later.
 */
@Component
public class TaskPriorityQueueConsumer
extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueConsumer.class);

    /** Maximum time to wait for a single task to become available on the queue. */
    private static final long POLL_TIMEOUT_MILLIS = 1000L;

    /** Pause applied when the queue holds nothing but tasks that just failed to dispatch. */
    private static final long FAILED_DISPATCH_BACKOFF_MILLIS = 1000L;

    @Autowired
    private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
    @Autowired
    private ProcessService processService;
    @Autowired
    private ExecutorDispatcher dispatcher;
    @Autowired
    private MasterConfig masterConfig;

    /**
     * Names and starts this consumer thread once dependency injection has completed.
     */
    @PostConstruct
    public void init() {
        super.setName("TaskUpdateQueueConsumerThread");
        super.start();
    }

    /**
     * Main consumer loop.
     *
     * <p>Tasks that could not be dispatched are always put back onto the queue, even when the
     * batch is aborted by an exception, so that no polled task is silently dropped. When the
     * queue contains no more entries than the tasks that were just re-queued, the loop sleeps
     * for {@link #FAILED_DISPATCH_BACKOFF_MILLIS} before retrying. An interrupt restores the
     * thread's interrupt flag and ends the loop.
     */
    @Override
    public void run() {
        List<TaskPriority> failedDispatchTasks = new ArrayList<>();
        while (Stopper.isRunning()) {
            failedDispatchTasks.clear();
            try {
                try {
                    int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
                    for (int i = 0; i < fetchTaskNum; i++) {
                        TaskPriority taskPriority = taskPriorityQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                        if (taskPriority != null && !dispatch(taskPriority)) {
                            failedDispatchTasks.add(taskPriority);
                        }
                    }
                } finally {
                    // Runs on the exceptional path too: otherwise the tasks collected so far
                    // would be discarded by the clear() at the top of the next iteration.
                    requeue(failedDispatchTasks);
                }
                // Only failed tasks are left in the queue: back off instead of spinning on them.
                if (!failedDispatchTasks.isEmpty()
                        && taskPriorityQueue.size() <= failedDispatchTasks.size()) {
                    TimeUnit.MILLISECONDS.sleep(FAILED_DISPATCH_BACKOFF_MILLIS);
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for whoever requested it and stop consuming.
                    Thread.currentThread().interrupt();
                    logger.warn("task priority queue consumer interrupted, stopping", e);
                    return;
                }
                logger.error("dispatcher task error", e);
            }
        }
    }

    /**
     * Puts the given tasks back onto the priority queue.
     *
     * <p>Each task is handled independently so that a failure to re-queue one task does not
     * prevent the remaining tasks from being re-queued. This method never throws.
     *
     * @param tasks tasks whose dispatch failed; may be empty
     */
    private void requeue(List<TaskPriority> tasks) {
        for (TaskPriority task : tasks) {
            try {
                taskPriorityQueue.put(task);
            } catch (Exception e) {
                logger.error("failed to re-queue task: {}", task.getTaskId(), e);
            }
        }
    }

    /**
     * Dispatches a single task to a worker.
     *
     * <p>A task whose instance is already in a finished state is not dispatched and is reported
     * as handled. Both {@link ExecuteException} and unexpected runtime failures are logged and
     * reported as a failed dispatch so the caller can retry the task.
     *
     * @param taskPriority the queued task to dispatch
     * @return {@code true} if the task was dispatched or needs no dispatch; {@code false} if the
     *         dispatch failed
     */
    protected boolean dispatch(TaskPriority taskPriority) {
        try {
            // Check first so no command is built for a task that will not be dispatched.
            if (Boolean.TRUE.equals(taskInstanceIsFinalState(taskPriority.getTaskId()))) {
                return true;
            }
            TaskExecutionContext context = taskPriority.getTaskExecutionContext();
            ExecutionContext executionContext =
                    new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
            return dispatcher.dispatch(executionContext);
        } catch (ExecuteException e) {
            logger.error("dispatch error: {}", e.getMessage(), e);
        } catch (RuntimeException e) {
            // Report as a failed dispatch rather than letting it abort the caller's batch.
            logger.error("dispatch error: {}", e.getMessage(), e);
        }
        return false;
    }

    /**
     * Tells whether the task instance with the given id needs no further dispatching.
     *
     * @param taskInstanceId id of the task instance to look up
     * @return the result of {@code typeIsFinished()} on the instance's state, or {@code true}
     *         when no task instance is found for the id
     */
    public Boolean taskInstanceIsFinalState(int taskInstanceId) {
        TaskInstance taskInstance = processService.findTaskInstanceById(Integer.valueOf(taskInstanceId));
        if (taskInstance == null) {
            // Nothing left to run for this id; treating it as final stops endless re-queueing.
            logger.warn("task instance {} not found, treating it as finished", taskInstanceId);
            return Boolean.TRUE;
        }
        return taskInstance.getState().typeIsFinished();
    }
}

