/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Background thread that drains the {@link TaskPriorityQueue} and hands each polled task to the
 * {@link ExecutorDispatcher}.
 *
 * <p>The thread is started from {@link #init()} and loops while {@link Stopper#isRunning()} is
 * true. Each iteration polls up to {@code masterConfig.getDispatchTaskNumber()} tasks, dispatches
 * them on an internal thread pool, and puts every task whose dispatch failed back on the queue.
 */
@Component
public class TaskPriorityQueueConsumer
extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueConsumer.class);

    /** Maximum time, in milliseconds, to wait for a single element when polling the queue. */
    private static final long QUEUE_POLL_TIMEOUT_MILLIS = 1000L;

    /** Pause, in milliseconds, taken when the queue holds nothing but tasks that just failed. */
    private static final long DISPATCH_RETRY_SLEEP_MILLIS = 1000L;

    /** Minimum time, in milliseconds, between two final-state checks of the same task. */
    private static final long TASK_CHECK_INTERVAL_MILLIS = 1000L;

    @Autowired
    private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
    @Autowired
    private ProcessService processService;
    @Autowired
    private ExecutorDispatcher dispatcher;
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private MasterConfig masterConfig;
    @Autowired
    private TaskEventService taskEventService;
    private ThreadPoolExecutor consumerThreadPoolExecutor;

    /**
     * Creates the dispatch thread pool, sized by {@code masterConfig.getDispatchTaskNumber()},
     * and starts this consumer thread.
     */
    @PostConstruct
    public void init() {
        this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor(
                "TaskUpdateQueueConsumerThread", this.masterConfig.getDispatchTaskNumber());
        super.start();
    }

    /**
     * Main consumer loop: dispatches batches until {@link Stopper#isRunning()} turns false or this
     * thread is interrupted.
     */
    @Override
    public void run() {
        int fetchTaskNum = this.masterConfig.getDispatchTaskNumber();
        while (Stopper.isRunning()) {
            try {
                List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
                if (failedDispatchTasks.isEmpty()) {
                    continue;
                }
                // Give every failed task another chance by putting it back on the queue.
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    this.taskPriorityQueue.put(dispatchFailedTask);
                }
                // If the queue holds no more than the tasks that just failed, retrying at once
                // would only spin; back off before the next round.
                if (this.taskPriorityQueue.size() <= failedDispatchTasks.size()) {
                    TimeUnit.MILLISECONDS.sleep(DISPATCH_RETRY_SLEEP_MILLIS);
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status and stop: swallowing it would hide the request and
                // make every following blocking call fail immediately.
                Thread.currentThread().interrupt();
                logger.warn("task priority queue consumer interrupted, stop dispatching", e);
                break;
            }
            catch (Exception e) {
                logger.error("dispatcher task error", e);
            }
        }
    }

    /**
     * Polls up to {@code fetchTaskNum} tasks and dispatches them concurrently on the consumer
     * thread pool, waiting until every submitted dispatch has completed.
     *
     * @param fetchTaskNum maximum number of tasks to poll in this batch
     * @return the tasks whose dispatch did not succeed (possibly empty, never {@code null})
     * @throws TaskPriorityQueueException declared for failures of the underlying queue
     * @throws InterruptedException if interrupted while polling or while waiting for the batch
     */
    private List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
        List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(fetchTaskNum);
        for (int i = 0; i < fetchTaskNum; ++i) {
            TaskPriority taskPriority = this.taskPriorityQueue.poll(QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (Objects.isNull(taskPriority)) {
                // Nothing arrived within the timeout; release this slot of the latch.
                latch.countDown();
                continue;
            }
            this.consumerThreadPoolExecutor.submit(() -> {
                boolean dispatched = false;
                try {
                    dispatched = this.dispatchTask(taskPriority);
                }
                finally {
                    // dispatchTask is overridable and may terminate abnormally; always record the
                    // outcome and release the latch so latch.await() below cannot block forever.
                    if (!dispatched) {
                        failedDispatchTasks.add(taskPriority);
                    }
                    latch.countDown();
                }
            });
        }
        latch.await();
        return failedDispatchTasks;
    }

    /**
     * Dispatches a single task to a worker.
     *
     * <p>When the periodic check is due (see {@link #isTaskNeedToCheck(TaskPriority)}) and the task
     * instance is already in a finished state, the task is treated as handled and not dispatched.
     *
     * @param taskPriority the queued task to dispatch
     * @return {@code true} if the task was dispatched or needs no dispatch; {@code false} if the
     *         dispatch failed or raised an exception
     */
    protected boolean dispatchTask(TaskPriority taskPriority) {
        boolean result = false;
        try {
            TaskExecutionContext context = taskPriority.getTaskExecutionContext();
            ExecutionContext executionContext = new ExecutionContext(this.toCommand(context), ExecutorType.WORKER, context.getWorkerGroup());
            if (this.isTaskNeedToCheck(taskPriority) && this.taskInstanceIsFinalState(taskPriority.getTaskId()).booleanValue()) {
                // The task already finished; there is nothing left to dispatch.
                return true;
            }
            result = this.dispatcher.dispatch(executionContext);
            if (result) {
                this.addDispatchEvent(context, executionContext);
            }
        }
        catch (RuntimeException e) {
            logger.error("dispatch error: ", e);
        }
        catch (ExecuteException e) {
            logger.error("dispatch error: {}", e.getMessage());
        }
        return result;
    }

    /**
     * Publishes a dispatch event for the task, carrying the address of the host stored in the
     * execution context.
     *
     * @param context the task's execution context
     * @param executionContext the dispatch context holding the selected host
     */
    private void addDispatchEvent(TaskExecutionContext context, ExecutionContext executionContext) {
        TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(), executionContext.getHost().getAddress());
        this.taskEventService.addEvent(taskEvent);
    }

    /**
     * Wraps the JSON form of the task execution context in a task-execute request command.
     *
     * @param taskExecutionContext the context to serialize
     * @return the command produced by {@code TaskExecuteRequestCommand.convert2Command()}
     */
    private Command toCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
        requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
        return requestCommand.convert2Command();
    }

    /**
     * Looks up the task instance and reports whether its state is a finished one.
     *
     * @param taskInstanceId id of the task instance to look up
     * @return the result of {@code typeIsFinished()} on the instance's state
     */
    public Boolean taskInstanceIsFinalState(int taskInstanceId) {
        // NOTE(review): the lookup result is dereferenced without a null check; if the service can
        // return null for an unknown id this throws NullPointerException - confirm and decide.
        TaskInstance taskInstance = this.processService.findTaskInstanceById(Integer.valueOf(taskInstanceId));
        return taskInstance.getState().typeIsFinished();
    }

    /**
     * Rate-limits the final-state check: returns {@code true}, and moves the task's checkpoint to
     * the current time, only when more than {@link #TASK_CHECK_INTERVAL_MILLIS} has elapsed since
     * the stored checkpoint.
     *
     * @param taskPriority the queued task whose checkpoint is inspected and possibly updated
     * @return {@code true} if the state check should be performed now
     */
    private boolean isTaskNeedToCheck(TaskPriority taskPriority) {
        long now = System.currentTimeMillis();
        if (now - taskPriority.getCheckpoint() > TASK_CHECK_INTERVAL_MILLIS) {
            taskPriority.setCheckpoint(now);
            return true;
        }
        return false;
    }
}

