package org.apache.dolphinscheduler.server.master.runner;

import java.util.Date;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.class */
public class StateWheelExecuteThread extends BaseDaemonThread {
    private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
    private final ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList;
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList;
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList;
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList;

    @Autowired
    private MasterConfig masterConfig;

    @Autowired
    @Lazy
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;

    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    protected StateWheelExecuteThread() {
        super("StateWheelExecuteThread");
        this.processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
        this.taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
        this.taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
        this.taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
    }

    @PostConstruct
    public void startWheelThread() {
        super.start();
    }

    public void run() {
        long millis = this.masterConfig.getStateWheelInterval().toMillis();
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                checkTask4Timeout();
                checkTask4Retry();
                checkTask4State();
                checkProcess4Timeout();
            } catch (Exception e) {
                logger.error("state wheel thread check error:", e);
            }
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e2) {
                logger.error("state wheel thread sleep error, will close the loop", e2);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
        this.processInstanceTimeoutCheckList.add(processInstance.getId());
        logger.info("Success add workflow instance {} into timeout check list", processInstance.getId());
    }

    public void removeProcess4TimeoutCheck(int i) {
        if (this.processInstanceTimeoutCheckList.remove(Integer.valueOf(i))) {
            logger.info("Success remove workflow instance {} from timeout check list", Integer.valueOf(i));
        }
    }

    private void checkProcess4Timeout() {
        if (this.processInstanceTimeoutCheckList.isEmpty()) {
            return;
        }
        Iterator<Integer> it = this.processInstanceTimeoutCheckList.iterator();
        while (it.hasNext()) {
            Integer next = it.next();
            try {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(next);
                    WorkflowExecuteRunnable byProcessInstanceId = this.processInstanceExecCacheManager.getByProcessInstanceId(next.intValue());
                    if (byProcessInstanceId == null) {
                        logger.warn("Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
                        this.processInstanceTimeoutCheckList.remove(next);
                        LoggerUtils.removeWorkflowInstanceIdMDC();
                    } else {
                        ProcessInstance processInstance = byProcessInstanceId.getProcessInstance();
                        if (processInstance == null) {
                            logger.warn("Check workflow timeout failed, the workflowInstance is null");
                            LoggerUtils.removeWorkflowInstanceIdMDC();
                        } else {
                            if (DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * 60) < 0) {
                                logger.info("Workflow instance {} timeout, adding timeout event", processInstance.getId());
                                addProcessTimeoutEvent(processInstance);
                                this.processInstanceTimeoutCheckList.remove(processInstance.getId());
                                logger.info("Workflow instance timeout, added timeout event");
                            }
                            LoggerUtils.removeWorkflowInstanceIdMDC();
                        }
                    }
                } catch (Exception e) {
                    logger.error("Check workflow instance timeout error");
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            } catch (Throwable th) {
                LoggerUtils.removeWorkflowInstanceIdMDC();
                throw th;
            }
        }
    }

    public void addTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
        logger.info("Adding task instance into timeout check list");
        if (this.taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
            logger.warn("Task instance is already in timeout check list");
            return;
        }
        TaskDefinition taskDefine = taskInstance.getTaskDefine();
        if (taskDefine == null) {
            logger.error("Failed to add task instance into timeout check list, taskDefinition is null");
        } else if (TimeoutFlag.OPEN == taskDefine.getTimeoutFlag()) {
            this.taskInstanceTimeoutCheckList.add(taskInstanceKey);
            logger.info("Timeout flag is open, added task instance into timeout check list");
        }
    }

    public void removeTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        this.taskInstanceTimeoutCheckList.remove(TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance));
        logger.info("remove task instance from timeout check list");
    }

    public void addTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        logger.info("Adding task instance into retry check list");
        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
        if (this.taskInstanceRetryCheckList.contains(taskInstanceKey)) {
            logger.warn("Task instance is already in retry check list");
        } else if (taskInstance.getTaskDefine() == null) {
            logger.error("Add task instance into retry check list error, taskDefinition is null");
        } else {
            this.taskInstanceRetryCheckList.add(taskInstanceKey);
            logger.info("[WorkflowInstance-{}][TaskInstanceKey-{}:{}] Added task instance into retry check list", new Object[]{processInstance.getId(), Long.valueOf(taskInstance.getTaskCode()), Integer.valueOf(taskInstance.getTaskDefinitionVersion())});
        }
    }

    public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        this.taskInstanceRetryCheckList.remove(TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance));
        logger.info("remove task instance from retry check list");
    }

    public void addTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        logger.info("Adding task instance into state check list");
        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
        if (this.taskInstanceStateCheckList.contains(taskInstanceKey)) {
            logger.warn("Task instance is already in state check list");
        } else if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
            this.taskInstanceStateCheckList.add(taskInstanceKey);
            logger.info("Added task instance into state check list");
        }
    }

    public void removeTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        this.taskInstanceStateCheckList.remove(TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance));
        logger.info("Removed task instance from state check list");
    }

    public void clearAllTasks() {
        this.processInstanceTimeoutCheckList.clear();
        this.taskInstanceTimeoutCheckList.clear();
        this.taskInstanceRetryCheckList.clear();
        this.taskInstanceStateCheckList.clear();
    }

    private void checkTask4Timeout() {
        if (this.taskInstanceTimeoutCheckList.isEmpty()) {
            return;
        }
        Iterator<TaskInstanceKey> it = this.taskInstanceTimeoutCheckList.iterator();
        while (it.hasNext()) {
            TaskInstanceKey next = it.next();
            try {
                try {
                    int processInstanceId = next.getProcessInstanceId();
                    LoggerUtils.setWorkflowInstanceIdMDC(Integer.valueOf(processInstanceId));
                    long taskCode = next.getTaskCode();
                    WorkflowExecuteRunnable byProcessInstanceId = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
                    if (byProcessInstanceId == null) {
                        logger.warn("Check task instance timeout failed, can not find workflowExecuteThread from cache manager, will remove this check task");
                        this.taskInstanceTimeoutCheckList.remove(next);
                        LoggerUtils.removeWorkflowInstanceIdMDC();
                    } else {
                        Optional<TaskInstance> activeTaskInstanceByTaskCode = byProcessInstanceId.getActiveTaskInstanceByTaskCode(taskCode);
                        if (activeTaskInstanceByTaskCode.isPresent()) {
                            TaskInstance taskInstance = activeTaskInstanceByTaskCode.get();
                            if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag() && DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * 60) < 0) {
                                logger.info("Task instance is timeout, adding task timeout event and remove the check");
                                addTaskTimeoutEvent(taskInstance);
                                this.taskInstanceTimeoutCheckList.remove(next);
                            }
                            LoggerUtils.removeWorkflowInstanceIdMDC();
                        } else {
                            logger.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}will remove this check task", Long.valueOf(taskCode));
                            this.taskInstanceTimeoutCheckList.remove(next);
                            LoggerUtils.removeWorkflowInstanceIdMDC();
                        }
                    }
                } catch (Exception e) {
                    logger.error("Check task timeout error, taskInstanceKey: {}", next, e);
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            } catch (Throwable th) {
                LoggerUtils.removeWorkflowInstanceIdMDC();
                throw th;
            }
        }
    }

    private void checkTask4Retry() {
        if (this.taskInstanceRetryCheckList.isEmpty()) {
            return;
        }
        Iterator<TaskInstanceKey> it = this.taskInstanceRetryCheckList.iterator();
        while (it.hasNext()) {
            TaskInstanceKey next = it.next();
            int processInstanceId = next.getProcessInstanceId();
            long taskCode = next.getTaskCode();
            try {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(Integer.valueOf(processInstanceId));
                    WorkflowExecuteRunnable byProcessInstanceId = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
                    if (byProcessInstanceId == null) {
                        logger.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, will remove this check task");
                        this.taskInstanceRetryCheckList.remove(next);
                        LoggerUtils.removeWorkflowInstanceIdMDC();
                    } else {
                        Optional<TaskInstance> retryTaskInstanceByTaskCode = byProcessInstanceId.getRetryTaskInstanceByTaskCode(taskCode);
                        ProcessInstance processInstance = byProcessInstanceId.getProcessInstance();
                        if (processInstance.getState().isReadyStop()) {
                            logger.warn("The process instance is ready to stop, will send process stop event and remove the check task");
                            addProcessStopEvent(processInstance);
                            this.taskInstanceRetryCheckList.remove(next);
                            LoggerUtils.removeWorkflowInstanceIdMDC();
                            return;
                        }
                        if (retryTaskInstanceByTaskCode.isPresent()) {
                            TaskInstance taskInstance = retryTaskInstanceByTaskCode.get();
                            if (taskInstance.getState() != TaskExecutionStatus.NEED_FAULT_TOLERANCE && taskInstance.retryTaskIntervalOverTime()) {
                                logger.info("[TaskInstanceKey-{}:{}]The task instance can retry, will retry this task instance", Long.valueOf(taskInstance.getTaskCode()), Integer.valueOf(taskInstance.getTaskDefinitionVersion()));
                                taskInstance.setEndTime((Date) null);
                                taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
                                addTaskRetryEvent(taskInstance);
                                this.taskInstanceRetryCheckList.remove(next);
                            }
                            LoggerUtils.removeWorkflowInstanceIdMDC();
                        } else {
                            logger.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
                            this.taskInstanceRetryCheckList.remove(next);
                            LoggerUtils.removeWorkflowInstanceIdMDC();
                        }
                    }
                } catch (Exception e) {
                    logger.error("Check task retry error, taskInstanceKey: {}", next, e);
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            } catch (Throwable th) {
                LoggerUtils.removeWorkflowInstanceIdMDC();
                throw th;
            }
        }
    }

    private void checkTask4State() {
        if (this.taskInstanceStateCheckList.isEmpty()) {
            return;
        }
        Iterator<TaskInstanceKey> it = this.taskInstanceStateCheckList.iterator();
        while (it.hasNext()) {
            TaskInstanceKey next = it.next();
            int processInstanceId = next.getProcessInstanceId();
            long taskCode = next.getTaskCode();
            try {
                try {
                    LoggerUtils.setTaskInstanceIdMDC(Integer.valueOf(processInstanceId));
                    WorkflowExecuteRunnable byProcessInstanceId = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
                    if (byProcessInstanceId == null) {
                        logger.warn("Task instance state check failed, can not find workflowExecuteThread from cache manager, will remove this check task");
                        this.taskInstanceStateCheckList.remove(next);
                        LoggerUtils.removeWorkflowInstanceIdMDC();
                    } else {
                        Optional<TaskInstance> activeTaskInstanceByTaskCode = byProcessInstanceId.getActiveTaskInstanceByTaskCode(taskCode);
                        if (activeTaskInstanceByTaskCode.isPresent()) {
                            TaskInstance taskInstance = activeTaskInstanceByTaskCode.get();
                            if (taskInstance.getState().isFinished()) {
                                LoggerUtils.removeWorkflowInstanceIdMDC();
                            } else {
                                addTaskStateChangeEvent(taskInstance);
                                LoggerUtils.removeWorkflowInstanceIdMDC();
                            }
                        } else {
                            logger.warn("Task instance state check failed, can not find taskInstance from workflowExecuteThread, will remove this check event");
                            this.taskInstanceStateCheckList.remove(next);
                            LoggerUtils.removeWorkflowInstanceIdMDC();
                        }
                    }
                } catch (Exception e) {
                    logger.error("Task state check error, taskInstanceKey: {}", next, e);
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            } catch (Throwable th) {
                LoggerUtils.removeWorkflowInstanceIdMDC();
                throw th;
            }
        }
    }

    private void addTaskStateChangeEvent(TaskInstance taskInstance) {
        this.workflowExecuteThreadPool.submitStateEvent(TaskStateEvent.builder().processInstanceId(taskInstance.getProcessInstanceId()).taskInstanceId(taskInstance.getId()).taskCode(taskInstance.getTaskCode()).type(StateEventType.TASK_STATE_CHANGE).status(TaskExecutionStatus.RUNNING_EXECUTION).build());
    }

    private void addProcessStopEvent(ProcessInstance processInstance) {
        this.workflowExecuteThreadPool.submitStateEvent(WorkflowStateEvent.builder().processInstanceId(processInstance.getId().intValue()).type(StateEventType.PROCESS_STATE_CHANGE).status(WorkflowExecutionStatus.STOP).build());
    }

    private void addTaskRetryEvent(TaskInstance taskInstance) {
        this.workflowExecuteThreadPool.submitStateEvent(TaskStateEvent.builder().processInstanceId(taskInstance.getProcessInstanceId()).taskCode(taskInstance.getTaskCode()).status(TaskExecutionStatus.RUNNING_EXECUTION).type(StateEventType.TASK_RETRY).build());
    }

    private void addTaskTimeoutEvent(TaskInstance taskInstance) {
        this.workflowExecuteThreadPool.submitStateEvent(TaskStateEvent.builder().processInstanceId(taskInstance.getProcessInstanceId()).taskInstanceId(taskInstance.getId()).type(StateEventType.TASK_TIMEOUT).taskCode(taskInstance.getTaskCode()).build());
    }

    private void addProcessTimeoutEvent(ProcessInstance processInstance) {
        this.workflowExecuteThreadPool.submitStateEvent(WorkflowStateEvent.builder().processInstanceId(processInstance.getId().intValue()).type(StateEventType.PROCESS_TIMEOUT).build());
    }
}
