package org.apache.dolphinscheduler.server.master.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.class */
public class TaskPriorityQueueConsumer extends BaseDaemonThread {
    private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueConsumer.class);

    @Autowired
    private TaskPriorityQueue<TaskPriority> taskPriorityQueue;

    @Autowired
    private ProcessService processService;

    @Autowired
    private ExecutorDispatcher dispatcher;

    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    @Autowired
    private MasterConfig masterConfig;

    @Autowired
    private TaskEventService taskEventService;
    private ThreadPoolExecutor consumerThreadPoolExecutor;

    protected TaskPriorityQueueConsumer() {
        super("TaskPriorityQueueConsumeThread");
    }

    @PostConstruct
    public void init() {
        this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", this.masterConfig.getDispatchTaskNumber());
        logger.info("Task priority queue consume thread staring");
        super.start();
        logger.info("Task priority queue consume thread started");
    }

    public void run() {
        int dispatchTaskNumber = this.masterConfig.getDispatchTaskNumber();
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                List<TaskPriority> batchDispatch = batchDispatch(dispatchTaskNumber);
                if (CollectionUtils.isNotEmpty(batchDispatch)) {
                    logger.info("{} tasks dispatch failed, will retry to dispatch", Integer.valueOf(batchDispatch.size()));
                    TaskMetrics.incTaskDispatchFailed(batchDispatch.size());
                    Iterator<TaskPriority> it = batchDispatch.iterator();
                    while (it.hasNext()) {
                        this.taskPriorityQueue.put(it.next());
                    }
                    if (dispatchTaskNumber == batchDispatch.size()) {
                        logger.info("All tasks dispatch failed, will sleep a while to avoid the master cpu higher");
                        TimeUnit.MILLISECONDS.sleep(1000L);
                    }
                }
            } catch (Exception e) {
                TaskMetrics.incTaskDispatchError();
                logger.error("dispatcher task error", e);
            }
        }
    }

    public List<TaskPriority> batchDispatch(int i) throws TaskPriorityQueueException, InterruptedException {
        List<TaskPriority> synchronizedList = Collections.synchronizedList(new ArrayList());
        CountDownLatch countDownLatch = new CountDownLatch(i);
        for (int i2 = 0; i2 < i; i2++) {
            TaskPriority taskPriority = (TaskPriority) this.taskPriorityQueue.poll(1000L, TimeUnit.MILLISECONDS);
            if (Objects.isNull(taskPriority)) {
                countDownLatch.countDown();
            } else {
                this.consumerThreadPoolExecutor.submit(() -> {
                    try {
                        if (!dispatchTask(taskPriority)) {
                            synchronizedList.add(taskPriority);
                        }
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
        }
        countDownLatch.await();
        return synchronizedList;
    }

    protected boolean dispatchTask(TaskPriority taskPriority) {
        WorkflowExecuteRunnable byProcessInstanceId;
        TaskMetrics.incTaskDispatch();
        boolean z = false;
        try {
            byProcessInstanceId = this.processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
        } catch (RuntimeException | ExecuteException e) {
            logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
        }
        if (byProcessInstanceId == null) {
            logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
            return true;
        }
        Optional<TaskInstance> taskInstance = byProcessInstanceId.getTaskInstance(taskPriority.getTaskId());
        if (!taskInstance.isPresent()) {
            logger.error("Cannot find the task instance from related processInstance, taskPriority: {}", taskPriority);
            return true;
        }
        TaskInstance taskInstance2 = taskInstance.get();
        TaskExecutionContext taskExecutionContext = taskPriority.getTaskExecutionContext();
        ExecutionContext executionContext = new ExecutionContext(toCommand(taskExecutionContext), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup(), taskInstance2);
        if (isTaskNeedToCheck(taskPriority) && taskInstanceIsFinalState(taskPriority.getTaskId())) {
            logger.info("Task {} is already finished, no need to dispatch, task instance id: {}", taskInstance2.getName(), taskInstance2.getId());
            return true;
        }
        z = this.dispatcher.dispatch(executionContext).booleanValue();
        if (z) {
            logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}", Integer.valueOf(taskPriority.getTaskId()), executionContext.getHost());
            addDispatchEvent(taskExecutionContext, executionContext);
        } else {
            logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}", Integer.valueOf(taskPriority.getTaskId()), executionContext.getHost());
        }
        return z;
    }

    private void addDispatchEvent(TaskExecutionContext taskExecutionContext, ExecutionContext executionContext) {
        this.taskEventService.addEvent(TaskEvent.newDispatchEvent(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId(), executionContext.getHost().getAddress()));
    }

    private Command toCommand(TaskExecutionContext taskExecutionContext) {
        return new TaskDispatchCommand(taskExecutionContext, this.masterConfig.getMasterAddress(), taskExecutionContext.getHost(), System.currentTimeMillis()).convert2Command();
    }

    public boolean taskInstanceIsFinalState(int i) {
        return this.processService.findTaskInstanceById(Integer.valueOf(i)).getState().isFinished();
    }

    private boolean isTaskNeedToCheck(TaskPriority taskPriority) {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - taskPriority.getCheckpoint() <= 1000) {
            return false;
        }
        taskPriority.setCheckpoint(currentTimeMillis);
        return true;
    }
}
