/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import lombok.Generated;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
 * Background thread (a {@link BaseDaemonThread}) that periodically walks three check lists:
 * workflow instances watched for timeout, task instances watched for timeout, and task
 * instances waiting for a retry. When a check fires, the matching state event is submitted to
 * the {@link WorkflowExecuteThreadPool} and the entry is dropped from its list.
 *
 * <p>The lists are {@link ConcurrentLinkedQueue}s, so entries can be added or removed by other
 * threads while the wheel thread iterates over them.
 */
@Component
public class StateWheelExecuteThread extends BaseDaemonThread {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StateWheelExecuteThread.class);

    /** Ids of the workflow instances currently watched for timeout. */
    private final ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();

    /** Keys of the task instances currently watched for timeout. */
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();

    /** Keys of the task instances currently waiting for their retry interval to elapse. */
    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();

    @Autowired
    private MasterConfig masterConfig;

    @Lazy
    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;

    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    protected StateWheelExecuteThread() {
        super("StateWheelExecuteThread");
    }

    /**
     * Starts this thread once the bean has been constructed and its dependencies injected.
     */
    @PostConstruct
    public void startWheelThread() {
        super.start();
    }

    /**
     * Thread body: runs the three checks, sleeps for the configured state wheel interval, and
     * repeats until the server life cycle reports "stopped" or the thread is interrupted.
     */
    public void run() {
        final long checkInterval = masterConfig.getStateWheelInterval().toMillis();
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                checkTask4Timeout();
                checkTask4Retry();
                checkProcess4Timeout();
            } catch (Exception e) {
                // A failing round must not kill the wheel; log it and try again after the sleep.
                log.error("state wheel thread check error:", e);
            }
            try {
                Thread.sleep(checkInterval);
            } catch (InterruptedException e) {
                log.error("state wheel thread sleep error, will close the loop", e);
                // Restore the interrupt flag for whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Registers a workflow instance for timeout checking.
     *
     * @param processInstance the workflow instance whose id is added to the check list
     */
    public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
        processInstanceTimeoutCheckList.add(processInstance.getId());
        log.info("Success add workflow instance {} into timeout check list", processInstance.getId());
    }

    /**
     * Removes a workflow instance from timeout checking; logs only when an entry was removed.
     *
     * @param processInstanceId id of the workflow instance to stop watching
     */
    public void removeProcess4TimeoutCheck(int processInstanceId) {
        boolean removed = processInstanceTimeoutCheckList.remove(processInstanceId);
        if (removed) {
            log.info("Success remove workflow instance {} from timeout check list", processInstanceId);
        }
    }

    /**
     * Scans the workflow timeout list. Entries whose workflow is no longer in the exec cache are
     * dropped; entries whose remaining time is negative get a timeout event and are dropped.
     * A failure on one entry is logged and does not stop the scan.
     */
    private void checkProcess4Timeout() {
        if (processInstanceTimeoutCheckList.isEmpty()) {
            return;
        }
        // Removing from a ConcurrentLinkedQueue while iterating it is safe (weakly consistent iterator).
        for (Integer processInstanceId : processInstanceTimeoutCheckList) {
            try {
                LogUtils.setWorkflowInstanceIdMDC(processInstanceId);
                WorkflowExecuteRunnable workflowExecuteThread =
                        processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
                if (workflowExecuteThread == null) {
                    log.warn("Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
                    processInstanceTimeoutCheckList.remove(processInstanceId);
                    continue;
                }
                ProcessInstance processInstance = workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance();
                if (processInstance == null) {
                    log.warn("Check workflow timeout failed, the workflowInstance is null");
                    continue;
                }
                // The workflow timeout is converted from minutes to seconds before the comparison.
                long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(),
                        TimeUnit.MINUTES.toSeconds(processInstance.getTimeout()));
                if (timeRemain >= 0L) {
                    continue;
                }
                log.info("Workflow instance {} timeout, adding timeout event", processInstance.getId());
                addProcessTimeoutEvent(processInstance);
                processInstanceTimeoutCheckList.remove(processInstance.getId());
                log.info("Workflow instance timeout, added timeout event");
            } catch (Exception ex) {
                // Keep the cause: without it a failing check is impossible to diagnose.
                log.error("Check workflow instance timeout error, workflowInstanceId: {}", processInstanceId, ex);
            } finally {
                LogUtils.removeWorkflowInstanceIdMDC();
            }
        }
    }

    /**
     * Registers a task instance for timeout checking. Nothing is added when the task is already
     * registered, when it has no task definition, or when the definition's timeout flag is not
     * {@link TimeoutFlag#OPEN}.
     *
     * @param processInstance the owning workflow instance, must not be null
     * @param taskInstance    the task instance to watch, must not be null
     * @throws NullPointerException if either argument is null
     */
    public void addTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
        log.info("Adding task instance into timeout check list");
        if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
            log.warn("Task instance is already in timeout check list");
            return;
        }
        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
        if (taskDefinition == null) {
            log.error("Failed to add task instance into timeout check list, taskDefinition is null");
            return;
        }
        if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
            taskInstanceTimeoutCheckList.add(taskInstanceKey);
            log.info("Timeout flag is open, added task instance into timeout check list");
        }
    }

    /**
     * Removes a task instance from timeout checking.
     *
     * @param processInstance the owning workflow instance, must not be null
     * @param taskInstance    the task instance to stop watching, must not be null
     * @throws NullPointerException if either argument is null
     */
    public void removeTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
        taskInstanceTimeoutCheckList.remove(taskInstanceKey);
        log.info("remove task instance from timeout check list");
    }

    /**
     * Registers a task instance for retry checking. Nothing is added when the task is already
     * registered or when it has no task definition.
     *
     * @param processInstance the owning workflow instance, must not be null
     * @param taskInstance    the task instance waiting for retry, must not be null
     * @throws NullPointerException if either argument is null
     */
    public void addTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        log.info("Adding task instance into retry check list");
        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
        if (taskInstanceRetryCheckList.contains(taskInstanceKey)) {
            log.warn("Task instance is already in retry check list");
            return;
        }
        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
        if (taskDefinition == null) {
            log.error("Add task instance into retry check list error, taskDefinition is null");
            return;
        }
        taskInstanceRetryCheckList.add(taskInstanceKey);
        log.info("[WorkflowInstance-{}][TaskInstanceKey-{}:{}] Added task instance into retry check list",
                processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
    }

    /**
     * Removes a task instance from retry checking.
     *
     * @param processInstance the owning workflow instance, must not be null
     * @param taskInstance    the task instance to stop watching, must not be null
     * @throws NullPointerException if either argument is null
     */
    public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
        taskInstanceRetryCheckList.remove(taskInstanceKey);
        log.info("remove task instance from retry check list");
    }

    /**
     * Empties all three check lists.
     */
    public void clearAllTasks() {
        processInstanceTimeoutCheckList.clear();
        taskInstanceTimeoutCheckList.clear();
        taskInstanceRetryCheckList.clear();
    }

    /**
     * Scans the task timeout list. Entries whose workflow is no longer cached, or whose task is
     * no longer active in that workflow, are dropped. Entries whose timeout flag is open and
     * whose remaining time is negative get a task timeout event and are dropped.
     * A failure on one entry is logged and does not stop the scan.
     */
    private void checkTask4Timeout() {
        if (taskInstanceTimeoutCheckList.isEmpty()) {
            return;
        }
        // Removing from a ConcurrentLinkedQueue while iterating it is safe (weakly consistent iterator).
        for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) {
            try {
                LogUtils.setWorkflowInstanceIdMDC(taskInstanceKey.getProcessInstanceId());
                int processInstanceId = taskInstanceKey.getProcessInstanceId();
                long taskCode = taskInstanceKey.getTaskCode();
                WorkflowExecuteRunnable workflowExecuteThread =
                        processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
                if (workflowExecuteThread == null) {
                    log.warn("Check task instance timeout failed, can not find workflowExecuteThread from cache manager, will remove this check task");
                    taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                    continue;
                }
                Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
                if (!taskInstanceOptional.isPresent()) {
                    log.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}, will remove this check task", taskCode);
                    taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                    continue;
                }
                TaskInstance taskInstance = taskInstanceOptional.get();
                if (TimeoutFlag.OPEN != taskInstance.getTaskDefine().getTimeoutFlag()) {
                    continue;
                }
                // The task timeout is converted from minutes to seconds before the comparison.
                long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(),
                        TimeUnit.MINUTES.toSeconds(taskInstance.getTaskDefine().getTimeout()));
                if (timeRemain >= 0L) {
                    continue;
                }
                log.info("Task instance is timeout, adding task timeout event and remove the check");
                addTaskTimeoutEvent(taskInstance);
                taskInstanceTimeoutCheckList.remove(taskInstanceKey);
            } catch (Exception ex) {
                log.error("Check task timeout error, taskInstanceKey: {}", taskInstanceKey, ex);
            } finally {
                LogUtils.removeWorkflowInstanceIdMDC();
            }
        }
    }

    /**
     * Scans the task retry list. Entries whose workflow is no longer cached, or whose task is no
     * longer known as a retry task, are dropped. If the owning workflow is ready to stop, a stop
     * event is submitted, the entry is dropped and the scan ends for this round. Otherwise, once
     * the task's retry interval is over (and it is not in NEED_FAULT_TOLERANCE state), the task
     * is reset to SUBMITTED_SUCCESS, a retry event is submitted and the entry is dropped.
     */
    private void checkTask4Retry() {
        if (taskInstanceRetryCheckList.isEmpty()) {
            return;
        }
        // Removing from a ConcurrentLinkedQueue while iterating it is safe (weakly consistent iterator).
        for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
            int processInstanceId = taskInstanceKey.getProcessInstanceId();
            long taskCode = taskInstanceKey.getTaskCode();
            try {
                LogUtils.setWorkflowInstanceIdMDC(processInstanceId);
                WorkflowExecuteRunnable workflowExecuteThread =
                        processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
                if (workflowExecuteThread == null) {
                    log.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, will remove this check task");
                    taskInstanceRetryCheckList.remove(taskInstanceKey);
                    continue;
                }
                Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
                ProcessInstance processInstance = workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance();
                if (processInstance.getState().isReadyStop()) {
                    log.warn("The process instance is ready to stop, will send process stop event and remove the check task");
                    addProcessStopEvent(processInstance);
                    taskInstanceRetryCheckList.remove(taskInstanceKey);
                    // NOTE(review): this ends the scan for ALL remaining keys, including those of
                    // other workflows, until the next round - confirm whether `continue` was intended.
                    break;
                }
                if (!taskInstanceOptional.isPresent()) {
                    log.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
                    taskInstanceRetryCheckList.remove(taskInstanceKey);
                    continue;
                }
                TaskInstance taskInstance = taskInstanceOptional.get();
                if (taskInstance.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE
                        || !taskInstance.retryTaskIntervalOverTime()) {
                    continue;
                }
                log.info("[TaskInstanceKey-{}:{}]The task instance can retry, will retry this task instance",
                        taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
                // Reset the instance so that it looks freshly submitted before the retry event is fired.
                taskInstance.setEndTime(null);
                taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
                addTaskRetryEvent(taskInstance);
                taskInstanceRetryCheckList.remove(taskInstanceKey);
            } catch (Exception ex) {
                log.error("Check task retry error, taskInstanceKey: {}", taskInstanceKey, ex);
            } finally {
                LogUtils.removeWorkflowInstanceIdMDC();
            }
        }
    }

    /** Submits a PROCESS_STATE_CHANGE event with status STOP for the given workflow instance. */
    private void addProcessStopEvent(ProcessInstance processInstance) {
        WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()
                .processInstanceId(processInstance.getId())
                .type(StateEventType.PROCESS_STATE_CHANGE)
                .status(WorkflowExecutionStatus.STOP)
                .build();
        workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a TASK_RETRY event with status RUNNING_EXECUTION for the given task instance. */
    private void addTaskRetryEvent(TaskInstance taskInstance) {
        TaskStateEvent stateEvent = TaskStateEvent.builder()
                .processInstanceId(taskInstance.getProcessInstanceId())
                .taskCode(taskInstance.getTaskCode())
                .status(TaskExecutionStatus.RUNNING_EXECUTION)
                .type(StateEventType.TASK_RETRY)
                .build();
        workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a TASK_TIMEOUT event for the given task instance. */
    private void addTaskTimeoutEvent(TaskInstance taskInstance) {
        TaskStateEvent stateEvent = TaskStateEvent.builder()
                .processInstanceId(taskInstance.getProcessInstanceId())
                .taskInstanceId(taskInstance.getId())
                .type(StateEventType.TASK_TIMEOUT)
                .taskCode(taskInstance.getTaskCode())
                .build();
        workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /** Submits a PROCESS_TIMEOUT event for the given workflow instance. */
    private void addProcessTimeoutEvent(ProcessInstance processInstance) {
        WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()
                .processInstanceId(processInstance.getId())
                .type(StateEventType.PROCESS_TIMEOUT)
                .build();
        workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }
}

