/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.api.service.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.executor.ExecuteClient;
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ApiTriggerType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.command.task.TaskExecuteStartMessage;
import org.apache.dolphinscheduler.remote.command.task.TaskForceStartRequest;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowExecutingDataRequest;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowExecutingDataResponse;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.TriggerRelationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ExecutorServiceImpl
extends BaseServiceImpl
implements ExecutorService {
    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ExecutorServiceImpl.class);
    // Collaborators below are populated by Spring field injection.
    @Autowired
    private ProjectMapper projectMapper;
    @Autowired
    private ProjectService projectService;
    @Autowired
    private ProcessDefinitionMapper processDefinitionMapper;
    // NOTE(review): second injection of the same mapper type as processDefinitionMapper, declared
    // package-private; it is not referenced in the part of the class visible here - confirm whether it is still needed.
    @Autowired
    ProcessDefinitionMapper processDefineMapper;
    @Autowired
    private MonitorService monitorService;
    @Autowired
    private ProcessInstanceMapper processInstanceMapper;
    // Injected lazily; presumably to break a circular bean dependency - TODO confirm.
    @Lazy
    @Autowired
    private ProcessService processService;
    @Autowired
    private ProcessInstanceDao processInstanceDao;
    @Autowired
    private ProcessDefinitionService processDefinitionService;
    @Autowired
    private CommandService commandService;
    @Autowired
    private TaskDefinitionLogMapper taskDefinitionLogMapper;
    @Autowired
    private StateEventCallbackService stateEventCallbackService;
    @Autowired
    private TaskDefinitionMapper taskDefinitionMapper;
    @Autowired
    private ProcessTaskRelationMapper processTaskRelationMapper;
    @Autowired
    private TaskGroupQueueMapper taskGroupQueueMapper;
    @Autowired
    private WorkerGroupService workerGroupService;
    @Autowired
    private TriggerRelationService triggerRelationService;
    @Autowired
    private ExecuteClient executeClient;
    @Autowired
    private TenantMapper tenantMapper;

    /**
     * Validates a request to start a workflow and, when every check passes, creates the
     * command(s) that trigger it.
     *
     * <p>Checks run in this order: project permission, {@code timeout} range,
     * {@code expectedParallelismNumber}, tenant, workflow definition (existence, project,
     * release state, sub workflows), start nodes, complement date count and master availability.
     * Permission and parameter problems are reported through the returned map; the remaining
     * checks throw {@link ServiceException}.
     *
     * @param loginUser user issuing the request; its id is stored as the command executor
     * @param projectCode code of the project the workflow definition must belong to
     * @param processDefinitionCode code of the workflow definition to start
     * @param cronTime schedule parameter (JSON); only inspected for complement-data commands
     * @param timeout must be non-null and in the range 1..86400
     * @param expectedParallelismNumber optional; when present it must be greater than zero
     * @param version optional definition version; when {@code null} the definition is looked up by code only
     * @return the result map; on success the generated trigger code is stored under {@code "data"}
     * @throws ServiceException when the tenant, workflow definition, start nodes, complement
     *         dates or master servers fail validation
     */
    @Override
    @Transactional(rollbackFor={Exception.class})
    public Map<String, Object> execProcessInstance(User loginUser, long projectCode, long processDefinitionCode, String cronTime, CommandType commandType, FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, Integer warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, String tenantCode, Long environmentCode, Integer timeout, Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, int testFlag, ComplementDependentMode complementDependentMode, Integer version, boolean allLevelDependent, ExecutionOrder executionOrder) {
        Project project = this.projectMapper.queryByCode(projectCode);
        Map<String, Object> result = this.projectService.checkProjectAndAuth(loginUser, project, projectCode, "project:executors:start");
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        // timeout is a boxed Integer: a null value would otherwise blow up with a
        // NullPointerException while being unboxed, so it is reported as an invalid parameter.
        if (timeout == null || timeout <= 0 || timeout > 86400) {
            log.warn("Parameter timeout is invalid, timeout:{}.", (Object)timeout);
            this.putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR, new Object[0]);
            return result;
        }
        if (Objects.nonNull(expectedParallelismNumber) && expectedParallelismNumber <= 0) {
            log.warn("Parameter expectedParallelismNumber is invalid, expectedParallelismNumber:{}.", (Object)expectedParallelismNumber);
            this.putMsg(result, Status.TASK_PARALLELISM_PARAMS_ERROR, new Object[0]);
            return result;
        }
        this.checkValidTenant(tenantCode);
        ProcessDefinition processDefinition = null != version ? this.processService.findProcessDefinition(Long.valueOf(processDefinitionCode), version.intValue()) : this.processDefinitionMapper.queryByCode(processDefinitionCode);
        // The version is read from the definition right below, so an unknown code/version has to
        // be rejected here instead of surfacing as a NullPointerException.
        if (processDefinition == null) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
        }
        this.checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode, processDefinition.getVersion());
        this.checkStartNodeList(startNodeList, processDefinitionCode, processDefinition.getVersion());
        this.checkScheduleTimeNumExceed(commandType, cronTime);
        this.checkMasterExists();
        long triggerCode = CodeGenerateUtils.getInstance().genCode();
        int create = this.createCommand(triggerCode, commandType, processDefinition.getCode(), taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, tenantCode, environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, complementDependentMode, allLevelDependent, executionOrder);
        if (create > 0) {
            // Persist the warning group chosen for this run on the definition itself.
            processDefinition.setWarningGroupId(warningGroupId);
            this.processDefinitionMapper.updateById(processDefinition);
            log.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", (Object)processDefinition.getCode(), (Object)create);
            result.put("data", triggerCode);
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        } else {
            log.error("Start process instance failed because create command error, processDefinitionCode:{}.", (Object)processDefinition.getCode());
            this.putMsg(result, Status.START_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return result;
    }

    /**
     * Ensures at least one master server is currently listed in the registry.
     *
     * @throws ServiceException with {@link Status#MASTER_NOT_EXISTS} when the list is empty
     */
    private void checkMasterExists() {
        List<Server> registeredMasters = this.monitorService.getServerListFromRegistry(true);
        if (!registeredMasters.isEmpty()) {
            return;
        }
        throw new ServiceException(Status.MASTER_NOT_EXISTS);
    }

    /**
     * Rejects complement-data requests that list more than 100 explicit schedule dates.
     *
     * <p>Nothing is checked for other command types, for a {@code null} {@code cronTime}, or when
     * {@code cronTime} carries no usable {@code complementScheduleDateList} entry.
     *
     * @param complementData command type of the request
     * @param cronTime schedule parameter as a JSON object string, may be {@code null}
     * @throws ServiceException with {@link Status#SCHEDULE_TIME_NUMBER_EXCEED} when the
     *         comma separated date list has more than 100 entries
     */
    private void checkScheduleTimeNumExceed(CommandType complementData, String cronTime) {
        if (!CommandType.COMPLEMENT_DATA.equals((Object)complementData) || cronTime == null) {
            return;
        }
        Map<String, String> cronMap = JSONUtils.toMap(cronTime);
        // NOTE(review): toMap presumably yields null for blank or unparsable JSON; there is then
        // nothing to count here - confirm that later schedule validation rejects such input.
        if (cronMap == null) {
            return;
        }
        // A present key with a null value must not reach split().
        String scheduleDateList = cronMap.get("complementScheduleDateList");
        if (scheduleDateList == null) {
            return;
        }
        if (scheduleDateList.split(",").length > 100) {
            log.warn("Parameter cornTime is bigger than {}.", (Object)100);
            throw new ServiceException(Status.SCHEDULE_TIME_NUMBER_EXCEED);
        }
    }

    /**
     * Verifies that a workflow definition exists, belongs to the given project, is online and
     * has no offline sub workflow.
     *
     * @param projectCode code of the project the definition is expected to belong to
     * @param processDefinition the definition to validate; {@code null} is reported as "not exist"
     * @param processDefineCode code that was used to look the definition up (used for error reporting)
     * @param version not read by this check
     * @throws ServiceException with {@link Status#PROCESS_DEFINE_NOT_EXIST},
     *         {@link Status#PROCESS_DEFINE_NOT_RELEASE} or
     *         {@link Status#SUB_PROCESS_DEFINE_NOT_RELEASE} depending on the failed check
     */
    @Override
    public void checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode, Integer version) {
        // Callers hand over the raw lookup result, which may be null for an unknown code.
        if (processDefinition == null) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
        }
        if (projectCode != processDefinition.getProjectCode()) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, processDefinition.getCode());
        }
        if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getCode(), processDefinition.getVersion());
        }
        if (!this.checkSubProcessDefinitionValid(processDefinition)) {
            throw new ServiceException(Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
        }
    }

    /**
     * Tells whether none of the sub workflows referenced by the given definition is offline.
     *
     * <p>Sub workflows are found through the downstream task relations of the definition: every
     * task of type {@code SUB_PROCESS} contributes the {@code processDefinitionCode} stored in
     * its task parameters.
     *
     * @param processDefinition the parent workflow definition
     * @return {@code true} when there are no relations, no sub workflow tasks, or no referenced
     *         definition is in state {@link ReleaseState#OFFLINE}
     */
    @Override
    public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition) {
        List<ProcessTaskRelation> downstreamRelations = this.processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode());
        if (downstreamRelations.isEmpty()) {
            return true;
        }
        Set<Long> postTaskCodes = downstreamRelations.stream()
                .map(ProcessTaskRelation::getPostTaskCode)
                .collect(Collectors.toSet());
        // Collect the definition codes referenced by SUB_PROCESS tasks.
        Set<Long> subProcessCodes = this.taskDefinitionMapper.queryByCodeList(postTaskCodes).stream()
                .filter(task -> "SUB_PROCESS".equalsIgnoreCase(task.getTaskType()))
                .map(task -> Long.valueOf(JSONUtils.getNodeString(task.getTaskParams(), "processDefinitionCode")))
                .collect(Collectors.toSet());
        if (subProcessCodes.isEmpty()) {
            return true;
        }
        // Every referenced definition is inspected; a single offline one makes the result false.
        boolean noneOffline = true;
        for (ProcessDefinition subDefinition : this.processDefinitionMapper.queryByCodes(subProcessCodes)) {
            if (subDefinition.getReleaseState().equals(ReleaseState.OFFLINE)) {
                noneOffline = false;
            }
        }
        return noneOffline;
    }

    /**
     * Ensures the tenant code is either the literal {@code "default"} or known to the tenant mapper.
     *
     * @param tenantCode tenant code supplied with the request
     * @throws ServiceException with {@link Status#TENANT_NOT_EXIST} when no tenant is found
     */
    private void checkValidTenant(String tenantCode) {
        // The "default" tenant is accepted without a lookup.
        if ("default".equals(tenantCode)) {
            return;
        }
        Tenant knownTenant = this.tenantMapper.queryByTenantCode(tenantCode);
        if (knownTenant == null) {
            throw new ServiceException(Status.TENANT_NOT_EXIST, tenantCode);
        }
    }

    /**
     * Applies an operation (as described by {@code executeType}) to an existing workflow instance.
     *
     * <p>After the permission and master checks the instance and its definition are loaded and
     * handed to the {@link ExecuteClient}.
     *
     * @param loginUser user issuing the request
     * @param projectCode project the instance must belong to
     * @param processInstanceId id of the workflow instance, must not be {@code null}
     * @param executeType operation to perform, must not be {@code null}
     * @return a map holding {@link Status#SUCCESS} under {@code "status"}
     * @throws ServiceException when the instance does not exist (other checks may throw as well)
     * @throws IllegalStateException when the instance belongs to a different project
     */
    @Override
    public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {
        Preconditions.checkNotNull(processInstanceId, "workflowInstanceId cannot be null");
        Preconditions.checkNotNull(executeType, "executeType cannot be null");
        this.projectService.checkProjectAndAuthThrowException(loginUser, projectCode, ApiFuncIdentificationConstant.map.get(executeType));
        this.checkMasterExists();

        ProcessInstance workflowInstance = this.processInstanceDao.queryOptionalById(processInstanceId)
                .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
        boolean sameProject = workflowInstance.getProjectCode() == projectCode;
        Preconditions.checkState(sameProject, "The workflow instance's project code doesn't equals to the given project");

        ProcessDefinition workflowDefinition = this.processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion());
        ExecuteContext executeContext = new ExecuteContext(workflowInstance, workflowDefinition, loginUser, executeType);
        this.executeClient.executeWorkflowInstance(executeContext);

        Map<String, Object> result = new HashMap<>();
        result.put("status", Status.SUCCESS);
        return result;
    }

    /**
     * Convenience overload that resolves the project code from the workflow instance itself and
     * then delegates to {@link #execute(User, long, Integer, ExecuteType)}.
     *
     * @param loginUser user issuing the request
     * @param workflowInstanceId id of the workflow instance
     * @param executeType operation to perform
     * @return the result of the delegated call
     * @throws ServiceException with {@link Status#PROCESS_INSTANCE_NOT_EXIST} when no instance
     *         with the given id is found
     */
    @Override
    public Map<String, Object> execute(User loginUser, Integer workflowInstanceId, ExecuteType executeType) {
        ProcessInstance processInstance = (ProcessInstance)this.processInstanceMapper.selectById((Serializable)workflowInstanceId);
        // Report an unknown id the same way the delegated overload does instead of failing with a
        // NullPointerException on getProjectCode().
        if (processInstance == null) {
            throw new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId);
        }
        return this.execute(loginUser, processInstance.getProjectCode(), workflowInstanceId, executeType);
    }

    /**
     * Creates an {@code EXECUTE_TASK} command that re-runs a task of a finished workflow instance.
     *
     * @param loginUser user issuing the request; its id is stored as the command executor
     * @param projectCode project the workflow instance must belong to
     * @param processInstanceId id of the finished workflow instance
     * @param startNodeList code of the task to start from; must parse as a single {@code long}
     * @param taskDependType dependency mode stored on the command
     * @return the response carrying the outcome status
     * @throws ServiceException when the instance or its workflow definition cannot be found, or
     *         when the definition fails {@link #checkProcessDefinitionValid}
     */
    @Override
    public WorkflowExecuteResponse executeTask(User loginUser, long projectCode, Integer processInstanceId, String startNodeList, TaskDependType taskDependType) {
        WorkflowExecuteResponse response = new WorkflowExecuteResponse();
        Project project = this.projectMapper.queryByCode(projectCode);
        this.projectService.checkProjectAndAuthThrowException(loginUser, project, ApiFuncIdentificationConstant.map.get((Object)ExecuteType.EXECUTE_TASK));
        ProcessInstance processInstance = (ProcessInstance)this.processService.findProcessInstanceDetailById(processInstanceId.intValue()).orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
        if (!processInstance.getState().isFinished()) {
            log.error("Can not execute task for process instance which is not finished, processInstanceId:{}.", (Object)processInstanceId);
            this.putMsg((Result)response, Status.WORKFLOW_INSTANCE_IS_NOT_FINISHED, new Object[0]);
            return response;
        }
        ProcessDefinition processDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
        // The definition version the instance ran with may no longer exist; report that instead
        // of failing with a NullPointerException on the setter below.
        if (processDefinition == null) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, processInstance.getProcessDefinitionCode());
        }
        // Only this in-memory copy is marked online so that the release-state part of the
        // validity check below passes for it.
        processDefinition.setReleaseState(ReleaseState.ONLINE);
        this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
        long startNodeListLong;
        try {
            startNodeListLong = Long.parseLong(startNodeList);
        }
        catch (NumberFormatException e) {
            log.error("startNodeList is not a number");
            this.putMsg((Result)response, Status.REQUEST_PARAMS_NOT_VALID_ERROR, new Object[]{startNodeList});
            return response;
        }
        if (this.taskDefinitionLogMapper.queryMaxVersionForDefinition(startNodeListLong) == null) {
            this.putMsg((Result)response, Status.EXECUTE_NOT_DEFINE_TASK, new Object[0]);
            return response;
        }
        HashMap<String, Object> cmdParam = new HashMap<String, Object>();
        cmdParam.put("ProcessInstanceId", processInstanceId);
        cmdParam.put("StartNodeList", startNodeList);
        Command command = new Command();
        command.setCommandType(CommandType.EXECUTE_TASK);
        command.setProcessDefinitionCode(processDefinition.getCode());
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        command.setExecutorId(loginUser.getId().intValue());
        command.setProcessDefinitionVersion(processDefinition.getVersion());
        command.setProcessInstanceId(processInstanceId.intValue());
        command.setTestFlag(processInstance.getTestFlag());
        command.setTaskDependType(taskDependType);
        if (!this.commandService.verifyIsNeedCreateCommand(command)) {
            log.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", new Object[]{processDefinition.getCode(), processDefinition.getVersion(), processInstanceId});
            this.putMsg((Result)response, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, new Object[]{String.valueOf(processDefinition.getCode())});
            return response;
        }
        log.info("Creating command, commandInfo:{}.", (Object)command);
        int create = this.commandService.createCommand(command);
        if (create > 0) {
            log.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.", new Object[]{command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processDefinition.getVersion()});
            this.putMsg((Result)response, Status.SUCCESS, new Object[0]);
        } else {
            log.error("Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}\uff0c processInstanceId:{}.", new Object[]{command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processDefinition.getVersion(), processInstanceId});
            this.putMsg((Result)response, Status.EXECUTE_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return response;
    }

    /**
     * Force-starts the task that is waiting in the given task group queue entry.
     *
     * @param loginUser user issuing the request (not read by this method)
     * @param queueId id of the task group queue entry
     * @return result map; carries {@link Status#REQUEST_PARAMS_NOT_VALID_ERROR} when the queue
     *         entry is unknown, {@link Status#PROCESS_INSTANCE_NOT_EXIST} when its workflow
     *         instance is missing, otherwise the outcome of the force start
     * @throws ServiceException when no master server is available
     */
    @Override
    public Map<String, Object> forceStartTaskInstance(User loginUser, int queueId) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        TaskGroupQueue taskGroupQueue = (TaskGroupQueue)this.taskGroupQueueMapper.selectById((Serializable)Integer.valueOf(queueId));
        // An unknown queue id must be reported to the caller rather than crash on getProcessId().
        if (taskGroupQueue == null) {
            log.error("Task group queue does not exist, taskGroupQueueId:{}.", (Object)queueId);
            this.putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueId");
            return result;
        }
        ProcessInstance processInstance = (ProcessInstance)this.processInstanceMapper.selectById((Serializable)Integer.valueOf(taskGroupQueue.getProcessId()));
        if (processInstance == null) {
            log.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", (Object)taskGroupQueue.getProjectCode(), (Object)taskGroupQueue.getProcessId());
            this.putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
            return result;
        }
        this.checkMasterExists();
        return this.forceStart(processInstance, taskGroupQueue);
    }

    /**
     * Verifies that every task code in the comma separated start node list appears as a post
     * task in the relations of the given workflow definition version.
     *
     * @param startNodeList comma separated task codes; empty or {@code null} skips the check
     * @param processDefinitionCode code of the workflow definition
     * @param version version of the workflow definition
     * @throws ServiceException with {@link Status#START_NODE_NOT_EXIST_IN_LAST_PROCESS} for the
     *         first code that is not part of the definition
     */
    public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) {
        if (StringUtils.isEmpty(startNodeList)) {
            return;
        }
        List<ProcessTaskRelation> relations = this.processService.findRelationByCode(processDefinitionCode.longValue(), version);
        List<Long> knownTaskCodes = new ArrayList<>(relations.size());
        for (ProcessTaskRelation relation : relations) {
            knownTaskCodes.add(relation.getPostTaskCode());
        }
        String[] requestedNodes = startNodeList.split(",");
        for (String requestedNode : requestedNodes) {
            boolean known = knownTaskCodes.contains(Long.valueOf(requestedNode));
            if (!known) {
                throw new ServiceException(Status.START_NODE_NOT_EXIST_IN_LAST_PROCESS, requestedNode);
            }
        }
    }

    /**
     * Checks whether the requested operation is allowed in the current state of the instance.
     *
     * <p>PAUSE needs a running instance, STOP one that can be stopped, REPEAT_RUNNING a finished
     * one, START_FAILURE_TASK_PROCESS a failed one and RECOVER_SUSPENDED_PROCESS a paused or
     * stopped one. Any other operation is rejected.
     *
     * @param processInstance instance whose state is inspected
     * @param executeType requested operation
     * @return result map filled with {@link Status#SUCCESS} or
     *         {@link Status#PROCESS_INSTANCE_STATE_OPERATION_ERROR}
     */
    private Map<String, Object> checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) {
        WorkflowExecutionStatus currentState = processInstance.getState();
        boolean allowed;
        switch (executeType) {
            case PAUSE:
                allowed = currentState.isRunning();
                break;
            case STOP:
                allowed = currentState.canStop();
                break;
            case REPEAT_RUNNING:
                allowed = currentState.isFinished();
                break;
            case START_FAILURE_TASK_PROCESS:
                allowed = currentState.isFailure();
                break;
            case RECOVER_SUSPENDED_PROCESS:
                allowed = currentState.isPause() || currentState.isStop();
                break;
            default:
                allowed = false;
                break;
        }
        Map<String, Object> result = new HashMap<>();
        if (allowed) {
            this.putMsg(result, Status.SUCCESS);
        } else {
            this.putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), currentState.toString(), executeType.toString());
        }
        return result;
    }

    /**
     * Stores a new command type and state on the workflow instance and, when the update is
     * persisted, sends a state change event to the host recorded on the instance.
     *
     * @param processInstance instance to update
     * @param commandType command type to set and to append to the command history
     * @param executionStatus state to set
     * @return result map filled with {@link Status#SUCCESS} or
     *         {@link Status#EXECUTE_PROCESS_INSTANCE_ERROR}
     */
    private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, WorkflowExecutionStatus executionStatus) {
        Map<String, Object> result = new HashMap<>();
        processInstance.setCommandType(commandType);
        processInstance.addHistoryCmd(commandType);
        processInstance.setStateWithDesc(executionStatus, commandType.getDescp() + "by ui");

        boolean persisted = this.processInstanceDao.updateById(processInstance);
        if (!persisted) {
            log.error("Process instance state update error, processInstanceName:{}.", processInstance.getName());
            this.putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
            return result;
        }

        log.info("Process instance state is updated to {} in database, processInstanceName:{}.", executionStatus.getDesc(), processInstance.getName());
        int instanceId = processInstance.getId().intValue();
        WorkflowStateEventChangeRequest stateChange = new WorkflowStateEventChangeRequest(instanceId, 0, processInstance.getState(), processInstance.getId().intValue(), 0);
        Host targetHost = new Host(processInstance.getHost());
        this.stateEventCallbackService.sendResult(targetHost, stateChange.convert2Command());
        this.putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * Marks a waiting task group queue entry as force started and sends a force start request
     * to the host recorded on the workflow instance.
     *
     * @param processInstance workflow instance owning the queued task
     * @param taskGroupQueue queue entry to force start
     * @return result map filled with {@link Status#SUCCESS}, or
     *         {@link Status#TASK_GROUP_QUEUE_ALREADY_START} when the entry is not waiting
     */
    private Map<String, Object> forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) {
        Map<String, Object> outcome = new HashMap<>();
        boolean stillWaiting = taskGroupQueue.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE;
        if (stillWaiting) {
            taskGroupQueue.setForceStart(Flag.YES.getCode());
            this.processService.updateTaskGroupQueue(taskGroupQueue);
            log.info("Sending force start command to master: {}.", processInstance.getHost());
            Host master = Host.of(processInstance.getHost());
            TaskForceStartRequest forceStartRequest = new TaskForceStartRequest(processInstance.getId().intValue(), taskGroupQueue.getTaskId());
            this.stateEventCallbackService.sendResult(master, forceStartRequest.convert2Command());
            this.putMsg(outcome, Status.SUCCESS);
        } else {
            log.warn("Task group queue already starts, taskGroupQueueId:{}.", taskGroupQueue.getId());
            this.putMsg(outcome, Status.TASK_GROUP_QUEUE_ALREADY_START);
        }
        return outcome;
    }

    /**
     * Pre-start check: the workflow definition must exist and every sub workflow collected by
     * {@code recurseFindSubProcess} must be online.
     *
     * @param processDefinitionCode code of the workflow definition to check
     * @return result map filled with {@link Status#SUCCESS},
     *         {@link Status#REQUEST_PARAMS_NOT_VALID_ERROR} when the definition is unknown, or
     *         {@link Status#PROCESS_DEFINE_NOT_RELEASE} naming the first sub workflow that is not online
     */
    @Override
    public Map<String, Object> startCheckByProcessDefinedCode(long processDefinitionCode) {
        Map<String, Object> result = new HashMap<>();
        ProcessDefinition parentDefinition = this.processDefinitionMapper.queryByCode(processDefinitionCode);
        if (parentDefinition == null) {
            log.error("Process definition is not be found, processDefinitionCode:{}.", processDefinitionCode);
            this.putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "processDefinitionCode");
            return result;
        }

        List<Long> subProcessCodes = new ArrayList<>();
        this.processService.recurseFindSubProcess(parentDefinition.getCode(), subProcessCodes);
        // The mapper is only queried when at least one sub workflow code was collected.
        List<ProcessDefinition> subDefinitions = subProcessCodes.isEmpty() ? null : this.processDefinitionMapper.queryByCodes(subProcessCodes);
        if (subDefinitions != null) {
            for (ProcessDefinition subDefinition : subDefinitions) {
                if (subDefinition.getReleaseState() != ReleaseState.ONLINE) {
                    log.warn("Subprocess definition {} of process definition {} is not {}.", subDefinition.getName(), parentDefinition.getName(), ReleaseState.ONLINE.getDescp());
                    this.putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subDefinition.getName());
                    return result;
                }
            }
        }
        this.putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * Builds a {@link Command} from the request parameters and persists it.
     *
     * <p>For {@link CommandType#COMPLEMENT_DATA} the schedule parameter is validated and the work
     * is delegated to {@code createComplementCommandList}; for every other type a single command
     * is created and linked to the trigger code.
     *
     * @param triggerCode code linking the created command(s) to this API call
     * @param commandType type of command; {@code null} is treated as {@link CommandType#START_PROCESS}
     * @param processDefineCode code of the workflow definition to run
     * @param schedule schedule parameter (JSON); required for complement-data commands
     * @return number of commands created, {@code 0} when nothing was created
     */
    private int createCommand(Long triggerCode, CommandType commandType, long processDefineCode, TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, WarningType warningType, int executorId, Integer warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, String tenantCode, Long environmentCode, Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, int testFlag, ComplementDependentMode complementDependentMode, boolean allLevelDependent, ExecutionOrder executionOrder) {
        Command command = new Command();
        HashMap<String, String> cmdParam = new HashMap<String, String>();
        if (commandType == null) {
            command.setCommandType(CommandType.START_PROCESS);
        } else {
            command.setCommandType(commandType);
        }
        command.setProcessDefinitionCode(processDefineCode);
        if (nodeDep != null) {
            command.setTaskDependType(nodeDep);
        }
        if (failureStrategy != null) {
            command.setFailureStrategy(failureStrategy);
        }
        if (!StringUtils.isEmpty((CharSequence)startNodeList)) {
            cmdParam.put("StartNodeList", startNodeList);
        }
        if (warningType != null) {
            command.setWarningType(warningType);
        }
        if (startParams != null && startParams.size() > 0) {
            cmdParam.put("StartParams", JSONUtils.toJsonString(startParams));
        }
        // cmdParam is complete at this point; both the complement path (which reads it back from
        // the command) and the plain path use this serialized form.
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        command.setExecutorId(executorId);
        command.setWarningGroupId(warningGroupId);
        command.setProcessInstancePriority(processInstancePriority);
        command.setWorkerGroup(workerGroup);
        command.setTenantCode(tenantCode);
        command.setEnvironmentCode(environmentCode);
        command.setDryRun(dryRun);
        command.setTestFlag(testFlag);
        ProcessDefinition processDefinition = this.processService.findProcessDefinitionByCode(Long.valueOf(processDefineCode));
        if (processDefinition != null) {
            command.setProcessDefinitionVersion(processDefinition.getVersion());
        }
        command.setProcessInstanceId(0);
        if (commandType == CommandType.COMPLEMENT_DATA) {
            // StringUtils.isEmpty already covers null.
            if (StringUtils.isEmpty((CharSequence)schedule)) {
                log.error("Create {} type command error because parameter schedule is invalid.", (Object)command.getCommandType().getDescp());
                return 0;
            }
            if (!this.isValidateScheduleTime(schedule)) {
                return 0;
            }
            try {
                log.info("Start to create {} command, processDefinitionCode:{}.", (Object)command.getCommandType().getDescp(), (Object)processDefineCode);
                return this.createComplementCommandList(triggerCode, schedule, runMode, command, expectedParallelismNumber, complementDependentMode, allLevelDependent, executionOrder);
            }
            catch (CronParseException cronParseException) {
                // Still reported to the caller as "nothing created", but no longer silently:
                // leave a trace of why the complement command could not be built.
                log.error("Create {} type command error because the schedule cron expression can not be parsed, processDefinitionCode:{}.", new Object[]{command.getCommandType().getDescp(), processDefineCode, cronParseException});
                return 0;
            }
        }
        int count = this.commandService.createCommand(command);
        if (count > 0) {
            this.triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
        }
        return count;
    }

    /**
     * Persists one complement-data command for the given dates and, unless dependent mode is
     * off or there are no schedules, also creates the complement commands of dependent workflows.
     *
     * @param triggerCode code linking the created command to the API call
     * @param command command template; its command parameter is overwritten here
     * @param cmdParam command parameters; the formatted date list is added under
     *        {@code complementScheduleDateList}
     * @param dateTimeList dates to complement
     * @param schedules schedules of the workflow definition
     * @param complementDependentMode whether dependent workflows are complemented too
     * @param allLevelDependent passed through to the dependent command creation
     * @return the count returned by the command service for the main command
     */
    private int createComplementCommand(Long triggerCode, Command command, Map<String, String> cmdParam, List<ZonedDateTime> dateTimeList, List<Schedule> schedules, ComplementDependentMode complementDependentMode, boolean allLevelDependent) {
        List<String> formattedDates = new ArrayList<>(dateTimeList.size());
        for (ZonedDateTime dateTime : dateTimeList) {
            formattedDates.add(DateUtils.dateToString(dateTime));
        }
        cmdParam.put("complementScheduleDateList", String.join(",", formattedDates));
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        log.info("Creating command, commandInfo:{}.", command);

        int createCount = this.commandService.createCommand(command);
        boolean created = createCount > 0;
        if (created) {
            log.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
        } else {
            log.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
        }

        boolean needDependentCommands = !schedules.isEmpty() && complementDependentMode != ComplementDependentMode.OFF_MODE;
        if (needDependentCommands) {
            log.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
            this.createComplementDependentCommand(schedules, command, allLevelDependent);
        } else {
            log.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
        }

        // The trigger relation is written last, after the dependent commands were handled.
        if (created) {
            this.triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
        }
        return createCount;
    }

    /**
     * Creates the complement commands of a workflow for the requested schedule dates.
     *
     * <p>The dates are taken from the {@code complementStartDate}/{@code complementEndDate} range
     * (expanded through the workflow's schedules) or from an explicit
     * {@code complementScheduleDateList}; when both are present the explicit list wins. The dates
     * are sorted according to {@code executionOrder} (descending when {@code null}).
     *
     * <p>In serial mode a single command covering all dates is created. In parallel mode the dates
     * are dealt round-robin into queues and one command is created per queue; the number of queues
     * is the number of dates, capped by {@code expectedParallelismNumber} when that is positive.
     *
     * @param triggerCode               trigger code linked to every created command
     * @param scheduleTimeParam         JSON object holding the complement date range and/or date list
     * @param runMode                   serial or parallel; {@code null} is treated as serial
     * @param command                   command template that is persisted for every queue
     * @param expectedParallelismNumber upper bound for the number of parallel commands;
     *                                  {@code null}, zero or negative means no bound
     * @param complementDependentMode   whether dependent workflows are complemented as well
     * @param allLevelDependent         forwarded to the dependent command creation
     * @param executionOrder            order in which the dates are processed
     * @return total count reported by the command service over all created commands
     * @throws CronParseException declared for the expansion of the date range through the schedules
     * @throws ServiceException   if {@code scheduleTimeParam} yields no schedule date
     */
    protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode, Command command, Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode, boolean allLevelDependent, ExecutionOrder executionOrder) throws CronParseException {
        RunMode effectiveRunMode = runMode == null ? RunMode.RUN_MODE_SERIAL : runMode;
        ExecutionOrder effectiveOrder = executionOrder == null ? ExecutionOrder.DESC_ORDER : executionOrder;

        // A command without parameters still needs a map to receive the complement date list.
        Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
        if (cmdParam == null) {
            cmdParam = new HashMap<>();
        }
        // An unparsable schedule parameter cannot yield any date: report it as a date error
        // instead of failing with a NullPointerException further down.
        Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
        if (scheduleParam == null) {
            throw new ServiceException(Status.TASK_COMPLEMENT_DATA_DATE_ERROR);
        }

        List<Schedule> schedules = this.processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
        List<ZonedDateTime> listDate = new ArrayList<>();

        String startDate = scheduleParam.get("complementStartDate");
        String endDate = scheduleParam.get("complementEndDate");
        if (startDate != null && endDate != null) {
            listDate = CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate), DateUtils.stringToZoneDateTime(endDate), schedules);
        }

        // An explicit date list overrides whatever the range produced.
        String dateList = scheduleParam.get("complementScheduleDateList");
        if (StringUtils.isNotBlank(dateList)) {
            listDate = Splitter.on(",").splitToStream(dateList).map(item -> DateUtils.stringToZoneDateTime(item.trim())).distinct().collect(Collectors.toList());
        }
        if (CollectionUtils.isEmpty(listDate)) {
            throw new ServiceException(Status.TASK_COMPLEMENT_DATA_DATE_ERROR);
        }

        if (effectiveOrder == ExecutionOrder.DESC_ORDER) {
            Collections.sort(listDate, Collections.reverseOrder());
        } else {
            Collections.sort(listDate);
        }

        int createCount = 0;
        switch (effectiveRunMode) {
            case RUN_MODE_SERIAL: {
                log.info("RunMode of {} command is serial run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                createCount = this.createComplementCommand(triggerCode, command, cmdParam, listDate, schedules, complementDependentMode, allLevelDependent);
                break;
            }
            case RUN_MODE_PARALLEL: {
                log.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                // listDate is known to be non-empty here, so queueNum is always at least one.
                int queueNum = listDate.size();
                // Only a positive bound is honoured; a negative value used to become the array size.
                if (expectedParallelismNumber != null && expectedParallelismNumber > 0) {
                    queueNum = Math.min(queueNum, expectedParallelismNumber);
                }
                log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", queueNum);

                List<List<ZonedDateTime>> queues = new ArrayList<>(queueNum);
                for (int i = 0; i < queueNum; i++) {
                    queues.add(new ArrayList<>());
                }
                // Deal the sorted dates round-robin so every queue keeps the requested order.
                for (int i = 0; i < listDate.size(); i++) {
                    queues.get(i % queueNum).add(listDate.get(i));
                }
                for (List<ZonedDateTime> queue : queues) {
                    // Accumulate: the result is the number of commands created, not the last insert count.
                    createCount += this.createComplementCommand(triggerCode, command, cmdParam, queue, schedules, complementDependentMode, allLevelDependent);
                }
                break;
            }
            default:
                break;
        }
        log.info("Create complement command count:{}.", createCount);
        return createCount;
    }

    /**
     * Creates complement commands for the workflows that depend on the workflow of {@code command}.
     *
     * <p>The given command is cloned and the clone is re-targeted at every dependent workflow
     * definition (code, version, worker group, start node) before it is persisted, so
     * {@code command} itself is not modified.
     *
     * @param schedules         schedules of the upstream workflow; the crontab of the first entry
     *                          determines the cycle used to select dependent definitions
     * @param command           command of the upstream workflow used as template
     * @param allLevelDependent whether dependents of dependents are included as well
     * @return total count reported by the command service for the dependent commands; {@code 0}
     *         when there is no schedule or the command cannot be cloned
     */
    public int createComplementDependentCommand(List<Schedule> schedules, Command command, boolean allLevelDependent) {
        // The cycle is derived from the first schedule, so without one there is nothing to match.
        if (CollectionUtils.isEmpty(schedules)) {
            log.warn("Schedule list is empty, skip creating complement dependent command.");
            return 0;
        }

        Command dependentCommand;
        try {
            dependentCommand = (Command) BeanUtils.cloneBean(command);
        } catch (Exception e) {
            log.error("Copy dependent command error.", e);
            return 0;
        }

        List<DependentProcessDefinition> dependentProcessDefinitionList = this.getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup(), allLevelDependent);
        dependentCommand.setTaskDependType(TaskDependType.TASK_POST);

        int dependentProcessDefinitionCreateCount = 0;
        for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
            // The same clone is reused for every dependent workflow; clear the id so each one is
            // stored as a new command.
            dependentCommand.setId(null);
            dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
            dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
            dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());

            Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
            if (cmdParam == null) {
                cmdParam = new HashMap<>();
            }
            cmdParam.put("StartNodeList", String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
            dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));

            // Log the command that is actually persisted, not the upstream template.
            log.info("Creating complement dependent command, commandInfo:{}.", dependentCommand);
            dependentProcessDefinitionCreateCount += this.commandService.createCommand(dependentCommand);
        }
        return dependentProcessDefinitionCreateCount;
    }

    /**
     * Collects the valid dependent workflow definitions of a workflow.
     *
     * <p>The direct dependents are always returned. With {@code allLevelDependent} the dependents
     * of every collected definition are added level by level. Each workflow code is expanded at
     * most once, which guarantees termination when workflows depend on each other in a cycle and
     * avoids re-adding the same descendants when a workflow is reachable over several paths.
     *
     * @param processDefinitionCode  code of the upstream workflow
     * @param processDefinitionCycle cycle a dependent definition must match to be considered valid
     * @param workerGroup            worker group assigned to dependents without an own worker group
     * @param allLevelDependent      whether indirect dependents are collected as well
     * @return the valid dependent definitions, direct dependents first
     */
    private List<DependentProcessDefinition> getComplementDependentDefinitionList(long processDefinitionCode, CycleEnum processDefinitionCycle, String workerGroup, boolean allLevelDependent) {
        List<DependentProcessDefinition> dependentProcessDefinitionList = this.checkDependentProcessDefinitionValid(this.processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode), processDefinitionCycle, workerGroup, processDefinitionCode);
        if (dependentProcessDefinitionList.isEmpty() || !allLevelDependent) {
            return dependentProcessDefinitionList;
        }

        // Codes whose dependents have already been looked up; the root was expanded above.
        Set<Long> expandedCodes = new HashSet<>();
        expandedCodes.add(processDefinitionCode);

        List<DependentProcessDefinition> currentLevel = new ArrayList<>(dependentProcessDefinitionList);
        while (!currentLevel.isEmpty()) {
            List<DependentProcessDefinition> nextLevel = new ArrayList<>();
            for (DependentProcessDefinition parent : currentLevel) {
                long parentCode = parent.getProcessDefinitionCode();
                if (!expandedCodes.add(parentCode)) {
                    // Already expanded: following it again would only repeat earlier results
                    // (or loop forever on a dependency cycle).
                    continue;
                }
                nextLevel.addAll(this.checkDependentProcessDefinitionValid(this.processService.queryDependentProcessDefinitionByProcessDefinitionCode(parentCode), processDefinitionCycle, workerGroup, parentCode));
            }
            dependentProcessDefinitionList.addAll(nextLevel);
            currentLevel = nextLevel;
        }
        return dependentProcessDefinitionList;
    }

    /**
     * Filters dependent workflow definitions down to those whose dependent cycle, relative to the
     * upstream workflow, equals the given cycle.
     *
     * <p>An accepted definition for which the worker group service returns no worker group is
     * given {@code workerGroup} instead.
     *
     * @param dependentProcessDefinitionList candidates to check
     * @param processDefinitionCycle         cycle an accepted candidate must have
     * @param workerGroup                    fallback worker group
     * @param upstreamProcessDefinitionCode  code of the workflow the candidates depend on
     * @return a new list holding the accepted candidates in their original order
     */
    private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(List<DependentProcessDefinition> dependentProcessDefinitionList, CycleEnum processDefinitionCycle, String workerGroup, long upstreamProcessDefinitionCode) {
        List<Long> candidateCodes = new ArrayList<>();
        for (DependentProcessDefinition candidate : dependentProcessDefinitionList) {
            candidateCodes.add(candidate.getProcessDefinitionCode());
        }
        Map<Long, String> workerGroupByCode = this.workerGroupService.queryWorkerGroupByProcessDefinitionCodes(candidateCodes);

        List<DependentProcessDefinition> accepted = new ArrayList<>();
        for (DependentProcessDefinition candidate : dependentProcessDefinitionList) {
            boolean sameCycle = candidate.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle;
            if (sameCycle) {
                boolean hasOwnWorkerGroup = workerGroupByCode.get(candidate.getProcessDefinitionCode()) != null;
                if (!hasOwnWorkerGroup) {
                    candidate.setWorkerGroup(workerGroup);
                }
                accepted.add(candidate);
            }
        }
        return accepted;
    }

    /**
     * Checks the complement schedule parameter for structural validity.
     *
     * <p>The parameter is rejected when it cannot be parsed into a map, when it carries a
     * {@code complementScheduleDateList} key with a {@code null} value, or when it carries a
     * {@code complementStartDate} key but the start/end dates are missing, unparsable, or the
     * start lies after the end.
     *
     * @param schedule JSON string with the complement schedule settings
     * @return {@code true} if none of the rejection rules applies
     */
    private boolean isValidateScheduleTime(String schedule) {
        Map<String, String> scheduleParams = JSONUtils.toMap(schedule);
        if (scheduleParams == null) {
            return false;
        }

        boolean dateListKeyWithoutValue = scheduleParams.containsKey("complementScheduleDateList")
                && scheduleParams.get("complementScheduleDateList") == null;
        if (dateListKeyWithoutValue) {
            return false;
        }

        // Without a start date key there is no range to validate.
        if (!scheduleParams.containsKey("complementStartDate")) {
            return true;
        }

        String startDate = scheduleParams.get("complementStartDate");
        String endDate = scheduleParams.get("complementEndDate");
        if (startDate == null || endDate == null) {
            return false;
        }

        try {
            ZonedDateTime rangeStart = DateUtils.stringToZoneDateTime(startDate);
            ZonedDateTime rangeEnd = DateUtils.stringToZoneDateTime(endDate);
            if (rangeStart == null || rangeEnd == null) {
                return false;
            }
            if (rangeStart.isAfter(rangeEnd)) {
                log.error("Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", rangeStart, rangeEnd);
                return false;
            }
            return true;
        } catch (Exception ex) {
            log.warn("Parse schedule time error, startDate:{}, endDate:{}.", startDate, endDate);
            return false;
        }
    }

    /**
     * Removes repeated entries from a comma-separated list of schedule times.
     *
     * <p>Entries are trimmed before comparison and the first occurrence of each entry keeps its
     * position.
     *
     * @param scheduleTimeList comma-separated schedule times
     * @return the de-duplicated list joined by commas, or {@code null} when the input is
     *         {@code null} or empty
     */
    private String removeDuplicates(String scheduleTimeList) {
        if (!StringUtils.isNotEmpty(scheduleTimeList)) {
            return null;
        }
        String[] rawTimes = scheduleTimeList.split(",");
        List<String> uniqueTimes = Arrays.stream(rawTimes)
                .map(String::trim)
                .distinct()
                .collect(Collectors.toList());
        return String.join(",", uniqueTimes);
    }

    /**
     * Asks the master recorded on a process instance for the current execution data of that
     * instance.
     *
     * @param processInstanceId id of the process instance to look up
     * @return the workflow execution data reported by the master, or {@code null} when the
     *         instance does not exist, the master returns no message, or the message body cannot
     *         be converted into a response
     */
    @Override
    public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
        ProcessInstance processInstance = this.processService.findProcessInstanceDetailById(processInstanceId.intValue()).orElse(null);
        if (processInstance == null) {
            log.error("Process instance does not exist, processInstanceId:{}.", processInstanceId);
            return null;
        }

        // The request is sent to the host stored on the process instance.
        Host host = new Host(processInstance.getHost());
        WorkflowExecutingDataRequest requestCommand = new WorkflowExecutingDataRequest();
        requestCommand.setProcessInstanceId(processInstanceId);
        Message message = this.stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
        if (message == null) {
            log.error("Query executing process instance from master error, processInstanceId:{}.", processInstanceId);
            return null;
        }

        WorkflowExecutingDataResponse responseCommand = JSONUtils.parseObject(message.getBody(), WorkflowExecutingDataResponse.class);
        // Defensive: treat a body that did not yield a response object like the other failure
        // paths instead of dereferencing null.
        if (responseCommand == null) {
            log.error("Parse executing process instance response error, processInstanceId:{}.", processInstanceId);
            return null;
        }
        return responseCommand.getWorkflowExecuteDto();
    }

    /**
     * Sends a request to start a stream task instance to the first master listed in the registry.
     *
     * <p>The project permission check runs first and its result map is returned unchanged when it
     * is not successful. The tenant and master checks follow before the start message is built
     * and sent synchronously.
     *
     * @param loginUser             user issuing the request; recorded as executor
     * @param projectCode           code of the project owning the task
     * @param taskDefinitionCode    code of the task definition to start
     * @param taskDefinitionVersion version of the task definition to start
     * @param warningGroupId        warning group id passed on to the master
     * @param workerGroup           worker group passed on to the master
     * @param tenantCode            tenant code; validated before the request is sent
     * @param environmentCode       environment code passed on to the master
     * @param startParams           start parameters passed on to the master
     * @param dryRun                dry-run flag passed on to the master
     * @return result map holding {@code SUCCESS} when the master answered, otherwise
     *         {@code START_TASK_INSTANCE_ERROR} or the failed permission check result
     */
    @Override
    public Map<String, Object> execStreamTaskInstance(User loginUser, long projectCode, long taskDefinitionCode, int taskDefinitionVersion, int warningGroupId, String workerGroup, String tenantCode, Long environmentCode, Map<String, String> startParams, int dryRun) {
        Project project = this.projectMapper.queryByCode(projectCode);
        Map<String, Object> result = this.projectService.checkProjectAndAuth(loginUser, project, projectCode, "project:executors:start");
        if (Status.SUCCESS != result.get("status")) {
            return result;
        }

        this.checkValidTenant(tenantCode);
        this.checkMasterExists();

        // The request always goes to the first master returned by the registry.
        Server firstMaster = this.monitorService.getServerListFromRegistry(true).get(0);
        Host masterHost = new Host(firstMaster.getHost(), firstMaster.getPort());

        TaskExecuteStartMessage startMessage = new TaskExecuteStartMessage();
        // who starts it
        startMessage.setExecutorId(loginUser.getId().intValue());
        startMessage.setExecutorName(loginUser.getUserName());
        // what is started
        startMessage.setProjectCode(projectCode);
        startMessage.setTaskDefinitionCode(taskDefinitionCode);
        startMessage.setTaskDefinitionVersion(taskDefinitionVersion);
        // how it is started
        startMessage.setWorkerGroup(workerGroup);
        startMessage.setTenantCode(tenantCode);
        startMessage.setWarningGroupId(warningGroupId);
        startMessage.setEnvironmentCode(environmentCode);
        startMessage.setStartParams(startParams);
        startMessage.setDryRun(dryRun);

        Message response = this.stateEventCallbackService.sendSync(masterHost, startMessage.convert2Command());
        if (response == null) {
            log.error("Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.", projectCode, taskDefinitionCode, taskDefinitionVersion);
            this.putMsg(result, Status.START_TASK_INSTANCE_ERROR);
            return result;
        }
        log.info("Send task execute start command complete, response is {}.", response);
        this.putMsg(result, Status.SUCCESS);
        return result;
    }
}

