package org.apache.dolphinscheduler.server.master.runner;

import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.hadoop.util.ThreadUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Background thread that periodically scans four watch lists and turns what it finds into
 * {@link StateEvent}s submitted to the {@link WorkflowExecuteThreadPool}:
 * <ul>
 *   <li>process instances watched for timeout,</li>
 *   <li>task instances watched for timeout,</li>
 *   <li>task instances waiting for their retry interval to elapse,</li>
 *   <li>task instances whose state is re-evaluated on every pass.</li>
 * </ul>
 * Entries are registered and unregistered through the public {@code add...}/{@code remove...}
 * methods. The lists are {@link ConcurrentLinkedQueue}s, so they may be modified while the
 * scanning loop iterates over them.
 */
@Component
public class StateWheelExecuteThread extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);

    /** Ids of the process instances watched for timeout. */
    private final ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();

    /** Keys of the task instances watched for timeout. */
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();

    /** Keys of the task instances waiting to be retried. */
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();

    /** Keys of the task instances whose state is re-checked on every pass. */
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();

    @Autowired
    private MasterConfig masterConfig;

    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;

    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    /**
     * Runs the four checks in a loop for as long as {@link Stopper#isRunning()} is true,
     * sleeping between passes.
     */
    @Override
    public void run() {
        while (Stopper.isRunning()) {
            try {
                checkTask4Timeout();
                checkTask4Retry();
                checkTask4State();
                checkProcess4Timeout();
            } catch (Exception e) {
                // Boundary catch: one failing pass must not terminate the thread.
                logger.error("state wheel thread check error:", e);
            }
            // Scale with a long literal so the multiplication cannot overflow int before it is
            // widened to the long argument (presumably seconds -> milliseconds; confirm in MasterConfig).
            ThreadUtil.sleepAtLeastIgnoreInterrupts(this.masterConfig.getStateWheelInterval() * 1000L);
        }
    }

    /**
     * Registers a process instance for timeout checking.
     *
     * @param processInstance process instance whose id is added to the watch list
     */
    public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
        this.processInstanceTimeoutCheckList.add(Integer.valueOf(processInstance.getId()));
    }

    /**
     * Unregisters a process instance from timeout checking.
     *
     * @param processInstance process instance whose id is removed from the watch list
     */
    public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
        this.processInstanceTimeoutCheckList.remove(Integer.valueOf(processInstance.getId()));
    }

    /**
     * Scans the watched process instances. An entry is dropped when no workflow execute thread is
     * cached for it, or after a {@code PROCESS_TIMEOUT} event has been submitted because the
     * remaining time computed from its start time and timeout is negative.
     */
    private void checkProcess4Timeout() {
        Iterator<Integer> it = this.processInstanceTimeoutCheckList.iterator();
        while (it.hasNext()) {
            Integer processInstanceId = it.next();
            if (processInstanceId == null) {
                continue;
            }
            WorkflowExecuteThread workflowExecuteThread =
                    this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId.intValue());
            if (workflowExecuteThread == null) {
                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
                it.remove();
                continue;
            }
            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
            if (processInstance == null) {
                // Nothing to evaluate yet; keep the entry for a later pass.
                continue;
            }
            if (DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * 60) < 0) {
                addProcessTimeoutEvent(processInstance);
                it.remove();
            }
        }
    }

    /**
     * Registers a task instance for timeout checking. The task is registered when its definition
     * has {@link TimeoutFlag#OPEN}, or when {@code isDependTask()} or {@code isSubProcess()} is
     * true. It is added at most once per call, even when several of these conditions hold.
     * Nothing is registered when the key cannot be built, the key is already present, or the
     * task has no definition.
     *
     * @param processInstance owning process instance
     * @param taskInstance    task instance to watch
     */
    public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = resolveTaskInstanceKey(processInstance, taskInstance);
        if (taskInstanceKey == null || this.taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
            return;
        }
        TaskDefinition taskDefine = taskInstance.getTaskDefine();
        if (taskDefine == null) {
            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
            return;
        }
        // A single combined condition: two separate adds would enqueue the same key twice and the
        // surviving duplicate could later trigger a second timeout event.
        boolean timeoutEnabled = TimeoutFlag.OPEN == taskDefine.getTimeoutFlag();
        if (timeoutEnabled || taskInstance.isDependTask() || taskInstance.isSubProcess()) {
            this.taskInstanceTimeoutCheckList.add(taskInstanceKey);
        }
    }

    /**
     * Unregisters a task instance from timeout checking.
     *
     * @param processInstance owning process instance
     * @param taskInstance    task instance to stop watching
     */
    public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = resolveTaskInstanceKey(processInstance, taskInstance);
        if (taskInstanceKey != null) {
            this.taskInstanceTimeoutCheckList.remove(taskInstanceKey);
        }
    }

    /**
     * Registers a task instance for retry checking. Nothing is registered when the key cannot be
     * built, the key is already present, or the task has no definition.
     *
     * @param processInstance owning process instance
     * @param taskInstance    task instance waiting for a retry
     */
    public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = resolveTaskInstanceKey(processInstance, taskInstance);
        if (taskInstanceKey == null || this.taskInstanceRetryCheckList.contains(taskInstanceKey)) {
            return;
        }
        if (taskInstance.getTaskDefine() == null) {
            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
            return;
        }
        logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId());
        this.taskInstanceRetryCheckList.add(taskInstanceKey);
    }

    /**
     * Unregisters a task instance from retry checking.
     *
     * @param processInstance owning process instance
     * @param taskInstance    task instance to stop watching
     */
    public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = resolveTaskInstanceKey(processInstance, taskInstance);
        if (taskInstanceKey != null) {
            this.taskInstanceRetryCheckList.remove(taskInstanceKey);
        }
    }

    /**
     * Registers a task instance for periodic state checking. Only tasks for which
     * {@code isDependTask()} or {@code isSubProcess()} is true are registered, and only when the
     * key can be built and is not already present.
     *
     * @param processInstance owning process instance
     * @param taskInstance    task instance to watch
     */
    public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = resolveTaskInstanceKey(processInstance, taskInstance);
        if (taskInstanceKey == null || this.taskInstanceStateCheckList.contains(taskInstanceKey)) {
            return;
        }
        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
            this.taskInstanceStateCheckList.add(taskInstanceKey);
        }
    }

    /**
     * Unregisters a task instance from periodic state checking.
     *
     * @param processInstance owning process instance
     * @param taskInstance    task instance to stop watching
     */
    public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = resolveTaskInstanceKey(processInstance, taskInstance);
        if (taskInstanceKey != null) {
            this.taskInstanceStateCheckList.remove(taskInstanceKey);
        }
    }

    /**
     * Builds the watch-list key for a task instance and logs an error when none can be built.
     *
     * @return the key, or {@code null} when {@link TaskInstanceKey#getTaskInstanceKey} returned null
     */
    private TaskInstanceKey resolveTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
        if (taskInstanceKey == null) {
            logger.error("taskInstanceKey is null");
        }
        return taskInstanceKey;
    }

    /**
     * Scans the watched task instances for timeouts. An entry is dropped when its workflow
     * execute thread, its active task instance or its task definition cannot be found, or after
     * a {@code TASK_TIMEOUT} event has been submitted. Entries whose definition does not have
     * {@link TimeoutFlag#OPEN}, or whose time has not run out, stay in the list.
     */
    private void checkTask4Timeout() {
        Iterator<TaskInstanceKey> it = this.taskInstanceTimeoutCheckList.iterator();
        while (it.hasNext()) {
            TaskInstanceKey taskInstanceKey = it.next();
            int processInstanceId = taskInstanceKey.getProcessInstanceId();
            long taskCode = taskInstanceKey.getTaskCode();

            WorkflowExecuteThread workflowExecuteThread =
                    this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
            if (workflowExecuteThread == null) {
                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                it.remove();
                continue;
            }

            TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
            if (taskInstance == null) {
                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                it.remove();
                continue;
            }

            TaskDefinition taskDefine = taskInstance.getTaskDefine();
            if (taskDefine == null) {
                // Without a definition no timeout can be evaluated. Dropping the entry mirrors
                // addTask4TimeoutCheck, which refuses to register such tasks, and avoids an
                // exception that would abort the remaining checks of every pass.
                logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
                it.remove();
                continue;
            }

            if (TimeoutFlag.OPEN == taskDefine.getTimeoutFlag()
                    && DateUtils.getRemainTime(taskInstance.getStartTime(), taskDefine.getTimeout() * 60) < 0) {
                addTaskTimeoutEvent(taskInstance);
                it.remove();
            }
        }
    }

    /**
     * Scans the task instances waiting for a retry. When the owning process instance is in
     * {@code READY_STOP}, a stop event is submitted, the entry is dropped and the scan ends for
     * this pass. Otherwise an entry is dropped when its workflow execute thread or retry task
     * instance cannot be found, or after its retry interval has elapsed and a {@code TASK_RETRY}
     * event has been submitted.
     */
    private void checkTask4Retry() {
        Iterator<TaskInstanceKey> it = this.taskInstanceRetryCheckList.iterator();
        while (it.hasNext()) {
            TaskInstanceKey taskInstanceKey = it.next();
            int processInstanceId = taskInstanceKey.getProcessInstanceId();
            long taskCode = taskInstanceKey.getTaskCode();

            WorkflowExecuteThread workflowExecuteThread =
                    this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
            if (workflowExecuteThread == null) {
                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                it.remove();
                continue;
            }

            TaskInstance retryTaskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
            // The null check matches checkProcess4Timeout, which also tolerates a missing process instance.
            if (processInstance != null && processInstance.getState() == ExecutionStatus.READY_STOP) {
                addProcessStopEvent(processInstance);
                it.remove();
                // NOTE(review): this ends the whole scan, so the remaining entries wait for the
                // next pass. Kept as in the original; confirm whether `continue` was intended.
                return;
            }

            if (retryTaskInstance == null) {
                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                it.remove();
                continue;
            }

            if (retryTaskInstance.retryTaskIntervalOverTime()) {
                // Reset the instance before the retry event is submitted.
                retryTaskInstance.setEndTime((Date) null);
                retryTaskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
                addTaskRetryEvent(retryTaskInstance);
                it.remove();
            }
        }
    }

    /**
     * Scans the task instances registered for state checking and submits a
     * {@code TASK_STATE_CHANGE} event for each one whose state is not finished. An entry is
     * dropped only when its workflow execute thread or active task instance cannot be found.
     */
    private void checkTask4State() {
        Iterator<TaskInstanceKey> it = this.taskInstanceStateCheckList.iterator();
        while (it.hasNext()) {
            TaskInstanceKey taskInstanceKey = it.next();
            int processInstanceId = taskInstanceKey.getProcessInstanceId();
            long taskCode = taskInstanceKey.getTaskCode();

            WorkflowExecuteThread workflowExecuteThread =
                    this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
            if (workflowExecuteThread == null) {
                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                it.remove();
                continue;
            }

            TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
            if (taskInstance == null) {
                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                it.remove();
                continue;
            }

            if (!taskInstance.getState().typeIsFinished()) {
                addTaskStateChangeEvent(taskInstance);
            }
        }
    }

    /** Submits a {@code TASK_STATE_CHANGE} event with status {@code RUNNING_EXECUTION} for the task. */
    private void addTaskStateChangeEvent(TaskInstance taskInstance) {
        StateEvent stateEvent = new StateEvent();
        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
        stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
        stateEvent.setTaskInstanceId(taskInstance.getId());
        stateEvent.setTaskCode(taskInstance.getTaskCode());
        stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a {@code PROCESS_STATE_CHANGE} event with status {@code STOP} for the process instance. */
    private void addProcessStopEvent(ProcessInstance processInstance) {
        StateEvent stateEvent = new StateEvent();
        stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
        stateEvent.setProcessInstanceId(processInstance.getId());
        stateEvent.setExecutionStatus(ExecutionStatus.STOP);
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a {@code TASK_RETRY} event with status {@code RUNNING_EXECUTION} for the task. */
    private void addTaskRetryEvent(TaskInstance taskInstance) {
        StateEvent stateEvent = new StateEvent();
        stateEvent.setType(StateEventType.TASK_RETRY);
        stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
        stateEvent.setTaskInstanceId(taskInstance.getId());
        stateEvent.setTaskCode(taskInstance.getTaskCode());
        stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a {@code TASK_TIMEOUT} event for the task; no execution status is set. */
    private void addTaskTimeoutEvent(TaskInstance taskInstance) {
        StateEvent stateEvent = new StateEvent();
        stateEvent.setType(StateEventType.TASK_TIMEOUT);
        stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
        stateEvent.setTaskInstanceId(taskInstance.getId());
        stateEvent.setTaskCode(taskInstance.getTaskCode());
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a {@code PROCESS_TIMEOUT} event for the process instance; no execution status is set. */
    private void addProcessTimeoutEvent(ProcessInstance processInstance) {
        StateEvent stateEvent = new StateEvent();
        stateEvent.setType(StateEventType.PROCESS_TIMEOUT);
        stateEvent.setProcessInstanceId(processInstance.getId());
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }
}
