/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
import org.apache.dolphinscheduler.server.master.event.StateEventHandler;
import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatue;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessDag;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.dolphinscheduler.service.utils.DagHelper;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;

public class WorkflowExecuteRunnable
implements Callable<WorkflowSubmitStatue> {
    // Class-wide SLF4J logger.
    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
    // Service facade used for task/workflow lookups, task-group handling and command creation.
    private final ProcessService processService;
    // DAO used to persist the workflow instance (see taskFinished).
    private ProcessInstanceDao processInstanceDao;
    // Sends timeout / blocking / completion alerts for this workflow instance.
    private final ProcessAlertManager processAlertManager;
    // NOTE(review): only assigned in the visible part of the class; presumably used for dispatching - confirm.
    private final NettyExecutorManager nettyExecutorManager;
    // The workflow instance driven by this runnable.
    private final ProcessInstance processInstance;
    // Definition of the workflow; loaded in buildFlowDag and reloaded in refreshProcessInstance.
    private ProcessDefinition processDefinition;
    // DAG of the workflow; built in buildFlowDag.
    private DAG<String, TaskNode, TaskNodeRelation> dag;
    // Lazily built "<definitionCode>_<definitionVersion>_<instanceId>" key (see getKey).
    private String key;
    // Start-up phase of this runnable; advanced step by step in call().
    private WorkflowRunnableStatus workflowRunnableStatus = WorkflowRunnableStatus.CREATED;
    // Reset to false by initTaskQueue. NOTE(review): where it is set to true is outside the visible code.
    private boolean taskFailedSubmit = false;
    // task instance id -> task instance.
    private final Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<Integer, TaskInstance>();
    // task code -> processor of the currently active task; the entry is removed when the task finishes.
    private final Map<Long, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<Long, ITaskProcessor>();
    // task code -> id of the task instance whose flag is YES.
    private final Map<Long, Integer> validTaskMap = new ConcurrentHashMap<Long, Integer>();
    // task code -> id of a task instance that finished in a failure state.
    private final Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<Long, Integer>();
    // task code -> id of a task instance that has finished (successfully or not).
    private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<Long, Integer>();
    // NOTE(review): only cleared in the visible code; presumably codes of tasks whose dependencies failed - confirm.
    private final Set<Long> dependFailedTaskSet = Sets.newConcurrentHashSet();
    // task code -> task node flagged as forbidden in the workflow definition.
    private final Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<Long, TaskNode>();
    // NOTE(review): not used in the visible code; presumably nodes that must be skipped - confirm.
    private final Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<String, TaskNode>();
    // Schedule dates consulted by processComplementData to pick the next complement run.
    private List<Date> complementListDate = Lists.newLinkedList();
    // Pending state events; appended by addStateEvent and drained by handleEvents.
    private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue();
    // Queue of task instances waiting to be submitted; its size is reported to TaskMetrics.
    private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
    // task code -> cloned task instance waiting for its retry (see retryTaskInstance).
    private final Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<Long, TaskInstance>();
    // Receives registrations/removals of timeout, retry and state checks for task instances.
    private final StateWheelExecuteThread stateWheelExecuteThread;
    // NOTE(review): only assigned in the visible code; presumably resolves built-in parameters - confirm.
    private final CuringParamsService curingParamsService;
    // Address of this master, derived from the configured listen port.
    private final String masterAddress;

    /**
     * Creates a runnable for one workflow instance.
     *
     * <p>Every argument is mandatory; a {@link NullPointerException} naming the offending
     * parameter is thrown for the first {@code null} argument, checked in declaration order.
     * The master address is derived from the listen port of {@code masterConfig}, and the size
     * of the ready-to-submit queue is registered with {@link TaskMetrics}.
     *
     * @param processInstance         workflow instance to execute
     * @param processService          service facade for workflow/task persistence
     * @param processInstanceDao      DAO used to persist the workflow instance
     * @param nettyExecutorManager    executor manager kept for later use
     * @param processAlertManager     alert sender
     * @param masterConfig            master configuration (only the listen port is read here)
     * @param stateWheelExecuteThread timeout/retry/state check registry
     * @param curingParamsService     parameter service kept for later use
     */
    public WorkflowExecuteRunnable(@NonNull ProcessInstance processInstance,
                                   @NonNull ProcessService processService,
                                   @NonNull ProcessInstanceDao processInstanceDao,
                                   @NonNull NettyExecutorManager nettyExecutorManager,
                                   @NonNull ProcessAlertManager processAlertManager,
                                   @NonNull MasterConfig masterConfig,
                                   @NonNull StateWheelExecuteThread stateWheelExecuteThread,
                                   @NonNull CuringParamsService curingParamsService) {
        // Validate all arguments up front, in parameter order, before touching any field.
        Objects.requireNonNull(processInstance, "processInstance is marked non-null but is null");
        Objects.requireNonNull(processService, "processService is marked non-null but is null");
        Objects.requireNonNull(processInstanceDao, "processInstanceDao is marked non-null but is null");
        Objects.requireNonNull(nettyExecutorManager, "nettyExecutorManager is marked non-null but is null");
        Objects.requireNonNull(processAlertManager, "processAlertManager is marked non-null but is null");
        Objects.requireNonNull(masterConfig, "masterConfig is marked non-null but is null");
        Objects.requireNonNull(stateWheelExecuteThread, "stateWheelExecuteThread is marked non-null but is null");
        Objects.requireNonNull(curingParamsService, "curingParamsService is marked non-null but is null");

        this.processService = processService;
        this.processInstanceDao = processInstanceDao;
        this.processInstance = processInstance;
        this.nettyExecutorManager = nettyExecutorManager;
        this.processAlertManager = processAlertManager;
        this.stateWheelExecuteThread = stateWheelExecuteThread;
        this.curingParamsService = curingParamsService;
        this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
        TaskMetrics.registerTaskPrepared(() -> this.readyToSubmitTaskQueue.size());
    }

    /**
     * @return {@code true} once {@link #call()} has moved this runnable to the STARTED status
     */
    public boolean isStart() {
        return workflowRunnableStatus == WorkflowRunnableStatus.STARTED;
    }

    /**
     * Drains the state-event queue, handing each event to the handler registered for its type.
     *
     * <p>Nothing is processed until the workflow has been started. An event is removed from
     * the queue when its handler reports success or when handling fails with a
     * {@link StateEventHandleError}; for any other exception the event stays at the head of
     * the queue and is retried after a one second pause.
     */
    public void handleEvents() {
        if (!this.isStart()) {
            // Log the number of queued events, not the queue contents: the message promises a size.
            logger.info("The workflow instance is not started, will not handle its state event, current state event size: {}", this.stateEvents.size());
            return;
        }
        StateEvent stateEvent = null;
        while (!this.stateEvents.isEmpty()) {
            try {
                // Peek rather than poll so a failed event stays queued for the retry paths below.
                stateEvent = this.stateEvents.peek();
                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
                this.checkProcessInstance(stateEvent);
                StateEventHandler stateEventHandler = StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
                        .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
                logger.info("Begin to handle state event, {}", stateEvent);
                if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                    this.stateEvents.remove(stateEvent);
                }
            } catch (StateEventHandleError stateEventHandleError) {
                // Unrecoverable for this event: drop it so the queue can make progress.
                logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
                this.stateEvents.remove(stateEvent);
                ThreadUtils.sleep(1000L);
            } catch (StateEventHandleException stateEventHandleException) {
                // Recoverable: keep the event queued and try again after a pause.
                logger.error("State event handle error, will retry this event: {}", stateEvent, stateEventHandleException);
                ThreadUtils.sleep(1000L);
            } catch (Exception e) {
                logger.error("State event handle error, get a unknown exception, will retry this event: {}", stateEvent, e);
                ThreadUtils.sleep(1000L);
            } finally {
                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
            }
        }
    }

    /**
     * Returns the cached key of this runnable, building it on first use as
     * {@code <definitionCode>_<definitionVersion>_<instanceId>}.
     *
     * @return the key, or the current (possibly {@code null}/empty) value while the
     *         process definition has not been loaded yet
     */
    public String getKey() {
        boolean keyMissing = !StringUtils.isNotEmpty(key);
        if (keyMissing && processDefinition != null) {
            key = String.format("%d_%d_%d", processDefinition.getCode(), processDefinition.getVersion(), processInstance.getId());
        }
        return key;
    }

    /**
     * Queues a state event if it belongs to this workflow instance.
     *
     * @param stateEvent event to enqueue
     * @return {@code true} when the event was queued, {@code false} when its process
     *         instance id does not match this runnable's instance and it was discarded
     */
    public boolean addStateEvent(StateEvent stateEvent) {
        boolean belongsToThisWorkflow = processInstance.getId().intValue() == stateEvent.getProcessInstanceId();
        if (belongsToThisWorkflow) {
            stateEvents.add(stateEvent);
            return true;
        }
        logger.info("state event would be abounded :{}", stateEvent);
        return false;
    }

    /**
     * @return number of state events currently waiting in the queue
     */
    public int eventSize() {
        return stateEvents.size();
    }

    /**
     * @return the workflow instance driven by this runnable
     */
    public ProcessInstance getProcessInstance() {
        return processInstance;
    }

    /**
     * Dispatches the task referenced by the event if its task-group queue entry allows it.
     *
     * <p>A force-started entry is dispatched unconditionally and then marked as
     * ACQUIRE_SUCCESS. Otherwise the task is dispatched only when the entry is in the queue
     * and the task-group resource could be acquired.
     *
     * @param stateEvent event carrying the task instance id
     * @return {@code true} if the task was dispatched, {@code false} otherwise
     */
    public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
        TaskGroupQueue queueEntry = processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId().intValue());
        boolean forceStart = queueEntry.getForceStart() == Flag.YES.getCode();
        if (!forceStart) {
            // Regular path: the entry must be queued and the resource must be won first.
            boolean waitingInQueue = queueEntry.getInQueue() == Flag.YES.getCode();
            if (!waitingInQueue || !processService.robTaskGroupResource(queueEntry)) {
                return false;
            }
        }

        TaskInstance taskToDispatch = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
        ITaskProcessor processor = activeTaskProcessorMaps.get(taskToDispatch.getTaskCode());
        processor.action(TaskAction.DISPATCH);

        if (forceStart) {
            processService.updateTaskGroupQueueStatus(Integer.valueOf(queueEntry.getTaskId()), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
        }
        return true;
    }

    /**
     * Sends the workflow-timeout alert for this workflow instance.
     */
    public void processTimeout() {
        int workflowInstanceId = processInstance.getId().intValue();
        processAlertManager.sendProcessTimeoutAlert(processInstance,
                processService.queryProjectWithUserByProcessInstanceId(workflowInstanceId));
    }

    /**
     * Sends the task-timeout alert for the given task of this workflow instance.
     *
     * @param taskInstance task instance that timed out
     */
    public void taskTimeout(TaskInstance taskInstance) {
        int workflowInstanceId = processInstance.getId().intValue();
        ProjectUser owner = processService.queryProjectWithUserByProcessInstanceId(workflowInstanceId);
        processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, owner);
    }

    /**
     * Reacts to a task instance reaching a finished state.
     *
     * <p>The task is unregistered from the active processors and from the timeout/retry/state
     * checks. Depending on its state it is then recorded as complete (and its successors are
     * submitted), scheduled for retry, or recorded as failed, in which case the workflow's
     * failure strategy decides whether successors are submitted or all tasks are killed.
     * Finally the workflow instance state is re-evaluated.
     *
     * @param taskInstance the finished task instance
     * @throws StateEventHandleException propagated from the follow-up handling; the task is
     *                                   removed from the complete map before rethrowing
     */
    public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
        final long finishedTaskCode = taskInstance.getTaskCode();
        logger.info("TaskInstance finished task code:{} state:{}", finishedTaskCode, taskInstance.getState());
        try {
            activeTaskProcessorMaps.remove(finishedTaskCode);
            stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
            stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
            stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);

            if (taskInstance.getState().isSuccess()) {
                completeTaskMap.put(finishedTaskCode, taskInstance.getId());
                // Carry the task's var pool over to the workflow and persist it before moving on.
                processInstance.setVarPool(taskInstance.getVarPool());
                processInstanceDao.upsertProcessInstance(processInstance);
                if (!processInstance.isBlocked()) {
                    submitPostNode(Long.toString(finishedTaskCode));
                }
            } else if (taskInstance.taskCanRetry() && !processInstance.getState().isReadyStop()) {
                logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
                retryTaskInstance(taskInstance);
            } else if (taskInstance.getState().isFailure()) {
                completeTaskMap.put(finishedTaskCode, taskInstance.getId());
                errorTaskMap.put(finishedTaskCode, taskInstance.getId());
                if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE
                        && DagHelper.haveAllNodeAfterNode(Long.toString(finishedTaskCode), dag)) {
                    submitPostNode(Long.toString(finishedTaskCode));
                } else if (processInstance.getFailureStrategy() == FailureStrategy.END) {
                    killAllTasks();
                }
            } else if (taskInstance.getState().isFinished()) {
                completeTaskMap.put(finishedTaskCode, taskInstance.getId());
            }

            logger.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}", finishedTaskCode, taskInstance.getState());
            updateProcessInstanceState();
        } catch (Exception ex) {
            // Undo the "complete" bookkeeping so the event can be handled again.
            logger.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
            completeTaskMap.remove(finishedTaskCode);
            throw ex;
        }
    }

    /**
     * Releases the task-group slot held by the given task and wakes up the next waiting task.
     *
     * <p>If the next task belongs to this workflow instance a WAIT_TASK_GROUP event is queued
     * locally; otherwise a wake-up request is sent for the workflow instance that owns it.
     *
     * @param taskInstance task instance whose task-group slot should be released
     */
    public void releaseTaskGroup(TaskInstance taskInstance) {
        logger.info("Release task group");
        if (taskInstance.getTaskGroupId() <= 0) {
            return;
        }
        TaskInstance waitingTask = processService.releaseTaskGroup(taskInstance);
        if (waitingTask == null) {
            return;
        }

        if (waitingTask.getProcessInstanceId() == taskInstance.getProcessInstanceId()) {
            TaskStateEvent wakeUpEvent = TaskStateEvent.builder()
                    .processInstanceId(this.processInstance.getId())
                    .taskInstanceId(waitingTask.getId())
                    .type(StateEventType.WAIT_TASK_GROUP)
                    .build();
            stateEvents.add(wakeUpEvent);
            return;
        }

        ProcessInstance owningWorkflow = processService.findProcessInstanceById(waitingTask.getProcessInstanceId());
        processService.sendStartTask2Master(owningWorkflow, waitingTask.getId().intValue(),
                org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
    }

    /**
     * Prepares a retry for a task instance that is allowed to retry.
     *
     * <p>A clone of the task is stored as waiting-for-retry. When the retry interval has not
     * elapsed yet, the clone is registered for timeout and retry checks; otherwise it is put
     * on the stand-by list, submitted right away, and removed from the waiting map.
     *
     * @param taskInstance the task instance to retry
     * @throws StateEventHandleException propagated from the submission of stand-by tasks
     */
    private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
        if (!taskInstance.taskCanRetry()) {
            return;
        }
        TaskInstance retryInstance = cloneRetryTaskInstance(taskInstance);
        if (retryInstance == null) {
            logger.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
            return;
        }
        waitToRetryTaskInstanceMap.put(retryInstance.getTaskCode(), retryInstance);

        if (taskInstance.retryTaskIntervalOverTime()) {
            // The interval has already elapsed: submit immediately.
            addTaskToStandByList(retryInstance);
            submitStandByTask();
            waitToRetryTaskInstanceMap.remove(retryInstance.getTaskCode());
            return;
        }

        logger.info("Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}",
                processInstance.getId(), retryInstance.getTaskCode(), retryInstance.getState(),
                retryInstance.getRetryTimes(), retryInstance.getMaxRetryTimes(), retryInstance.getRetryInterval());
        stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, retryInstance);
        stateWheelExecuteThread.addTask4RetryCheck(processInstance, retryInstance);
    }

    /**
     * Reloads the workflow instance with the given id and copies its properties into the
     * instance held by this runnable, then reloads and re-attaches the process definition.
     *
     * @param processInstanceId id of the workflow instance to reload
     */
    public void refreshProcessInstance(int processInstanceId) {
        logger.info("process instance update: {}", processInstanceId);
        ProcessInstance reloaded = processService.findProcessInstanceById(processInstanceId);
        // Copy into the existing object so that references held elsewhere stay valid.
        BeanUtils.copyProperties(reloaded, processInstance);

        processDefinition = processService.findProcessDefinition(
                processInstance.getProcessDefinitionCode(),
                processInstance.getProcessDefinitionVersion());
        processInstance.setProcessDefinition(processDefinition);
    }

    /**
     * Reloads a task instance and refreshes the in-memory bookkeeping for it.
     *
     * <p>The instance is stored by id, and the valid-task mapping for its task code is
     * replaced: it is kept only when the instance's flag is YES.
     *
     * @param taskInstanceId id of the task instance to reload
     */
    public void refreshTaskInstance(int taskInstanceId) {
        logger.info("task instance update: {} ", taskInstanceId);
        TaskInstance reloaded = processService.findTaskInstanceById(Integer.valueOf(taskInstanceId));
        if (reloaded == null) {
            logger.error("can not find task instance, id:{}", taskInstanceId);
            return;
        }
        processService.packageTaskInstance(reloaded, processInstance);
        taskInstanceMap.put(reloaded.getId(), reloaded);

        validTaskMap.remove(reloaded.getTaskCode());
        boolean stillValid = Flag.YES == reloaded.getFlag();
        if (stillValid) {
            validTaskMap.put(reloaded.getTaskCode(), reloaded.getId());
        }
    }

    /**
     * Verifies that the event targets this workflow instance.
     *
     * @param stateEvent event to verify
     * @throws StateEventHandleError if the event's process instance id differs from this instance's id
     */
    public void checkProcessInstance(StateEvent stateEvent) throws StateEventHandleError {
        boolean sameWorkflow = processInstance.getId().intValue() == stateEvent.getProcessInstanceId();
        if (!sameWorkflow) {
            throw new StateEventHandleError("The event doesn't contains process instance id");
        }
    }

    /**
     * Verifies that the event references a task instance known to this runnable.
     *
     * @param stateEvent task event to verify
     * @throws StateEventHandleError if the task instance id is missing/zero or is not
     *                               present in the task instance map
     */
    public void checkTaskInstanceByStateEvent(TaskStateEvent stateEvent) throws StateEventHandleError {
        Integer referencedId = stateEvent.getTaskInstanceId();
        if (referencedId == null || referencedId == 0) {
            throw new StateEventHandleError("The taskInstanceId is 0");
        }
        boolean known = taskInstanceMap.containsKey(stateEvent.getTaskInstanceId());
        if (!known) {
            throw new StateEventHandleError("Cannot find the taskInstance from taskInstanceMap");
        }
    }

    /**
     * @param taskInstanceId task instance id to look up
     * @return {@code true} if a task instance with this id is tracked by this runnable
     */
    public boolean checkTaskInstanceById(int taskInstanceId) {
        return !taskInstanceMap.isEmpty() && taskInstanceMap.containsKey(taskInstanceId);
    }

    /**
     * Looks up a tracked task instance by its id.
     *
     * @param taskInstanceId task instance id
     * @return the task instance, or an empty Optional if the id is not tracked
     */
    public Optional<TaskInstance> getTaskInstance(int taskInstanceId) {
        TaskInstance tracked = taskInstanceMap.get(taskInstanceId);
        return Optional.ofNullable(tracked);
    }

    /**
     * Looks up a tracked task instance by its task code.
     *
     * @param taskCode task code to search for
     * @return the first tracked task instance with that code, or an empty Optional if none matches
     */
    public Optional<TaskInstance> getTaskInstance(long taskCode) {
        return taskInstanceMap.values().stream()
                .filter(candidate -> candidate.getTaskCode() == taskCode)
                .findFirst();
    }

    /**
     * Resolves the valid task instance for a task code via the valid-task map.
     *
     * @param taskCode task code
     * @return the task instance registered as valid for this code, or an empty Optional if
     *         the code has no valid entry or the referenced instance is not tracked
     */
    public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) {
        return Optional.ofNullable(validTaskMap.get(taskCode))
                .map(validId -> taskInstanceMap.get(validId));
    }

    /**
     * @param taskCode task code
     * @return the task instance waiting for retry under this code, or an empty Optional
     */
    public Optional<TaskInstance> getRetryTaskInstanceByTaskCode(long taskCode) {
        TaskInstance waiting = waitToRetryTaskInstanceMap.get(taskCode);
        return Optional.ofNullable(waiting);
    }

    /**
     * Sends the blocking alert for this workflow instance and logs that it was sent.
     */
    public void processBlock() {
        int workflowInstanceId = processInstance.getId().intValue();
        ProjectUser owner = processService.queryProjectWithUserByProcessInstanceId(workflowInstanceId);
        processAlertManager.sendProcessBlockingAlert(processInstance, owner);
        logger.info("processInstance {} block alert send successful!", processInstance.getId());
    }

    /**
     * Continues a complement-data run once the current workflow instance has finished.
     *
     * <p>When the instance has no schedule time, the first date of the complement list is
     * used. Otherwise the instance is ended and the date following the current schedule time
     * is picked, unless the list is exhausted or the instance did not succeed. A
     * COMPLEMENT_DATA command is then created for the chosen date.
     *
     * @return {@code false} if this instance is not a complement run, is about to stop, or
     *         has not finished yet; {@code true} in every other case, including when the
     *         complement run has ended and no further command is created
     */
    public boolean processComplementData() {
        if (!this.needComplementProcess()) {
            return false;
        }
        if (this.processInstance.getState().isReadyStop() || !this.processInstance.getState().isFinished()) {
            return false;
        }
        Date scheduleDate = this.processInstance.getScheduleTime();
        if (scheduleDate == null) {
            // Without this guard get(0) throws IndexOutOfBoundsException on an empty list.
            if (this.complementListDate == null || this.complementListDate.isEmpty()) {
                logger.info("complementListDate is empty, process complement end. process id:{}", this.processInstance.getId());
                return true;
            }
            scheduleDate = this.complementListDate.get(0);
        } else {
            // The instance is known to be finished here (checked by the guard above).
            this.endProcess();
            if (this.complementListDate.isEmpty()) {
                logger.info("process complement end. process id:{}", this.processInstance.getId());
                return true;
            }
            int index = this.complementListDate.indexOf(scheduleDate);
            // Stop after the last date, or as soon as one run did not succeed.
            if (index >= this.complementListDate.size() - 1 || !this.processInstance.getState().isSuccess()) {
                logger.info("process complement end. process id:{}", this.processInstance.getId());
                return true;
            }
            logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", this.processInstance.getId(), this.processInstance.getScheduleTime(), this.complementListDate);
            scheduleDate = this.complementListDate.get(index + 1);
        }
        int create = this.createComplementDataCommand(scheduleDate);
        if (create > 0) {
            logger.info("create complement data command successfully.");
        }
        return true;
    }

    /**
     * Creates the COMPLEMENT_DATA command for the next schedule date of this workflow.
     *
     * <p>The command parameters are copied from the current instance, with the start-node
     * list removed, the first entry of the comma-separated schedule-date list dropped, and
     * the complement start date replaced by the given date. The remaining settings are taken
     * over from the current workflow instance.
     *
     * @param scheduleDate schedule time of the run to create
     * @return the result of {@code processService.createCommand}
     */
    private int createComplementDataCommand(Date scheduleDate) {
        Map<String, String> params = JSONUtils.toMap(processInstance.getCommandParam());
        params.remove("StartNodeIdList");
        if (params.containsKey("complementScheduleDateList")) {
            // Drop the leading date: everything after the first comma remains.
            String remainingDates = params.get("complementScheduleDateList");
            params.replace("complementScheduleDateList", remainingDates.substring(remainingDates.indexOf(",") + 1));
        }
        if (params.containsKey("complementStartDate")) {
            params.replace("complementStartDate", DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss", null));
        }

        Date now = new Date();
        Command nextRun = new Command();
        nextRun.setScheduleTime(scheduleDate);
        nextRun.setCommandType(CommandType.COMPLEMENT_DATA);
        nextRun.setProcessDefinitionCode(processInstance.getProcessDefinitionCode().longValue());
        nextRun.setCommandParam(JSONUtils.toJsonString(params));
        nextRun.setTaskDependType(processInstance.getTaskDependType());
        nextRun.setFailureStrategy(processInstance.getFailureStrategy());
        nextRun.setWarningType(processInstance.getWarningType());
        nextRun.setWarningGroupId(processInstance.getWarningGroupId());
        nextRun.setStartTime(now);
        nextRun.setExecutorId(processInstance.getExecutorId());
        nextRun.setUpdateTime(new Date());
        nextRun.setProcessInstancePriority(processInstance.getProcessInstancePriority());
        nextRun.setWorkerGroup(processInstance.getWorkerGroup());
        nextRun.setEnvironmentCode(processInstance.getEnvironmentCode());
        nextRun.setDryRun(processInstance.getDryRun());
        nextRun.setProcessInstanceId(0);
        nextRun.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
        return processService.createCommand(nextRun);
    }

    /**
     * @return {@code true} if this instance is a complement-data run and is not a sub-process
     */
    private boolean needComplementProcess() {
        if (!processInstance.isComplementData()) {
            return false;
        }
        return Flag.NO == processInstance.getIsSubProcess();
    }

    /**
     * Starts the workflow: builds the DAG, initialises the task queue and submits the first
     * nodes, advancing the runnable status after each completed step so that a later call
     * resumes from the step that failed.
     *
     * @return DUPLICATED_SUBMITTED if the workflow is already started, SUCCESS when all steps
     *         completed, FAILED if any step threw an exception
     */
    @Override
    public WorkflowSubmitStatue call() {
        if (isStart()) {
            logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
            return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
        }
        try {
            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());

            // Step 1: build the DAG.
            if (WorkflowRunnableStatus.CREATED == workflowRunnableStatus) {
                buildFlowDag();
                workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
                logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
            }
            // Step 2: initialise the task queue.
            if (WorkflowRunnableStatus.INITIALIZE_DAG == workflowRunnableStatus) {
                initTaskQueue();
                workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
                logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
            }
            // Step 3: submit the start nodes.
            if (WorkflowRunnableStatus.INITIALIZE_QUEUE == workflowRunnableStatus) {
                submitPostNode(null);
                workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
                logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
            }
            return WorkflowSubmitStatue.SUCCESS;
        } catch (Exception e) {
            logger.error("Start workflow error", e);
            return WorkflowSubmitStatue.FAILED;
        } finally {
            LoggerUtils.removeWorkflowInstanceIdMDC();
        }
    }

    /**
     * Finishes this workflow instance: drops all pending events, triggers the next serial
     * instance for serial execution types, sends/closes alerts as required, and releases all
     * task-group resources when {@code checkTaskQueue()} reports true.
     */
    public void endProcess() {
        stateEvents.clear();

        boolean serialExecution = processDefinition.getExecutionType().typeIsSerialWait()
                || processDefinition.getExecutionType().typeIsSerialPriority();
        if (serialExecution) {
            checkSerialProcess(processDefinition);
        }

        int workflowInstanceId = processInstance.getId().intValue();
        ProjectUser owner = processService.queryProjectWithUserByProcessInstanceId(workflowInstanceId);
        if (processAlertManager.isNeedToSendWarning(processInstance)) {
            processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), owner);
        }
        if (processInstance.getState().isSuccess()) {
            processAlertManager.closeAlert(processInstance);
        }
        if (checkTaskQueue()) {
            processService.releaseAllTaskGroup(processInstance.getId().intValue());
        }
    }

    /**
     * Wakes up the next workflow instance of a serially executed definition.
     *
     * <p>If this instance has no recorded successor, the next SERIAL_WAIT instance is looked
     * up; nothing happens when there is none, or when the execution type is serial-priority
     * and a READY_STOP instance exists. A RECOVER_SERIAL_WAIT command is created for the
     * successor unless it is already finished or running.
     *
     * @param processDefinition definition whose execution type, code and version are used
     */
    public void checkSerialProcess(ProcessDefinition processDefinition) {
        int successorId = this.processInstance.getNextProcessInstanceId();
        if (successorId == 0) {
            ProcessInstance waitingInstance = this.processService.loadNextProcess4Serial(
                    this.processInstance.getProcessDefinition().getCode(),
                    WorkflowExecutionStatus.SERIAL_WAIT.getCode(),
                    this.processInstance.getId().intValue());
            if (waitingInstance == null) {
                return;
            }
            ProcessInstance stoppingInstance = this.processService.loadNextProcess4Serial(
                    this.processInstance.getProcessDefinition().getCode(),
                    WorkflowExecutionStatus.READY_STOP.getCode(),
                    this.processInstance.getId().intValue());
            if (processDefinition.getExecutionType().typeIsSerialPriority() && stoppingInstance != null) {
                return;
            }
            successorId = waitingInstance.getId();
        }

        // Always re-read the successor by id so its state is checked on a fresh lookup.
        ProcessInstance successor = this.processService.findProcessInstanceById(successorId);
        if (successor.getState().isFinished() || successor.getState().isRunning()) {
            return;
        }

        HashMap<String, Integer> recoverParams = new HashMap<String, Integer>();
        recoverParams.put("ProcessInstanceId", successorId);

        Command recoverCommand = new Command();
        recoverCommand.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
        recoverCommand.setProcessInstanceId(successor.getId().intValue());
        recoverCommand.setProcessDefinitionCode(processDefinition.getCode());
        recoverCommand.setProcessDefinitionVersion(processDefinition.getVersion());
        recoverCommand.setCommandParam(JSONUtils.toJsonString(recoverParams));
        this.processService.createCommand(recoverCommand);
    }

    /**
     * Loads the process definition and builds the DAG of this workflow instance.
     *
     * <p>The task nodes are derived from the definition's task relations; forbidden nodes are
     * recorded in {@code forbiddenTaskMap}. The DAG is generated from the task nodes, the
     * start nodes parsed from the command parameters, the recovery nodes and the task depend
     * type. If no process DAG can be generated an error is logged and {@code dag} is left
     * unchanged.
     *
     * @throws Exception propagated from the lookups and DAG generation
     */
    private void buildFlowDag() throws Exception {
        this.processDefinition = this.processService.findProcessDefinition(this.processInstance.getProcessDefinitionCode(), this.processInstance.getProcessDefinitionVersion());
        this.processInstance.setProcessDefinition(this.processDefinition);
        List<TaskInstance> recoverNodeList = this.getRecoverTaskInstanceList(this.processInstance.getCommandParam());
        // NOTE(review): the element types of these two lists are not visible in this file, so
        // they are passed through untyped - give them their real element types if known.
        List processTaskRelations = this.processService.findRelationByCode(this.processDefinition.getCode(), this.processDefinition.getVersion());
        List taskDefinitionLogs = this.processService.getTaskDefineLogListByRelation(processTaskRelations);
        // Typed as TaskNode so that the forbidden-node scan below can call TaskNode members.
        List<TaskNode> taskNodeList = this.processService.transformTask(processTaskRelations, taskDefinitionLogs);
        this.forbiddenTaskMap.clear();
        for (TaskNode taskNode : taskNodeList) {
            if (taskNode.isForbidden()) {
                this.forbiddenTaskMap.put(taskNode.getCode(), taskNode);
            }
        }
        List<String> recoveryNodeCodeList = this.getRecoveryNodeCodeList(recoverNodeList);
        List<String> startNodeNameList = this.parseStartNodeName(this.processInstance.getCommandParam());
        ProcessDag processDag = this.generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeCodeList, this.processInstance.getTaskDependType());
        if (processDag == null) {
            logger.error("ProcessDag is null");
            return;
        }
        this.dag = DagHelper.buildDagGraph(processDag);
        logger.info("Build dag success, dag: {}", this.dag);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resets the per-run task bookkeeping and re-populates it from persisted state.
     *
     * <p>For a workflow instance that is not newly running, every valid task instance
     * is loaded and sorted into {@code validTaskMap}, {@code taskInstanceMap},
     * {@code completeTaskMap}, {@code errorTaskMap}, the standby list or the retry
     * path. For a complement-data instance whose {@code complementListDate} is still
     * empty, the complement date list is computed from the command parameters and
     * the first date is applied to the workflow instance.
     *
     * @throws StateEventHandleException declared by this method; presumably raised by helpers such as
     *         {@code addTaskToStandByList} / {@code retryTaskInstance} - confirm
     * @throws CronParseException declared by this method; presumably raised by the {@code CronUtils}
     *         date-list calculation - confirm
     */
    private void initTaskQueue() throws StateEventHandleException, CronParseException {
        Map cmdParam;
        // Start from a clean slate for everything that is rebuilt below.
        this.taskFailedSubmit = false;
        this.activeTaskProcessorMaps.clear();
        this.dependFailedTaskSet.clear();
        this.completeTaskMap.clear();
        this.errorTaskMap.clear();
        if (!this.isNewProcessInstance()) {
            logger.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}", (Object)this.processInstance.getRunTimes(), (Object)this.processInstance.getRecovery());
            List validTaskInstanceList = this.processService.findValidTaskListByProcessId(this.processInstance.getId());
            for (TaskInstance task : validTaskInstanceList) {
                try {
                    // Tag log lines of this iteration with the workflow/task instance ids.
                    LoggerUtils.setWorkflowAndTaskInstanceIDMDC((Integer)task.getProcessInstanceId(), (Integer)task.getId());
                    logger.info("Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}", (Object)task.getTaskCode(), (Object)task.getState());
                    if (this.validTaskMap.containsKey(task.getTaskCode())) {
                        logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}", (Object)task.getTaskCode());
                        int oldTaskInstanceId = this.validTaskMap.get(task.getTaskCode());
                        TaskInstance oldTaskInstance = this.taskInstanceMap.get(oldTaskInstanceId);
                        // The already-registered instance is still unfinished while this one is finished:
                        // flag this one as NO, persist it and keep the registered instance.
                        if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) {
                            task.setFlag(Flag.NO);
                            this.processService.updateTaskInstance(task);
                            continue;
                        }
                    }
                    this.validTaskMap.put(task.getTaskCode(), task.getId());
                    this.taskInstanceMap.put(task.getId(), task);
                    if (task.isTaskComplete()) {
                        logger.info("TaskInstance is already complete.");
                        this.completeTaskMap.put(task.getTaskCode(), task.getId());
                        continue;
                    }
                    // Conditions tasks, and tasks that have a conditions node after them, are neither
                    // retried nor recorded as errors here.
                    if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode((String)Long.toString(task.getTaskCode()), this.dag)) continue;
                    if (task.taskCanRetry()) {
                        if (task.getState().isNeedFaultTolerance()) {
                            logger.info("TaskInstance needs fault tolerance, will be added to standby list.");
                            // Invalidate the persisted instance and queue a fresh clone in its place.
                            task.setFlag(Flag.NO);
                            this.processService.updateTaskInstance(task);
                            // NOTE(review): cloneTolerantTaskInstance returns null when the task node is not
                            // in the DAG; the result is passed on unchecked - confirm the callee tolerates it.
                            TaskInstance tolerantTaskInstance = this.cloneTolerantTaskInstance(task);
                            this.addTaskToStandByList(tolerantTaskInstance);
                            continue;
                        }
                        logger.info("Retry taskInstance, taskState: {}", (Object)task.getState());
                        this.retryTaskInstance(task);
                        continue;
                    }
                    // Not retryable: only failed instances are recorded as errors.
                    if (!task.getState().isFailure()) continue;
                    this.errorTaskMap.put(task.getTaskCode(), task.getId());
                }
                finally {
                    // Always drop the MDC tags, including on the early 'continue' paths.
                    LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                }
            }
        } else {
            logger.info("The current workflowInstance is a newly running workflowInstance");
        }
        // Complement-data handling: only when the date list has not been computed yet and the
        // command parameters could be parsed.
        if (this.processInstance.isComplementData() && this.complementListDate.isEmpty() && (cmdParam = JSONUtils.toMap((String)this.processInstance.getCommandParam())) != null) {
            this.setGlobalParamIfCommanded(this.processDefinition, cmdParam);
            Date start = null;
            Date end = null;
            // A start/end range is only used when both bounds are present.
            if (cmdParam.containsKey("complementStartDate") && cmdParam.containsKey("complementEndDate")) {
                start = DateUtils.stringToDate((String)((String)cmdParam.get("complementStartDate")));
                end = DateUtils.stringToDate((String)((String)cmdParam.get("complementEndDate")));
            }
            if (this.complementListDate.isEmpty() && this.needComplementProcess()) {
                if (start != null && end != null) {
                    List schedules = this.processService.queryReleaseSchedulerListByProcessDefinitionCode(this.processInstance.getProcessDefinitionCode().longValue());
                    this.complementListDate = CronUtils.getSelfFireDateList((Date)start, (Date)end, (List)schedules);
                }
                // An explicit schedule date list, when present, replaces the range-derived list.
                if (cmdParam.containsKey("complementScheduleDateList")) {
                    this.complementListDate = CronUtils.getSelfScheduleDateList((Map)cmdParam);
                }
                logger.info(" process definition code:{} complement data: {}", (Object)this.processInstance.getProcessDefinitionCode(), this.complementListDate);
                // For a non-sub-process instance, start with the first complement date and
                // regenerate the global parameters for that schedule time before persisting.
                if (!this.complementListDate.isEmpty() && Flag.NO == this.processInstance.getIsSubProcess()) {
                    this.processInstance.setScheduleTime(this.complementListDate.get(0));
                    String globalParams = this.curingParamsService.curingGlobalParams(this.processInstance.getId(), this.processDefinition.getGlobalParamMap(), this.processDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA, this.processInstance.getScheduleTime(), (String)cmdParam.get("schedule_timezone"));
                    this.processInstance.setGlobalParams(globalParams);
                    this.processInstanceDao.updateProcessInstance(this.processInstance);
                }
            }
        }
        logger.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}", new Object[]{this.dependFailedTaskSet, this.completeTaskMap, this.errorTaskMap});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits the given task instance through its task processor and, when possible,
     * dispatches and runs it.
     *
     * <p>Steps, in order: package and initialise the task, submit it, register it in
     * {@code validTaskMap} / {@code taskInstanceMap} / {@code activeTaskProcessorMaps}
     * (invalidating a previously registered instance with the same task code), try to
     * acquire the task group if one is configured, dispatch, run, register state and
     * timeout checks, and finally enqueue state events if the task is already finished.
     *
     * @param taskInstance the task instance to submit
     * @return the task instance when it was submitted (including the case where the task
     *         group could not be acquired yet); empty when submit or dispatch failed or
     *         any exception was thrown
     */
    private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
        try {
            this.processService.packageTaskInstance(taskInstance, this.processInstance);
            ITaskProcessor processor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
            processor.init(taskInstance, this.processInstance);

            boolean runningCommonTask = taskInstance.getState().isRunning()
                    && processor.getType().equalsIgnoreCase("common");
            if (runningCommonTask) {
                this.notifyProcessHostUpdate(taskInstance);
            }

            boolean submitted = processor.action(TaskAction.SUBMIT);
            if (!submitted) {
                logger.error("Submit standby task failed!, taskCode: {}, taskName: {}",
                        taskInstance.getTaskCode(), taskInstance.getName());
                return Optional.empty();
            }

            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());

            // A different instance is registered for the same task code: invalidate it.
            if (this.validTaskMap.containsKey(taskInstance.getTaskCode())) {
                int previousId = this.validTaskMap.get(taskInstance.getTaskCode());
                if (taskInstance.getId() != previousId) {
                    TaskInstance previous = this.taskInstanceMap.get(previousId);
                    previous.setFlag(Flag.NO);
                    this.processService.updateTaskInstance(previous);
                    this.validTaskMap.remove(taskInstance.getTaskCode());
                    this.activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
                }
            }
            this.validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
            this.taskInstanceMap.put(taskInstance.getId(), taskInstance);
            this.activeTaskProcessorMaps.put(taskInstance.getTaskCode(), processor);

            int taskGroupId = taskInstance.getTaskGroupId();
            if (taskGroupId > 0) {
                boolean acquired = this.processService.acquireTaskGroup(
                        taskInstance.getId().intValue(),
                        taskInstance.getName(),
                        taskGroupId,
                        taskInstance.getProcessInstanceId(),
                        taskInstance.getTaskGroupPriority());
                if (!acquired) {
                    logger.info("Submitted task will not be dispatch right now because the first time to try to acquire task group failed, taskInstanceName: {}, taskGroupId: {}",
                            taskInstance.getName(), taskGroupId);
                    return Optional.of(taskInstance);
                }
            }

            if (!processor.action(TaskAction.DISPATCH)) {
                logger.error("Dispatch standby process {} task {} failed",
                        this.processInstance.getName(), taskInstance.getName());
                return Optional.empty();
            }

            processor.action(TaskAction.RUN);
            this.stateWheelExecuteThread.addTask4TimeoutCheck(this.processInstance, taskInstance);
            this.stateWheelExecuteThread.addTask4StateCheck(this.processInstance, taskInstance);

            // The processor may have finished the task synchronously; publish the result.
            if (processor.taskInstance().getState().isFinished()) {
                if (this.processInstance.isBlocked()) {
                    TaskStateEvent blockedEvent = TaskStateEvent.builder()
                            .processInstanceId(this.processInstance.getId())
                            .taskInstanceId(taskInstance.getId())
                            .status(processor.taskInstance().getState())
                            .type(StateEventType.PROCESS_BLOCKED)
                            .build();
                    this.stateEvents.add(blockedEvent);
                }
                TaskStateEvent stateChangeEvent = TaskStateEvent.builder()
                        .processInstanceId(this.processInstance.getId())
                        .taskInstanceId(taskInstance.getId())
                        .status(processor.taskInstance().getState())
                        .type(StateEventType.TASK_STATE_CHANGE)
                        .build();
                this.stateEvents.add(stateChangeEvent);
            }
            return Optional.of(taskInstance);
        } catch (Exception e) {
            logger.error("Submit standby task {} error, taskCode: {}",
                    taskInstance.getName(), taskInstance.getTaskCode(), e);
            return Optional.empty();
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }

    /**
     * Sends a {@code HostUpdateCommand} carrying this master's address to the host
     * recorded on the task instance. Does nothing when the task has no host; any
     * failure while building or sending the command is logged and swallowed.
     *
     * @param taskInstance the task whose host should be notified
     */
    private void notifyProcessHostUpdate(TaskInstance taskInstance) {
        if (!StringUtils.isEmpty(taskInstance.getHost())) {
            try {
                HostUpdateCommand updateCommand = new HostUpdateCommand();
                updateCommand.setProcessHost(this.masterAddress);
                updateCommand.setTaskInstanceId(taskInstance.getId().intValue());
                Host targetHost = new Host(taskInstance.getHost());
                this.nettyExecutorManager.doExecute(targetHost, updateCommand.convert2Command());
            } catch (Exception e) {
                logger.error("notify process host update", (Throwable) e);
            }
        }
    }

    /**
     * Looks up an already-known valid task instance by task code and definition version.
     *
     * @param taskCode    task code to match
     * @param taskVersion task definition version to match
     * @return the first matching instance from {@code getValidTaskList()}, or {@code null} if none matches
     */
    private TaskInstance findTaskIfExists(Long taskCode, int taskVersion) {
        for (TaskInstance candidate : this.getValidTaskList()) {
            boolean sameCode = candidate.getTaskCode() == taskCode.longValue();
            if (sameCode && candidate.getTaskDefinitionVersion() == taskVersion) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Returns the existing valid task instance for the node's code and version, or
     * builds a new one when none exists.
     *
     * @param processInstance the owning workflow instance
     * @param taskNode        the DAG node to create an instance for
     * @return the reused or newly created task instance
     */
    private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
        TaskInstance existing = this.findTaskIfExists(taskNode.getCode(), taskNode.getVersion());
        return existing != null ? existing : this.newTaskInstance(processInstance, taskNode);
    }

    /**
     * Builds a fresh task instance that represents the next retry of the given one.
     *
     * <p>The copy carries over the task/process definitions, state and end time,
     * increments the retry counter by one, and keeps the app link only when the source
     * state is {@code NEED_FAULT_TOLERANCE}.
     *
     * @param taskInstance the instance to retry
     * @return the retry copy, or {@code null} when the task code has no node in the DAG
     */
    public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
        String nodeKey = Long.toString(taskInstance.getTaskCode());
        TaskNode node = (TaskNode) this.dag.getNode(nodeKey);
        if (node == null) {
            logger.error("Clone retry taskInstance error because taskNode is null, taskCode:{}", taskInstance.getTaskCode());
            return null;
        }

        TaskInstance retryCopy = this.newTaskInstance(this.processInstance, node);
        retryCopy.setTaskDefine(taskInstance.getTaskDefine());
        retryCopy.setProcessDefine(taskInstance.getProcessDefine());
        retryCopy.setProcessInstance(this.processInstance);
        retryCopy.setRetryTimes(taskInstance.getRetryTimes() + 1);
        retryCopy.setState(taskInstance.getState());
        retryCopy.setEndTime(taskInstance.getEndTime());

        boolean needsFaultTolerance = taskInstance.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE;
        if (needsFaultTolerance) {
            retryCopy.setAppLink(taskInstance.getAppLink());
        }
        return retryCopy;
    }

    /**
     * Builds a fresh task instance used to take over a task that needs fault tolerance.
     *
     * <p>The copy carries over the task/process definitions, retry counter (unchanged),
     * state and app link of the source instance.
     *
     * @param taskInstance the instance to take over
     * @return the tolerant copy, or {@code null} when the task code has no node in the DAG
     */
    public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
        String nodeKey = Long.toString(taskInstance.getTaskCode());
        TaskNode node = (TaskNode) this.dag.getNode(nodeKey);
        if (node == null) {
            logger.error("Clone tolerant taskInstance error because taskNode is null, taskCode:{}", taskInstance.getTaskCode());
            return null;
        }

        TaskInstance tolerantCopy = this.newTaskInstance(this.processInstance, node);
        tolerantCopy.setTaskDefine(taskInstance.getTaskDefine());
        tolerantCopy.setProcessDefine(taskInstance.getProcessDefine());
        tolerantCopy.setProcessInstance(this.processInstance);
        tolerantCopy.setRetryTimes(taskInstance.getRetryTimes());
        tolerantCopy.setState(taskInstance.getState());
        tolerantCopy.setAppLink(taskInstance.getAppLink());
        return tolerantCopy;
    }

    /**
     * Creates a new, not yet persisted task instance for the given DAG node.
     *
     * <p>The instance starts in {@code SUBMITTED_SUCCESS} with flag {@code YES}, zero
     * retries and no start time. Worker group and environment code are resolved as
     * follows: a blank process worker group counts as {@code "default"}; a blank task
     * worker group inherits the process one; a missing task environment code inherits
     * the process one (or {@code -1} when the process has none). When the process uses
     * a non-default worker group but the task resolves to {@code "default"}, the
     * process worker group and environment win. If the resulting environment code is
     * not {@code -1} and the environment has a non-empty config, that config is copied
     * onto the instance.
     *
     * @param processInstance the owning workflow instance
     * @param taskNode        the DAG node describing the task
     * @return the populated task instance
     */
    public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
        TaskInstance taskInstance = new TaskInstance();
        taskInstance.setTaskCode(taskNode.getCode());
        taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
        taskInstance.setName(taskNode.getName());
        taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
        taskInstance.setProcessInstanceId(processInstance.getId().intValue());
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale
        // (e.g. a Turkish locale would turn 'i' into a dotted capital I).
        taskInstance.setTaskType(taskNode.getType().toUpperCase(java.util.Locale.ROOT));
        taskInstance.setAlertFlag(Flag.NO);
        taskInstance.setStartTime(null);
        taskInstance.setFlag(Flag.YES);
        taskInstance.setDryRun(processInstance.getDryRun());
        taskInstance.setRetryTimes(0);
        taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes());
        taskInstance.setRetryInterval(taskNode.getRetryInterval());
        taskInstance.setTaskParams(taskNode.getTaskParams());
        taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
        taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
        taskInstance.setCpuQuota(taskNode.getCpuQuota());
        taskInstance.setMemoryMax(taskNode.getMemoryMax());
        if (taskNode.getTaskInstancePriority() == null) {
            taskInstance.setTaskInstancePriority(Priority.MEDIUM);
        } else {
            taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
        }

        // Resolve worker group: blank process group -> "default", blank task group -> process group.
        String processWorkerGroup = processInstance.getWorkerGroup();
        processWorkerGroup = StringUtils.isBlank((CharSequence)processWorkerGroup) ? "default" : processWorkerGroup;
        String taskWorkerGroup = StringUtils.isBlank((CharSequence)taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();

        // Resolve environment code: missing process code -> -1, missing task code -> process code.
        Long processEnvironmentCode = Objects.isNull(processInstance.getEnvironmentCode()) ? -1L : processInstance.getEnvironmentCode();
        Long taskEnvironmentCode = Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode();

        if (!processWorkerGroup.equals("default") && taskWorkerGroup.equals("default")) {
            // A non-default process worker group overrides a task that resolves to "default".
            taskInstance.setWorkerGroup(processWorkerGroup);
            taskInstance.setEnvironmentCode(processEnvironmentCode);
        } else {
            taskInstance.setWorkerGroup(taskWorkerGroup);
            taskInstance.setEnvironmentCode(taskEnvironmentCode);
        }

        // Only look the environment up when a real environment code was resolved.
        if (!taskInstance.getEnvironmentCode().equals(-1L)) {
            Environment environment = this.processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
            if (Objects.nonNull(environment) && StringUtils.isNotEmpty((CharSequence)environment.getConfig())) {
                taskInstance.setEnvironmentConfig(environment.getConfig());
            }
        }
        taskInstance.setDelayTime(taskNode.getDelayTime());
        taskInstance.setTaskExecuteType(taskNode.getTaskExecuteType());
        return taskInstance;
    }

    /**
     * Fills the var pool of {@code taskInstance} from its predecessors.
     *
     * <p>With no predecessors, the workflow instance's own var pool (if non-empty) is
     * used. Otherwise the var pools of all completed predecessor task instances are
     * merged via {@code setVarPoolValue} and, when the merge produced any property,
     * serialised onto the task instance.
     *
     * @param taskInstance the task instance receiving the var pool
     * @param preTask      task codes (as strings) of the predecessor tasks; may be null or empty
     */
    public void getPreVarPool(TaskInstance taskInstance, Set<String> preTask) {
        HashMap<String, Property> mergedProperties = new HashMap<String, Property>();
        HashMap<String, TaskInstance> propertyOwners = new HashMap<String, TaskInstance>();

        if (CollectionUtils.isEmpty(preTask)) {
            String processVarPool = this.processInstance.getVarPool();
            if (StringUtils.isNotEmpty(processVarPool)) {
                taskInstance.setVarPool(this.processInstance.getVarPool());
            }
            return;
        }

        for (String preTaskCode : preTask) {
            Integer preTaskId = this.completeTaskMap.get(Long.parseLong(preTaskCode));
            if (preTaskId == null) {
                continue;
            }
            TaskInstance preTaskInstance = this.taskInstanceMap.get(preTaskId);
            if (preTaskInstance == null) {
                continue;
            }
            String preVarPool = preTaskInstance.getVarPool();
            if (StringUtils.isEmpty(preVarPool)) {
                continue;
            }
            List<Property> preProperties = JSONUtils.toList(preVarPool, Property.class);
            for (Property property : preProperties) {
                this.setVarPoolValue(mergedProperties, propertyOwners, preTaskInstance, property);
            }
        }

        if (!mergedProperties.isEmpty()) {
            taskInstance.setVarPool(JSONUtils.toJsonString(mergedProperties.values()));
        }
    }

    /**
     * Returns the values view of {@code taskInstanceMap}, i.e. every task instance
     * currently tracked by this runnable. The returned collection is the map's own
     * view, not a copy.
     *
     * @return all tracked task instances
     */
    public Collection<TaskInstance> getAllTaskInstances() {
        Collection<TaskInstance> trackedInstances = this.taskInstanceMap.values();
        return trackedInstances;
    }

    /**
     * Merges one predecessor property into the accumulated var pool.
     *
     * <p>The property is first marked as {@code Direct.IN}. Resolution rules when a
     * property with the same name is already present:
     * <ul>
     *   <li>incoming value empty: the existing property is kept;</li>
     *   <li>existing value empty: the incoming property replaces it;</li>
     *   <li>both non-empty: the incoming property wins only if its task ended before
     *       the task that supplied the existing one.</li>
     * </ul>
     *
     * @param allProperty     accumulated properties keyed by property name
     * @param allTaskInstance for each property name, the task instance that supplied the current value
     * @param preTaskInstance the predecessor task that produced {@code thisProperty}
     * @param thisProperty    the property to merge
     */
    private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
        thisProperty.setDirect(Direct.IN);
        String proName = thisProperty.getProp();

        // First occurrence of this property name: just record it.
        if (!allProperty.containsKey(proName)) {
            allProperty.put(proName, thisProperty);
            allTaskInstance.put(proName, preTaskInstance);
            return;
        }

        Property existing = allProperty.get(proName);

        // Incoming value is empty: keep what is already there.
        if (StringUtils.isEmpty(thisProperty.getValue())) {
            allProperty.put(proName, existing);
            return;
        }

        // Existing value is empty: the incoming one replaces it.
        if (!StringUtils.isNotEmpty(existing.getValue())) {
            allProperty.put(proName, thisProperty);
            allTaskInstance.put(proName, preTaskInstance);
            return;
        }

        // Both have values: prefer the one whose task finished earlier.
        TaskInstance existingOwner = allTaskInstance.get(proName);
        boolean incomingEndedEarlier = existingOwner.getEndTime().getTime() > preTaskInstance.getEndTime().getTime();
        if (incomingEndedEarlier) {
            allProperty.put(proName, thisProperty);
            allTaskInstance.put(proName, preTaskInstance);
        } else {
            allProperty.put(proName, existing);
        }
    }

    /**
     * Builds a snapshot map of completed task instances keyed by their task code
     * rendered as a string. Entries of {@code completeTaskMap} whose instance id is not
     * present in {@code taskInstanceMap} are skipped with a warning.
     *
     * @return task code (string) to completed task instance
     */
    private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
        HashMap<String, TaskInstance> result = new HashMap<String, TaskInstance>();
        for (Map.Entry<Long, Integer> completed : this.completeTaskMap.entrySet()) {
            Long code = completed.getKey();
            Integer instanceId = completed.getValue();
            TaskInstance instance = this.taskInstanceMap.get(instanceId);
            if (instance != null) {
                result.put(Long.toString(instance.getTaskCode()), instance);
            } else {
                logger.warn("Cannot find the taskInstance from taskInstanceMap, taskInstanceId: {}, taskConde: {}", instanceId, code);
            }
        }
        return result;
    }

    /**
     * Collects the task instances referenced by {@code validTaskMap}.
     *
     * <p>An id in {@code validTaskMap} that has no entry in {@code taskInstanceMap} is
     * skipped with a warning instead of being added as {@code null}; callers such as
     * {@code findTaskIfExists} dereference every element and would otherwise fail with
     * a {@link NullPointerException}.
     *
     * @return a new list of the valid task instances; never contains {@code null}
     */
    private List<TaskInstance> getValidTaskList() {
        List<TaskInstance> validTaskInstanceList = new ArrayList<>();
        for (Integer taskInstanceId : this.validTaskMap.values()) {
            TaskInstance taskInstance = this.taskInstanceMap.get(taskInstanceId);
            if (taskInstance == null) {
                logger.warn("Cannot find the taskInstance from taskInstanceMap, taskInstanceId: {}", taskInstanceId);
                continue;
            }
            validTaskInstanceList.add(taskInstance);
        }
        return validTaskInstanceList;
    }

    /**
     * Queues and submits the tasks that follow the given node, then refreshes the
     * workflow instance state.
     *
     * <p>Steps: resolve the post nodes of {@code parentNodeCode}; reuse or create a task
     * instance for each; if the parent is an end node of the DAG and its completed
     * instance has a var pool, merge that pool into the workflow instance's var pool;
     * add every candidate that is not already queued, not already complete and not
     * killed to the standby list; submit the standby tasks and update the workflow state.
     *
     * @param parentNodeCode task code (string) of the node whose successors are submitted
     * @throws StateEventHandleException propagated from the standby / state-update helpers
     */
    private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
        Set<String> postNodeCodes = DagHelper.parsePostNodes(parentNodeCode, this.skipTaskNodeMap, this.dag, this.getCompleteTaskInstanceMap());

        ArrayList<TaskInstance> candidates = new ArrayList<TaskInstance>();
        for (String postNodeCode : postNodeCodes) {
            TaskNode postNode = (TaskNode) this.dag.getNode(postNodeCode);
            Optional<TaskInstance> known = this.getTaskInstance(postNode.getCode());
            if (known.isPresent()) {
                candidates.add(known.get());
            } else {
                candidates.add(this.createTaskInstance(this.processInstance, postNode));
            }
        }

        // When the parent is an end node, fold its var pool into the workflow instance's var pool.
        if (StringUtils.isNotEmpty(parentNodeCode) && this.dag.getEndNode().contains(parentNodeCode)) {
            TaskInstance endTaskInstance = this.taskInstanceMap.get(this.completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
            String endTaskVarPool = endTaskInstance.getVarPool();
            if (StringUtils.isNotEmpty(endTaskVarPool)) {
                Set<Property> endTaskProperties = new HashSet<Property>(JSONUtils.toList(endTaskVarPool, Property.class));
                String workflowVarPool = this.processInstance.getVarPool();
                if (StringUtils.isNotEmpty(workflowVarPool)) {
                    Set<Property> merged = new HashSet<Property>(JSONUtils.toList(workflowVarPool, Property.class));
                    merged.addAll(endTaskProperties);
                    this.processInstance.setVarPool(JSONUtils.toJsonString(merged));
                } else {
                    this.processInstance.setVarPool(JSONUtils.toJsonString(endTaskProperties));
                }
            }
        }

        for (TaskInstance candidate : candidates) {
            if (this.readyToSubmitTaskQueue.contains(candidate)) {
                logger.warn("Task is already at submit queue, taskInstanceId: {}", candidate.getId());
            } else if (candidate.getId() != null && this.completeTaskMap.containsKey(candidate.getTaskCode())) {
                logger.info("Task has already run success, taskName: {}", candidate.getName());
            } else if (candidate.getState().isKill()) {
                logger.info("Task is be stopped, the state is {}, taskInstanceId: {}", candidate.getState(), candidate.getId());
            } else {
                this.addTaskToStandByList(candidate);
            }
        }

        this.submitStandByTask();
        this.updateProcessInstanceState();
    }

    /**
     * Evaluates whether the dependencies of the given task allow it to run.
     *
     * <p>Begin nodes of the DAG always succeed. Otherwise each effective dependency
     * (forbidden nodes are looked through, nodes outside the DAG or in
     * {@code skipTaskNodeMap} are ignored) is checked in order:
     * not yet complete gives {@code WAITING}; killed gives {@code NON_EXEC}; for a task
     * that is neither blocking nor conditions, an unsuccessful dependency gives
     * {@code FAILED}.
     *
     * @param taskCode task code (string) of the task to check
     * @return the dependency result; {@code SUCCESS} when no dependency objects
     */
    private DependResult isTaskDepsComplete(String taskCode) {
        Collection beginNodes = this.dag.getBeginNode();
        if (beginNodes.contains(taskCode)) {
            return DependResult.SUCCESS;
        }

        TaskNode currentNode = (TaskNode) this.dag.getNode(taskCode);
        ArrayList<String> effectiveDeps = new ArrayList<String>();
        this.setIndirectDepList(taskCode, effectiveDeps);

        for (String depCode : effectiveDeps) {
            boolean ignorable = !this.dag.containsNode(depCode) || this.skipTaskNodeMap.containsKey(depCode);
            if (ignorable) {
                continue;
            }

            Long depTaskCode = Long.parseLong(depCode);
            if (!this.completeTaskMap.containsKey(depTaskCode)) {
                return DependResult.WAITING;
            }

            Integer depInstanceId = this.completeTaskMap.get(depTaskCode);
            TaskExecutionStatus depState = this.taskInstanceMap.get(depInstanceId).getState();
            if (depState.isKill()) {
                return DependResult.NON_EXEC;
            }

            // Blocking and conditions tasks run regardless of how the dependency ended.
            boolean runsRegardless = currentNode.isBlockingTask() || currentNode.isConditionsTask();
            if (!runsRegardless && !this.dependTaskSuccess(depCode, taskCode)) {
                return DependResult.FAILED;
            }
        }

        logger.info("The dependTasks of task all success, currentTaskCode: {}, dependTaskCodes: {}", taskCode, Arrays.toString(this.completeTaskMap.keySet().toArray()));
        return DependResult.SUCCESS;
    }

    /**
     * Appends the effective dependencies of a task to {@code indirectDepCodeList}.
     * A dependency that is listed in {@code forbiddenTaskMap} is replaced, recursively,
     * by its own dependencies; any other dependency is added as is.
     *
     * @param taskCode            task code (string) whose dependency list is expanded
     * @param indirectDepCodeList output list receiving the effective dependency codes
     */
    private void setIndirectDepList(String taskCode, List<String> indirectDepCodeList) {
        TaskNode node = (TaskNode) this.dag.getNode(taskCode);
        List<String> directDeps = node.getDepList();
        for (String depCode : directDeps) {
            boolean forbidden = this.forbiddenTaskMap.containsKey(Long.parseLong(depCode));
            if (forbidden) {
                this.setIndirectDepList(depCode, indirectDepCodeList);
            } else {
                indirectDepCodeList.add(depCode);
            }
        }
    }

    /**
     * Tells whether a completed dependency permits the next node to run.
     *
     * <p>For a conditions dependency, the next node must be among the branches
     * selected by {@code DagHelper.parseConditionTask}. For any other dependency, its
     * completed task instance must not be in a failure state.
     *
     * @param dependNodeName task code (string) of the dependency
     * @param nextNodeName   task code (string) of the node that depends on it
     * @return {@code true} when the next node may proceed with respect to this dependency
     */
    private boolean dependTaskSuccess(String dependNodeName, String nextNodeName) {
        TaskNode dependNode = (TaskNode) this.dag.getNode(dependNodeName);

        if (!dependNode.isConditionsTask()) {
            long dependTaskCode = Long.parseLong(dependNodeName);
            Integer dependInstanceId = this.completeTaskMap.get(dependTaskCode);
            TaskExecutionStatus dependState = this.taskInstanceMap.get(dependInstanceId).getState();
            return !dependState.isFailure();
        }

        List<String> selectedBranches = DagHelper.parseConditionTask(dependNodeName, this.skipTaskNodeMap, this.dag, this.getCompleteTaskInstanceMap());
        if (selectedBranches.contains(nextNodeName)) {
            return true;
        }
        logger.info("DependTask is a condition task, and its next condition branch does not hava current task, dependTaskCode: {}, currentTaskCode: {}", dependNodeName, nextNodeName);
        return false;
    }

    /**
     * Returns the completed task instances whose state is exactly {@code state}.
     * Ids in {@code completeTaskMap} without an entry in {@code taskInstanceMap} are ignored.
     *
     * @param state the state to match (compared by identity)
     * @return a new list of matching task instances, possibly empty
     */
    private List<TaskInstance> getCompleteTaskByState(TaskExecutionStatus state) {
        ArrayList<TaskInstance> matches = new ArrayList<TaskInstance>();
        for (Integer instanceId : this.completeTaskMap.values()) {
            TaskInstance instance = this.taskInstanceMap.get(instanceId);
            if (instance != null && instance.getState() == state) {
                matches.add(instance);
            }
        }
        return matches;
    }

    /**
     * Maps the current workflow state to the state to report while tasks are still
     * active: {@code READY_STOP}, {@code READY_PAUSE}, {@code READY_BLOCK} and
     * {@code DELAY_EXECUTION} are kept, everything else becomes {@code RUNNING_EXECUTION}.
     *
     * @param state the current workflow state
     * @return the state to report while running
     */
    private WorkflowExecutionStatus runningState(WorkflowExecutionStatus state) {
        boolean keepsOwnState = state == WorkflowExecutionStatus.READY_STOP
                || state == WorkflowExecutionStatus.READY_PAUSE
                || state == WorkflowExecutionStatus.READY_BLOCK
                || state == WorkflowExecutionStatus.DELAY_EXECUTION;
        return keepsOwnState ? state : WorkflowExecutionStatus.RUNNING_EXECUTION;
    }

    /**
     * Reports whether any failure has been recorded for this workflow run: a failed
     * task submission, an entry in {@code errorTaskMap}, or an entry in
     * {@code dependFailedTaskSet}.
     *
     * @return {@code true} if at least one failure indicator is set
     */
    private boolean hasFailedTask() {
        return this.taskFailedSubmit
                || this.errorTaskMap.size() > 0
                || this.dependFailedTaskSet.size() > 0;
    }

    /**
     * Decides whether the workflow should be considered failed right now.
     *
     * <p>Without any failed task the answer is {@code false}. With a failed task, the
     * {@code END} strategy fails immediately, while {@code CONTINUE} fails only once
     * nothing is queued, active or waiting for retry. Any other strategy yields
     * {@code false}.
     *
     * @return {@code true} when the workflow is to be treated as failed
     */
    private boolean processFailed() {
        if (!this.hasFailedTask()) {
            return false;
        }
        logger.info("The current process has failed task, the current process failed");

        if (this.processInstance.getFailureStrategy() == FailureStrategy.END) {
            return true;
        }
        if (this.processInstance.getFailureStrategy() != FailureStrategy.CONTINUE) {
            return false;
        }
        boolean nothingPending = this.readyToSubmitTaskQueue.size() == 0
                && this.activeTaskProcessorMaps.size() == 0
                && this.waitToRetryTaskInstanceMap.size() == 0;
        return nothingPending;
    }

    /**
     * Resolves the final state of a workflow that is in {@code READY_PAUSE}.
     *
     * <p>A retry task in standby yields {@code FAILURE}. Otherwise the result is
     * {@code PAUSE} if any completed task is paused, the instance is blocked, the
     * complement run has not ended, or tasks are still queued; else {@code SUCCESS}.
     *
     * @return the resolved workflow state
     */
    private WorkflowExecutionStatus processReadyPause() {
        if (this.hasRetryTaskInStandBy()) {
            return WorkflowExecutionStatus.FAILURE;
        }

        List<TaskInstance> pausedTasks = this.getCompleteTaskByState(TaskExecutionStatus.PAUSE);
        boolean shouldPause = CollectionUtils.isNotEmpty(pausedTasks)
                || this.processInstance.isBlocked()
                || !this.isComplementEnd()
                || this.readyToSubmitTaskQueue.size() > 0;
        return shouldPause ? WorkflowExecutionStatus.PAUSE : WorkflowExecutionStatus.SUCCESS;
    }

    /**
     * Handles a workflow in {@code READY_BLOCK}: sends {@code PAUSE} to every active
     * task processor whose type is not {@code "BLOCKING"}, marks every queued task
     * instance as {@code PAUSE}, and reports {@code BLOCK}.
     *
     * @return always {@code WorkflowExecutionStatus.BLOCK}
     */
    private WorkflowExecutionStatus processReadyBlock() {
        if (this.activeTaskProcessorMaps.size() > 0) {
            for (ITaskProcessor processor : this.activeTaskProcessorMaps.values()) {
                if (!"BLOCKING".equals(processor.getType())) {
                    processor.action(TaskAction.PAUSE);
                }
            }
        }
        if (this.readyToSubmitTaskQueue.size() > 0) {
            for (Iterator queued = this.readyToSubmitTaskQueue.iterator(); queued.hasNext();) {
                TaskInstance waiting = (TaskInstance) queued.next();
                waiting.setState(TaskExecutionStatus.PAUSE);
            }
        }
        return WorkflowExecutionStatus.BLOCK;
    }

    /**
     * Computes the state the workflow instance should have, based on its current
     * state and the task bookkeeping of this runnable.
     *
     * <p>Checks are applied in this order, the first match wins:
     * <ol>
     *   <li>active task processors or a retry task in standby: {@code runningState(state)};</li>
     *   <li>{@code READY_BLOCK}: result of {@code processReadyBlock()};</li>
     *   <li>{@code READY_PAUSE}: result of {@code processReadyPause()};</li>
     *   <li>{@code READY_STOP}: {@code STOP} if any completed task is killed or failed or
     *       the complement run has not ended, else {@code SUCCESS};</li>
     *   <li>{@code processFailed()}: {@code FAILURE};</li>
     *   <li>{@code RUNNING_EXECUTION}: still running while tasks are queued or waiting to
     *       retry, {@code FAILURE} if any completed task was killed, else {@code SUCCESS};</li>
     *   <li>otherwise the current state is returned unchanged.</li>
     * </ol>
     *
     * @param instance the workflow instance whose state is evaluated
     * @return the target workflow state
     */
    private WorkflowExecutionStatus getProcessInstanceState(ProcessInstance instance) {
        final WorkflowExecutionStatus current = instance.getState();
        WorkflowExecutionStatus target;

        boolean stillWorking = this.activeTaskProcessorMaps.size() > 0 || this.hasRetryTaskInStandBy();
        if (stillWorking) {
            target = this.runningState(current);
            logger.info("The workflowInstance has task running, the workflowInstance status is {}", target);
            return target;
        }

        if (current == WorkflowExecutionStatus.READY_BLOCK) {
            target = this.processReadyBlock();
            logger.info("The workflowInstance is ready to block, the workflowInstance status is {}", target);
            return target;
        }

        if (current == WorkflowExecutionStatus.READY_PAUSE) {
            target = this.processReadyPause();
            logger.info("The workflowInstance is ready to pause, the workflow status is {}", target);
            return target;
        }

        if (current == WorkflowExecutionStatus.READY_STOP) {
            List<TaskInstance> killed = this.getCompleteTaskByState(TaskExecutionStatus.KILL);
            List<TaskInstance> failed = this.getCompleteTaskByState(TaskExecutionStatus.FAILURE);
            if (CollectionUtils.isNotEmpty(killed) || CollectionUtils.isNotEmpty(failed) || !this.isComplementEnd()) {
                target = WorkflowExecutionStatus.STOP;
            } else {
                target = WorkflowExecutionStatus.SUCCESS;
            }
            logger.info("The workflowInstance is ready to stop, the workflow status is {}", target);
            return target;
        }

        if (this.processFailed()) {
            logger.info("The workflowInstance is failed, the workflow status is {}", WorkflowExecutionStatus.FAILURE);
            return WorkflowExecutionStatus.FAILURE;
        }

        if (current != WorkflowExecutionStatus.RUNNING_EXECUTION) {
            return current;
        }

        List<TaskInstance> killedWhileRunning = this.getCompleteTaskByState(TaskExecutionStatus.KILL);
        if (this.readyToSubmitTaskQueue.size() > 0 || this.waitToRetryTaskInstanceMap.size() > 0) {
            return WorkflowExecutionStatus.RUNNING_EXECUTION;
        }
        return CollectionUtils.isNotEmpty(killedWhileRunning)
                ? WorkflowExecutionStatus.FAILURE
                : WorkflowExecutionStatus.SUCCESS;
    }

    /**
     * Tells whether a complement-data run has reached its last schedule date.
     *
     * <p>Instances that are not complement-data runs are always reported as ended. Otherwise the
     * instance's schedule time is compared with the {@code complementEndDate} entry of its command
     * parameters.
     *
     * @return {@code true} when the instance is not a complement run, or when its schedule time
     *         equals the parsed {@code complementEndDate}
     */
    private boolean isComplementEnd() {
        if (this.processInstance.isComplementData()) {
            final Map<String, String> commandParams = JSONUtils.toMap(this.processInstance.getCommandParam());
            final Date complementEndDate = DateUtils.stringToDate(commandParams.get("complementEndDate"));
            return this.processInstance.getScheduleTime().equals(complementEndDate);
        }
        return true;
    }

    /**
     * Recomputes the workflow instance status and, when it differs from the current one, persists
     * it and enqueues a {@code PROCESS_STATE_CHANGE} event carrying the instance's status after the
     * update.
     *
     * @throws StateEventHandleException if persisting the new status fails
     */
    private void updateProcessInstanceState() throws StateEventHandleException {
        final WorkflowExecutionStatus targetState = this.getProcessInstanceState(this.processInstance);
        final WorkflowExecutionStatus originState = this.processInstance.getState();

        if (originState == targetState) {
            logger.info("There is no need to update the workflow instance state, origin state: {}, target state: {}", originState, targetState);
            return;
        }

        logger.info("Update workflowInstance states, origin state: {}, target state: {}", originState, targetState);
        this.updateWorkflowInstanceStatesToDB(targetState);

        // The event status is read back from the instance after the update has been applied.
        final WorkflowStateEvent changeEvent = WorkflowStateEvent.builder()
                .processInstanceId(this.processInstance.getId())
                .status(this.processInstance.getState())
                .type(StateEventType.PROCESS_STATE_CHANGE)
                .build();
        this.stateEvents.add(changeEvent);
    }

    /**
     * Persists the status carried by the given state event as the workflow instance status.
     *
     * @param stateEvent the event whose status should be applied
     * @throws StateEventHandleException if persisting the status fails
     */
    public void updateProcessInstanceState(WorkflowStateEvent stateEvent) throws StateEventHandleException {
        this.updateWorkflowInstanceStatesToDB(stateEvent.getStatus());
    }

    /**
     * Applies a new status to the in-memory workflow instance and persists it.
     *
     * <p>Nothing happens when the instance already has the requested status. When the new status is
     * a finished one, the end time is set to the current time before persisting. If persisting
     * fails, the in-memory instance is rolled back to the status and end time it had before this
     * call, so a failed update never leaves a half-applied change (nor wipes an end time that this
     * method did not set).
     *
     * @param newStates the status to move the workflow instance to
     * @throws StateEventHandleException if the DAO update throws; the original exception is kept as cause
     */
    private void updateWorkflowInstanceStatesToDB(WorkflowExecutionStatus newStates) throws StateEventHandleException {
        final WorkflowExecutionStatus originStates = this.processInstance.getState();
        if (originStates == newStates) {
            return;
        }
        // Remember the end time so the rollback restores exactly what was there before.
        final Date originEndTime = this.processInstance.getEndTime();

        logger.info("Begin to update workflow instance state , state will change from {} to {}", originStates, newStates);
        this.processInstance.setStateWithDesc(newStates, "update by workflow executor");
        if (newStates.isFinished()) {
            this.processInstance.setEndTime(new Date());
        }
        try {
            this.processInstanceDao.updateProcessInstance(this.processInstance);
        } catch (Exception ex) {
            // Roll the in-memory instance back so it stays consistent with what is stored.
            this.processInstance.setStateWithDesc(originStates, "recover state by DB error");
            this.processInstance.setEndTime(originEndTime);
            throw new StateEventHandleException("Update process instance status to DB error", ex);
        }
    }

    /**
     * Looks up the dependency result for a task by delegating to {@code isTaskDepsComplete} with
     * the task code rendered as a decimal string.
     *
     * @param taskInstance the task whose dependencies are checked
     * @return the dependency result reported for the task's code
     */
    private DependResult getDependResultForTask(TaskInstance taskInstance) {
        final String taskCode = Long.toString(taskInstance.getTaskCode());
        return this.isTaskDepsComplete(taskCode);
    }

    /**
     * Puts a task into the ready-to-submit queue unless the queue already contains it.
     *
     * <p>When the task is added, the {@code "submit"} task metric is incremented first.
     *
     * @param taskInstance the task to enqueue
     */
    public void addTaskToStandByList(TaskInstance taskInstance) {
        final boolean alreadyQueued = this.readyToSubmitTaskQueue.contains(taskInstance);
        if (!alreadyQueued) {
            logger.info("Add task to stand by list, task name:{}, task id:{}, task code:{}",
                    taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
            TaskMetrics.incTaskInstanceByState("submit");
            this.readyToSubmitTaskQueue.put(taskInstance);
            return;
        }
        logger.warn("Task already exists in ready submit queue, no need to add again, task code:{}", taskInstance.getTaskCode());
    }

    /**
     * Removes a task from the ready-to-submit queue.
     *
     * @param taskInstance the task to remove
     * @return whatever the queue's {@code remove} reports for this task
     */
    private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
        final boolean removed = this.readyToSubmitTaskQueue.remove(taskInstance);
        return removed;
    }

    /**
     * Checks whether the ready-to-submit queue holds at least one task whose state is a failure,
     * i.e. a task that is queued again after failing.
     *
     * @return {@code true} as soon as one queued task in a failure state is found
     */
    private boolean hasRetryTaskInStandBy() {
        for (Iterator<TaskInstance> queued = this.readyToSubmitTaskQueue.iterator(); queued.hasNext();) {
            if (queued.next().getState().isFailure()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stops every task that still has an active processor.
     *
     * <p>The ready-to-submit queue is emptied first. Then, for each active task code whose valid
     * task instance exists and is not finished, a {@code STOP} action is sent to its processor.
     * When the processor's task instance is finished right after the action, a
     * {@code TASK_STATE_CHANGE} event with that state is enqueued.
     */
    public void killAllTasks() {
        logger.info("kill called on process instance id: {}, num: {}",
                this.processInstance.getId(), this.activeTaskProcessorMaps.size());

        if (this.readyToSubmitTaskQueue.size() > 0) {
            this.readyToSubmitTaskQueue.clear();
        }

        for (long taskCode : this.activeTaskProcessorMaps.keySet()) {
            final ITaskProcessor processor = this.activeTaskProcessorMaps.get(taskCode);
            final Integer instanceId = this.validTaskMap.get(taskCode);
            if (instanceId == null || instanceId.equals(0)) {
                continue;
            }

            final TaskInstance storedInstance = this.processService.findTaskInstanceById(instanceId);
            if (storedInstance == null || storedInstance.getState().isFinished()) {
                continue;
            }

            processor.action(TaskAction.STOP);

            // Only publish an event when the stop took effect immediately.
            if (processor.taskInstance().getState().isFinished()) {
                final TaskStateEvent finishedEvent = TaskStateEvent.builder()
                        .processInstanceId(this.processInstance.getId())
                        .taskInstanceId(storedInstance.getId())
                        .status(processor.taskInstance().getState())
                        .type(StateEventType.TASK_STATE_CHANGE)
                        .build();
                this.addStateEvent(finishedEvent);
            }
        }
    }

    /**
     * Reports whether the workflow instance is in a finished status.
     *
     * @return the result of {@code isFinished()} on the instance's current status
     */
    public boolean workFlowFinish() {
        final WorkflowExecutionStatus currentState = this.processInstance.getState();
        return currentState.isFinished();
    }

    /**
     * Processes the ready-to-submit queue: for each pass it looks at the queue head and either
     * completes it as force-succeeded, submits it for execution, or drops it, depending on the
     * task's state and dependency result.
     *
     * <p>The number of passes is fixed to the queue size observed when the method starts.
     *
     * @throws StateEventHandleException declared by the signature; presumably raised by one of the
     *         called helpers (e.g. {@code submitPostNode}) — confirm against their declarations
     */
    public void submitStandByTask() throws StateEventHandleException {
        int length = this.readyToSubmitTaskQueue.size();
        for (int i = 0; i < length; ++i) {
            DependResult dependResult;
            TaskInstance retryTask;
            // NOTE(review): peek() is assumed not to remove the head; removal below is always
            // explicit via removeTaskFromStandbyList — confirm against the queue implementation.
            TaskInstance task = this.readyToSubmitTaskQueue.peek();
            if (task == null) continue;
            // A retryable task whose stored instance has been force-succeeded is treated as
            // complete: it is not submitted again and its successors are submitted instead.
            if (task.taskCanRetry() && (retryTask = this.processService.findTaskInstanceById(task.getId())) != null && retryTask.getState().isForceSuccess()) {
                task.setState(retryTask.getState());
                logger.info("Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}", (Object)task.getName(), (Object)task.getId());
                this.removeTaskFromStandbyList(task);
                this.completeTaskMap.put(task.getTaskCode(), task.getId());
                this.taskInstanceMap.put(task.getId(), task);
                this.submitPostNode(Long.toString(task.getTaskCode()));
                continue;
            }
            // On a first run, the DAG predecessors are passed to getPreVarPool together with the task.
            if (task.isFirstRun()) {
                Set preTask = this.dag.getPreviousNodes((Object)Long.toString(task.getTaskCode()));
                this.getPreVarPool(task, preTask);
            }
            if (DependResult.SUCCESS == (dependResult = this.getDependResultForTask(task))) {
                logger.info("The dependResult of task {} is success, so ready to submit to execute", (Object)task.getName());
                Optional<TaskInstance> taskInstanceOptional = this.submitTaskExec(task);
                // An empty Optional is handled as a failed submission: the task is recorded as
                // both complete and errored, and its processor entry is dropped.
                if (!taskInstanceOptional.isPresent()) {
                    this.taskFailedSubmit = true;
                    if (!this.removeTaskFromStandbyList(task)) {
                        logger.error("Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}", (Object)this.processInstance.getId(), (Object)task.getTaskCode());
                    }
                    this.completeTaskMap.put(task.getTaskCode(), task.getId());
                    this.taskInstanceMap.put(task.getId(), task);
                    this.errorTaskMap.put(task.getTaskCode(), task.getId());
                    this.activeTaskProcessorMaps.remove(task.getTaskCode());
                    logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}", new Object[]{task.getProcessInstanceId(), task.getId(), task.getTaskCode()});
                    continue;
                }
                this.removeTaskFromStandbyList(task);
                continue;
            }
            // Failed dependencies: remember the task code and take the task out of the queue.
            if (DependResult.FAILED == dependResult) {
                this.dependFailedTaskSet.add(task.getTaskCode());
                this.removeTaskFromStandbyList(task);
                logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", (Object)task.getId(), (Object)dependResult);
                continue;
            }
            // Any result other than NON_EXEC leaves the task in the queue untouched.
            if (DependResult.NON_EXEC != dependResult) continue;
            this.removeTaskFromStandbyList(task);
            logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", (Object)task.getId(), (Object)dependResult);
        }
    }

    /**
     * Loads the task instances named by the {@code StartNodeIdList} entry of a command parameter
     * JSON string.
     *
     * <p>The entry is a comma-separated list of task instance ids; empty segments are skipped.
     *
     * @param cmdParam the command parameters as a JSON string
     * @return the task instances found for the listed ids, or an empty list when the parameters
     *         cannot be parsed, the entry is missing, or it contains no ids
     */
    protected List<TaskInstance> getRecoverTaskInstanceList(String cmdParam) {
        final Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
        if (paramMap == null || !paramMap.containsKey("StartNodeIdList")) {
            return Collections.emptyList();
        }

        final String[] rawIds = paramMap.get("StartNodeIdList").split(",");
        final List<Integer> startTaskInstanceIds = Arrays.stream(rawIds)
                .filter(StringUtils::isNotEmpty)
                .map(Integer::valueOf)
                .collect(Collectors.toList());

        if (startTaskInstanceIds.isEmpty()) {
            return Collections.emptyList();
        }
        return this.processService.findTaskInstanceByIdList(startTaskInstanceIds);
    }

    /**
     * Extracts the comma-separated {@code StartNodeList} entry from a command parameter JSON
     * string.
     *
     * @param cmdParam the command parameters as a JSON string
     * @return the entry split on commas, or a new empty list when the parameters cannot be parsed
     *         or the entry is missing
     */
    private List<String> parseStartNodeName(String cmdParam) {
        final Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
        if (paramMap != null && paramMap.containsKey("StartNodeList")) {
            final String joinedNames = paramMap.get("StartNodeList");
            return Arrays.asList(joinedNames.split(","));
        }
        return new ArrayList<String>();
    }

    /**
     * Converts the task codes of the given task instances to their decimal string form.
     *
     * @param recoverNodeList the task instances to convert; may be {@code null} or empty
     * @return a new mutable list with one code string per task, in input order; empty when the
     *         input is {@code null} or empty
     */
    private List<String> getRecoveryNodeCodeList(List<TaskInstance> recoverNodeList) {
        if (!CollectionUtils.isNotEmpty(recoverNodeList)) {
            return new ArrayList<String>();
        }
        return recoverNodeList.stream()
                .map(task -> Long.toString(task.getTaskCode()))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Builds the process DAG by delegating to {@code DagHelper.generateFlowDag} with the same
     * arguments.
     *
     * @param totalTaskNodeList    all task nodes of the workflow
     * @param startNodeNameList    the start nodes passed through to the helper
     * @param recoveryNodeCodeList the recovery node codes passed through to the helper
     * @param depNodeType          the dependency type passed through to the helper
     * @return the DAG produced by the helper
     * @throws Exception declared by the signature; any exception from the helper propagates
     */
    public ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList, List<String> startNodeNameList, List<String> recoveryNodeCodeList, TaskDependType depNodeType) throws Exception {
        final ProcessDag flowDag =
                DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
        return flowDag;
    }

    /**
     * Checks whether any known task instance has a task group id greater than zero.
     *
     * @return {@code true} when at least one non-null entry of the task instance map has a
     *         positive task group id
     */
    private boolean checkTaskQueue() {
        for (TaskInstance candidate : this.taskInstanceMap.values()) {
            if (candidate != null && candidate.getTaskGroupId() > 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Decides whether the workflow instance is being executed for the first time.
     *
     * <p>An instance flagged for recovery is never new. Otherwise it is new only when its status is
     * {@code RUNNING_EXECUTION} and its run count is exactly 1.
     *
     * @return {@code true} for a first, non-recovery execution; {@code false} otherwise
     */
    private boolean isNewProcessInstance() {
        if (Flag.YES.equals(this.processInstance.getRecovery())) {
            logger.info("This workInstance will be recover by this execution");
            return false;
        }

        final boolean firstExecution =
                WorkflowExecutionStatus.RUNNING_EXECUTION == this.processInstance.getState()
                        && this.processInstance.getRunTimes() == 1;
        if (!firstExecution) {
            logger.info("The workflowInstance has been executed before, this execution is to reRun, processInstance status: {}, runTimes: {}",
                    this.processInstance.getState(), this.processInstance.getRunTimes());
        }
        return firstExecution;
    }

    /**
     * Sends a {@code RESUBMIT} action to the active processor registered for a task code.
     *
     * @param taskCode the code of the task to resubmit
     * @throws Exception if no active processor is registered for {@code taskCode}
     */
    public void resubmit(long taskCode) throws Exception {
        final ITaskProcessor processor = this.activeTaskProcessorMaps.get(taskCode);
        if (processor != null) {
            processor.action(TaskAction.RESUBMIT);
            logger.debug("RESUBMIT: task code:{}", taskCode);
            return;
        }
        throw new Exception("resubmit error, taskProcessor is null, task code: " + taskCode);
    }

    /**
     * Exposes the completed-task map. The field itself is returned, not a copy, so changes made by
     * the caller are visible to this runnable.
     *
     * @return the map of completed tasks (elsewhere in this class it is filled with
     *         task code to task instance id)
     */
    public Map<Long, Integer> getCompleteTaskMap() {
        return completeTaskMap;
    }

    /**
     * Exposes the map of active task processors. The field itself is returned, not a copy.
     *
     * @return the active task processors (looked up by task code elsewhere in this class)
     */
    public Map<Long, ITaskProcessor> getActiveTaskProcessMap() {
        return activeTaskProcessorMaps;
    }

    /**
     * Exposes the map of task instances waiting for a retry. The field itself is returned, not a
     * copy.
     *
     * @return the wait-to-retry task instances (keys are presumably task codes — confirm where the
     *         map is populated)
     */
    public Map<Long, TaskInstance> getWaitToRetryTaskInstanceMap() {
        return waitToRetryTaskInstanceMap;
    }

    /**
     * Overlays parameters supplied with the command onto the process definition's global
     * parameters.
     *
     * <p>The {@code StartParams} and {@code fatherParams} entries of {@code cmdParam} are each a
     * JSON object; they are merged with {@code fatherParams} winning on duplicate keys. For every
     * merged parameter:
     * <ul>
     *   <li>if the global parameter map already has the key, its value is overwritten;</li>
     *   <li>otherwise the parameter is added to the global map and appended to the global
     *       parameter list as an {@code IN} / {@code VARCHAR} property.</li>
     * </ul>
     * Nothing is changed when there are no merged parameters or the definition has no global
     * parameter map.
     *
     * <p>{@code JSONUtils.toMap} may return {@code null} (for instance for an entry that cannot be
     * parsed); such an entry is treated as "no parameters" instead of failing with a
     * {@link NullPointerException}.
     *
     * @param processDefinition the definition whose global parameters are updated in place
     * @param cmdParam          the parsed command parameters
     */
    private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
        Map<String, String> startParamMap = null;
        if (cmdParam.containsKey("StartParams")) {
            startParamMap = JSONUtils.toMap(cmdParam.get("StartParams"));
        }
        if (startParamMap == null) {
            // Entry absent or not parseable: start from an empty, mutable map.
            startParamMap = new HashMap<>();
        }

        Map<String, String> fatherParamMap = null;
        if (cmdParam.containsKey("fatherParams")) {
            fatherParamMap = JSONUtils.toMap(cmdParam.get("fatherParams"));
        }
        if (fatherParamMap != null) {
            // Parameters handed down by the parent override the start parameters.
            startParamMap.putAll(fatherParamMap);
        }

        Map<String, String> globalMap = processDefinition.getGlobalParamMap();
        List<Property> globalParamList = processDefinition.getGlobalParamList();
        if (startParamMap.isEmpty() || globalMap == null) {
            return;
        }

        // Overwrite the values of global parameters that the command also supplies.
        for (Map.Entry<String, String> globalEntry : globalMap.entrySet()) {
            String overrideValue = startParamMap.get(globalEntry.getKey());
            if (overrideValue != null) {
                globalEntry.setValue(overrideValue);
            }
        }

        // Register command parameters that are not global parameters yet.
        for (Map.Entry<String, String> startEntry : startParamMap.entrySet()) {
            if (globalMap.containsKey(startEntry.getKey())) {
                continue;
            }
            globalMap.put(startEntry.getKey(), startEntry.getValue());
            globalParamList.add(new Property(startEntry.getKey(), Direct.IN, DataType.VARCHAR, startEntry.getValue()));
        }
    }

    private static enum WorkflowRunnableStatus {
        CREATED,
        INITIALIZE_DAG,
        INITIALIZE_QUEUE,
        STARTED;

    }
}

