/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.Date;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.hadoop.util.ThreadUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Background thread that periodically scans four work lists and turns
 * overdue or pending entries into {@link StateEvent}s submitted to the
 * {@link WorkflowExecuteThreadPool}:
 * <ul>
 *   <li>process instances waiting for a timeout check,</li>
 *   <li>task instances waiting for a timeout check,</li>
 *   <li>task instances waiting for a retry,</li>
 *   <li>depend / sub-process task instances whose state is polled.</li>
 * </ul>
 * The lists are {@link ConcurrentLinkedQueue}s, so entries may be added or
 * removed by other threads while a scan is iterating over them.
 */
@Component
public class StateWheelExecuteThread
extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);

    /** Ids of process instances whose timeout is being watched. */
    private final ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
    /** Keys of task instances whose timeout is being watched. */
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
    /** Keys of task instances waiting for their retry interval to elapse. */
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
    /** Keys of depend / sub-process task instances whose state is polled. */
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();

    @Autowired
    private MasterConfig masterConfig;
    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    /**
     * Runs all four checks in a loop for as long as {@link Stopper#isRunning()}
     * is true, sleeping {@code masterConfig.getStateWheelInterval() * 1000}
     * milliseconds between cycles. An exception thrown by one check is logged
     * and ends the current cycle; the loop itself keeps going.
     */
    @Override
    public void run() {
        while (Stopper.isRunning()) {
            try {
                checkTask4Timeout();
                checkTask4Retry();
                checkTask4State();
                checkProcess4Timeout();
            } catch (Exception e) {
                logger.error("state wheel thread check error:", e);
            }
            ThreadUtil.sleepAtLeastIgnoreInterrupts(this.masterConfig.getStateWheelInterval() * 1000L);
        }
    }

    /**
     * Registers a process instance for timeout checking.
     *
     * @param processInstance the process instance whose id is enqueued
     */
    public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
        this.processInstanceTimeoutCheckList.add(processInstance.getId());
    }

    /**
     * Removes a process instance from timeout checking.
     *
     * @param processInstance the process instance whose id is dequeued
     */
    public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
        this.processInstanceTimeoutCheckList.remove(processInstance.getId());
    }

    /**
     * Scans the process timeout list. Entries whose workflow thread is no
     * longer cached are dropped; entries whose remaining time is negative
     * get a PROCESS_TIMEOUT event and are then dropped.
     */
    private void checkProcess4Timeout() {
        if (this.processInstanceTimeoutCheckList.isEmpty()) {
            return;
        }
        for (Integer processInstanceId : this.processInstanceTimeoutCheckList) {
            if (processInstanceId == null) {
                continue;
            }
            WorkflowExecuteThread workflowExecuteThread =
                    this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
            if (workflowExecuteThread == null) {
                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
                this.processInstanceTimeoutCheckList.remove(processInstanceId);
                continue;
            }
            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
            if (processInstance == null) {
                continue;
            }
            // timeout is multiplied by 60 before being handed to DateUtils;
            // presumably minutes -> seconds (TODO confirm against DateUtils.getRemainTime).
            long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * 60L);
            if (timeRemain < 0L) {
                addProcessTimeoutEvent(processInstance);
                this.processInstanceTimeoutCheckList.remove(processInstance.getId());
            }
        }
    }

    /**
     * Registers a task instance for timeout checking when its definition has
     * the timeout flag open, or when it is a depend or sub-process task.
     * A key that is already queued is not added again, and the key is
     * enqueued at most once even when both conditions hold.
     *
     * @param processInstance the owning process instance
     * @param taskInstance    the task instance to watch
     */
    public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = buildKeyOrLog(processInstance, taskInstance);
        if (taskInstanceKey == null) {
            return;
        }
        if (this.taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
            return;
        }
        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
        if (taskDefinition == null) {
            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
            return;
        }
        // Single add: two separate adds would leave a duplicate behind, because
        // removing a key from the queue only removes one occurrence.
        boolean timeoutEnabled = TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag();
        if (timeoutEnabled || taskInstance.isDependTask() || taskInstance.isSubProcess()) {
            this.taskInstanceTimeoutCheckList.add(taskInstanceKey);
        }
    }

    /**
     * Removes a task instance from timeout checking.
     *
     * @param processInstance the owning process instance
     * @param taskInstance    the task instance to stop watching
     */
    public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = buildKeyOrLog(processInstance, taskInstance);
        if (taskInstanceKey != null) {
            this.taskInstanceTimeoutCheckList.remove(taskInstanceKey);
        }
    }

    /**
     * Registers a task instance for retry checking. Nothing is added when the
     * key is already queued or the task has no task definition.
     *
     * @param processInstance the owning process instance
     * @param taskInstance    the task instance waiting for a retry
     */
    public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = buildKeyOrLog(processInstance, taskInstance);
        if (taskInstanceKey == null) {
            return;
        }
        if (this.taskInstanceRetryCheckList.contains(taskInstanceKey)) {
            return;
        }
        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
        if (taskDefinition == null) {
            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
            return;
        }
        logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId());
        this.taskInstanceRetryCheckList.add(taskInstanceKey);
    }

    /**
     * Removes a task instance from retry checking.
     *
     * @param processInstance the owning process instance
     * @param taskInstance    the task instance to stop watching
     */
    public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = buildKeyOrLog(processInstance, taskInstance);
        if (taskInstanceKey != null) {
            this.taskInstanceRetryCheckList.remove(taskInstanceKey);
        }
    }

    /**
     * Registers a depend or sub-process task instance for state polling.
     * Other task types, and keys that are already queued, are ignored.
     *
     * @param processInstance the owning process instance
     * @param taskInstance    the task instance to poll
     */
    public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = buildKeyOrLog(processInstance, taskInstance);
        if (taskInstanceKey == null) {
            return;
        }
        if (this.taskInstanceStateCheckList.contains(taskInstanceKey)) {
            return;
        }
        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
            this.taskInstanceStateCheckList.add(taskInstanceKey);
        }
    }

    /**
     * Removes a task instance from state polling.
     *
     * @param processInstance the owning process instance
     * @param taskInstance    the task instance to stop polling
     */
    public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = buildKeyOrLog(processInstance, taskInstance);
        if (taskInstanceKey != null) {
            this.taskInstanceStateCheckList.remove(taskInstanceKey);
        }
    }

    /**
     * Builds the queue key for a task instance, logging an error when no key
     * could be built.
     *
     * @return the key, or {@code null} when {@link TaskInstanceKey#getTaskInstanceKey} returned null
     */
    private TaskInstanceKey buildKeyOrLog(ProcessInstance processInstance, TaskInstance taskInstance) {
        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
        if (taskInstanceKey == null) {
            logger.error("taskInstanceKey is null");
        }
        return taskInstanceKey;
    }

    /**
     * Scans the task timeout list. Entries whose workflow thread or active
     * task instance can no longer be found are dropped. Entries with the
     * timeout flag open and a negative remaining time get a TASK_TIMEOUT
     * event and are then dropped; all other entries stay queued.
     */
    private void checkTask4Timeout() {
        if (this.taskInstanceTimeoutCheckList.isEmpty()) {
            return;
        }
        for (TaskInstanceKey taskInstanceKey : this.taskInstanceTimeoutCheckList) {
            int processInstanceId = taskInstanceKey.getProcessInstanceId();
            long taskCode = taskInstanceKey.getTaskCode();
            WorkflowExecuteThread workflowExecuteThread =
                    this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
            if (workflowExecuteThread == null) {
                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                this.taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                continue;
            }
            TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
            if (taskInstance == null) {
                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                this.taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                continue;
            }
            TaskDefinition taskDefinition = taskInstance.getTaskDefine();
            // A missing definition is skipped rather than dereferenced: an NPE here
            // would abort the whole cycle in run() and recur on every later cycle,
            // since the offending key would never leave the queue.
            if (taskDefinition == null || TimeoutFlag.OPEN != taskDefinition.getTimeoutFlag()) {
                continue;
            }
            long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskDefinition.getTimeout() * 60L);
            if (timeRemain < 0L) {
                addTaskTimeoutEvent(taskInstance);
                this.taskInstanceTimeoutCheckList.remove(taskInstanceKey);
            }
        }
    }

    /**
     * Scans the retry list. Entries whose workflow thread or retry task
     * instance can no longer be found are dropped. When the owning process is
     * in READY_STOP a stop event is submitted instead of a retry. Otherwise,
     * once {@code retryTaskIntervalOverTime()} reports true, the task is reset
     * to SUBMITTED_SUCCESS with no end time and a TASK_RETRY event is submitted.
     */
    private void checkTask4Retry() {
        if (this.taskInstanceRetryCheckList.isEmpty()) {
            return;
        }
        for (TaskInstanceKey taskInstanceKey : this.taskInstanceRetryCheckList) {
            int processInstanceId = taskInstanceKey.getProcessInstanceId();
            long taskCode = taskInstanceKey.getTaskCode();
            WorkflowExecuteThread workflowExecuteThread =
                    this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
            if (workflowExecuteThread == null) {
                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                this.taskInstanceRetryCheckList.remove(taskInstanceKey);
                continue;
            }
            TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
            // processInstance is null-checked here as it is in checkProcess4Timeout;
            // a null process instance is simply treated as "not stopping".
            if (processInstance != null && processInstance.getState() == ExecutionStatus.READY_STOP) {
                addProcessStopEvent(processInstance);
                this.taskInstanceRetryCheckList.remove(taskInstanceKey);
                // Ends the scan for this cycle; the remaining keys are visited
                // again on the next cycle of run().
                break;
            }
            if (taskInstance == null) {
                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                this.taskInstanceRetryCheckList.remove(taskInstanceKey);
                continue;
            }
            if (!taskInstance.retryTaskIntervalOverTime()) {
                continue;
            }
            taskInstance.setEndTime(null);
            taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
            addTaskRetryEvent(taskInstance);
            this.taskInstanceRetryCheckList.remove(taskInstanceKey);
        }
    }

    /**
     * Scans the state-check list. Entries whose workflow thread or active
     * task instance can no longer be found are dropped. For every remaining
     * task that is not in a finished state a TASK_STATE_CHANGE event is
     * submitted; the entry stays queued either way.
     */
    private void checkTask4State() {
        if (this.taskInstanceStateCheckList.isEmpty()) {
            return;
        }
        for (TaskInstanceKey taskInstanceKey : this.taskInstanceStateCheckList) {
            int processInstanceId = taskInstanceKey.getProcessInstanceId();
            long taskCode = taskInstanceKey.getTaskCode();
            WorkflowExecuteThread workflowExecuteThread =
                    this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
            if (workflowExecuteThread == null) {
                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                this.taskInstanceStateCheckList.remove(taskInstanceKey);
                continue;
            }
            TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
            if (taskInstance == null) {
                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode);
                this.taskInstanceStateCheckList.remove(taskInstanceKey);
                continue;
            }
            if (!taskInstance.getState().typeIsFinished()) {
                addTaskStateChangeEvent(taskInstance);
            }
        }
    }

    /**
     * Creates a state event of the given type carrying the task's process
     * instance id, task instance id and task code.
     */
    private StateEvent newTaskEvent(StateEventType type, TaskInstance taskInstance) {
        StateEvent stateEvent = new StateEvent();
        stateEvent.setType(type);
        stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
        stateEvent.setTaskInstanceId(taskInstance.getId());
        stateEvent.setTaskCode(taskInstance.getTaskCode());
        return stateEvent;
    }

    /** Submits a TASK_STATE_CHANGE event with status RUNNING_EXECUTION for the task. */
    private void addTaskStateChangeEvent(TaskInstance taskInstance) {
        StateEvent stateEvent = newTaskEvent(StateEventType.TASK_STATE_CHANGE, taskInstance);
        stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a PROCESS_STATE_CHANGE event with status STOP for the process instance. */
    private void addProcessStopEvent(ProcessInstance processInstance) {
        StateEvent stateEvent = new StateEvent();
        stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
        stateEvent.setProcessInstanceId(processInstance.getId());
        stateEvent.setExecutionStatus(ExecutionStatus.STOP);
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a TASK_RETRY event with status RUNNING_EXECUTION for the task. */
    private void addTaskRetryEvent(TaskInstance taskInstance) {
        StateEvent stateEvent = newTaskEvent(StateEventType.TASK_RETRY, taskInstance);
        stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a TASK_TIMEOUT event for the task; no execution status is set. */
    private void addTaskTimeoutEvent(TaskInstance taskInstance) {
        StateEvent stateEvent = newTaskEvent(StateEventType.TASK_TIMEOUT, taskInstance);
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a PROCESS_TIMEOUT event for the process instance; no execution status is set. */
    private void addProcessTimeoutEvent(ProcessInstance processInstance) {
        StateEvent stateEvent = new StateEvent();
        stateEvent.setType(StateEventType.PROCESS_TIMEOUT);
        stateEvent.setProcessInstanceId(processInstance.getId());
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }
}

