/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.api.service.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteStartCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ExecutorServiceImpl
extends BaseServiceImpl
implements ExecutorService {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceImpl.class);
    @Autowired
    private ProjectMapper projectMapper;
    @Autowired
    private ProjectService projectService;
    @Autowired
    private ProcessDefinitionMapper processDefinitionMapper;
    @Autowired
    private MonitorService monitorService;
    @Autowired
    private ProcessInstanceMapper processInstanceMapper;
    @Autowired
    private ProcessService processService;
    @Autowired
    private ProcessInstanceDao processInstanceDao;
    @Autowired
    private StateEventCallbackService stateEventCallbackService;
    @Autowired
    private TaskDefinitionMapper taskDefinitionMapper;
    @Autowired
    private ProcessTaskRelationMapper processTaskRelationMapper;
    @Autowired
    private TaskGroupQueueMapper taskGroupQueueMapper;

    @Override
    public Map<String, Object> execProcessInstance(User loginUser, long projectCode, long processDefinitionCode, String cronTime, CommandType commandType, FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) {
        Project project = this.projectMapper.queryByCode(projectCode);
        Map<String, Object> result = this.projectService.checkProjectAndAuth(loginUser, project, projectCode, "project:executors:start");
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        if (timeout <= 0 || timeout > 86400) {
            this.putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR, new Object[0]);
            return result;
        }
        ProcessDefinition processDefinition = this.processDefinitionMapper.queryByCode(processDefinitionCode);
        result = this.checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode, processDefinition.getVersion());
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        if (!this.checkTenantSuitable(processDefinition)) {
            logger.error("There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.", (Object)processDefinition.getCode(), (Object)processDefinition.getName());
            this.putMsg(result, Status.TENANT_NOT_SUITABLE, new Object[0]);
            return result;
        }
        if (!this.checkScheduleTimeNum(commandType, cronTime)) {
            this.putMsg(result, Status.SCHEDULE_TIME_NUMBER, new Object[0]);
            return result;
        }
        if (!this.checkMasterExists(result)) {
            return result;
        }
        int create = this.createCommand(commandType, processDefinition.getCode(), taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber, dryRun, complementDependentMode);
        if (create > 0) {
            processDefinition.setWarningGroupId(warningGroupId);
            this.processDefinitionMapper.updateById(processDefinition);
            logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", (Object)processDefinition.getCode(), (Object)create);
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        } else {
            logger.error("Start process instance failed because create command error, processDefinitionCode:{}.", (Object)processDefinition.getCode());
            this.putMsg(result, Status.START_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return result;
    }

    private boolean checkMasterExists(Map<String, Object> result) {
        List<Server> masterServers = this.monitorService.getServerListFromRegistry(true);
        if (masterServers.isEmpty()) {
            this.putMsg(result, Status.MASTER_NOT_EXISTS, new Object[0]);
            return false;
        }
        return true;
    }

    private boolean checkScheduleTimeNum(CommandType complementData, String cronTime) {
        String[] stringDates;
        if (!CommandType.COMPLEMENT_DATA.equals((Object)complementData)) {
            return true;
        }
        if (cronTime == null) {
            return true;
        }
        Map cronMap = JSONUtils.toMap((String)cronTime);
        return !cronMap.containsKey("complementScheduleDateList") || (stringDates = ((String)cronMap.get("complementScheduleDateList")).split(",")).length <= 100;
    }

    @Override
    public Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode, Integer version) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
            logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", (Object)projectCode, (Object)processDefineCode);
            this.putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
        } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
            logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.", new Object[]{ReleaseState.ONLINE.getDescp(), processDefineCode, version});
            this.putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
        } else if (!this.checkSubProcessDefinitionValid(processDefinition)) {
            logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.", (Object)ReleaseState.ONLINE.getDescp(), (Object)processDefineCode);
            this.putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE, new Object[0]);
        } else {
            result.put("status", (Object)Status.SUCCESS);
        }
        return result;
    }

    @Override
    public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition) {
        List processTaskRelations = this.processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode());
        if (processTaskRelations.isEmpty()) {
            return true;
        }
        Set relationCodes = processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet());
        List taskDefinitions = this.taskDefinitionMapper.queryByCodeList(relationCodes);
        HashSet processDefinitionCodeSet = new HashSet();
        taskDefinitions.stream().filter(task -> "SUB_PROCESS".equalsIgnoreCase(task.getTaskType())).forEach(taskDefinition -> processDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString((String)taskDefinition.getTaskParams(), (String)"processDefinitionCode"))));
        if (processDefinitionCodeSet.isEmpty()) {
            return true;
        }
        List processDefinitions = this.processDefinitionMapper.queryByCodes(processDefinitionCodeSet);
        return processDefinitions.stream().filter(definition -> definition.getReleaseState().equals((Object)ReleaseState.OFFLINE)).collect(Collectors.toSet()).isEmpty();
    }

    @Override
    public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {
        Object startParamsJson;
        Project project = this.projectMapper.queryByCode(projectCode);
        Map<String, Object> result = this.projectService.checkProjectAndAuth(loginUser, project, projectCode, ApiFuncIdentificationConstant.map.get((Object)executeType));
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        if (!this.checkMasterExists(result)) {
            return result;
        }
        ProcessInstance processInstance = (ProcessInstance)this.processService.findProcessInstanceDetailById(processInstanceId.intValue()).orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
        ProcessDefinition processDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
        processDefinition.setReleaseState(ReleaseState.ONLINE);
        if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE && (result = this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion())).get("status") != Status.SUCCESS) {
            return result;
        }
        result = this.checkExecuteType(processInstance, executeType);
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        if (!this.checkTenantSuitable(processDefinition)) {
            logger.error("There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ", (Object)processDefinition.getId(), (Object)processDefinition.getName());
            this.putMsg(result, Status.TENANT_NOT_SUITABLE, new Object[0]);
        }
        Map commandMap = (Map)JSONUtils.parseObject((String)processInstance.getCommandParam(), (TypeReference)new TypeReference<Map<String, Object>>(){});
        String startParams = null;
        if (MapUtils.isNotEmpty((Map)commandMap) && executeType == ExecuteType.REPEAT_RUNNING && (startParamsJson = commandMap.get("StartParams")) != null) {
            startParams = startParamsJson.toString();
        }
        switch (executeType) {
            case REPEAT_RUNNING: {
                result = this.insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
                break;
            }
            case RECOVER_SUSPENDED_PROCESS: {
                result = this.insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
                break;
            }
            case START_FAILURE_TASK_PROCESS: {
                result = this.insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
                break;
            }
            case STOP: {
                if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
                    this.putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
                    break;
                }
                result = this.updateProcessInstancePrepare(processInstance, CommandType.STOP, WorkflowExecutionStatus.READY_STOP);
                break;
            }
            case PAUSE: {
                if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
                    logger.warn("Process instance status is already {}, processInstanceName:{}.", (Object)WorkflowExecutionStatus.READY_STOP.getDesc(), (Object)processInstance.getName());
                    this.putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
                    break;
                }
                result = this.updateProcessInstancePrepare(processInstance, CommandType.PAUSE, WorkflowExecutionStatus.READY_PAUSE);
                break;
            }
            default: {
                logger.warn("Unknown execute type for process instance, processInstanceId:{}.", (Object)processInstance.getId());
                this.putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
            }
        }
        return result;
    }

    @Override
    public Map<String, Object> forceStartTaskInstance(User loginUser, int queueId) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        TaskGroupQueue taskGroupQueue = (TaskGroupQueue)this.taskGroupQueueMapper.selectById((Serializable)Integer.valueOf(queueId));
        ProcessInstance processInstance = (ProcessInstance)this.processInstanceMapper.selectById((Serializable)Integer.valueOf(taskGroupQueue.getProcessId()));
        if (processInstance == null) {
            logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", (Object)taskGroupQueue.getProjectCode(), (Object)taskGroupQueue.getProcessId());
            this.putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
            return result;
        }
        if (!this.checkMasterExists(result)) {
            return result;
        }
        return this.forceStart(processInstance, taskGroupQueue);
    }

    private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
        Tenant tenant = this.processService.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId());
        return tenant != null;
    }

    private Map<String, Object> checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        WorkflowExecutionStatus executionStatus = processInstance.getState();
        boolean checkResult = false;
        switch (executeType) {
            case PAUSE: {
                if (!executionStatus.isRunning()) break;
                checkResult = true;
                break;
            }
            case STOP: {
                if (!executionStatus.canStop()) break;
                checkResult = true;
                break;
            }
            case REPEAT_RUNNING: {
                if (!executionStatus.isFinished()) break;
                checkResult = true;
                break;
            }
            case START_FAILURE_TASK_PROCESS: {
                if (!executionStatus.isFailure()) break;
                checkResult = true;
                break;
            }
            case RECOVER_SUSPENDED_PROCESS: {
                if (!executionStatus.isPause() && !executionStatus.isStop()) break;
                checkResult = true;
                break;
            }
        }
        if (!checkResult) {
            this.putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString());
        } else {
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        }
        return result;
    }

    private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, WorkflowExecutionStatus executionStatus) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        processInstance.setCommandType(commandType);
        processInstance.addHistoryCmd(commandType);
        processInstance.setStateWithDesc(executionStatus, commandType.getDescp() + "by ui");
        int update = this.processInstanceDao.updateProcessInstance(processInstance);
        if (update > 0) {
            logger.info("Process instance state is updated to {} in database, processInstanceName:{}.", (Object)executionStatus.getDesc(), (Object)processInstance.getName());
            WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(processInstance.getId().intValue(), 0, processInstance.getState(), processInstance.getId().intValue(), 0);
            Host host = new Host(processInstance.getHost());
            this.stateEventCallbackService.sendResult(host, workflowStateEventChangeCommand.convert2Command());
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        } else {
            this.putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return result;
    }

    private Map<String, Object> forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
            this.putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START, new Object[0]);
            return result;
        }
        taskGroupQueue.setForceStart(Flag.YES.getCode());
        this.processService.updateTaskGroupQueue(taskGroupQueue);
        this.processService.sendStartTask2Master(processInstance, taskGroupQueue.getTaskId(), org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST);
        this.putMsg(result, Status.SUCCESS, new Object[0]);
        return result;
    }

    private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        HashMap<String, Object> cmdParam = new HashMap<String, Object>();
        cmdParam.put("ProcessInstanceId", instanceId);
        if (!StringUtils.isEmpty((CharSequence)startParams)) {
            cmdParam.put("StartParams", startParams);
        }
        org.apache.dolphinscheduler.dao.entity.Command command = new org.apache.dolphinscheduler.dao.entity.Command();
        command.setCommandType(commandType);
        command.setProcessDefinitionCode(processDefinitionCode);
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        command.setExecutorId(loginUser.getId().intValue());
        command.setProcessDefinitionVersion(processVersion);
        command.setProcessInstanceId(instanceId.intValue());
        if (!this.processService.verifyIsNeedCreateCommand(command)) {
            logger.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", new Object[]{processDefinitionCode, processVersion, instanceId});
            this.putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
            return result;
        }
        int create = this.processService.createCommand(command);
        if (create > 0) {
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        } else {
            logger.error("Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}\uff0c processInstanceId:{}.", new Object[]{command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion, instanceId});
            this.putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return result;
    }

    @Override
    public Map<String, Object> startCheckByProcessDefinedCode(long processDefinitionCode) {
        List processDefinitionList;
        HashMap<String, Object> result = new HashMap<String, Object>();
        ProcessDefinition processDefinition = this.processDefinitionMapper.queryByCode(processDefinitionCode);
        if (processDefinition == null) {
            logger.error("process definition is not found");
            this.putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "processDefinitionCode");
            return result;
        }
        ArrayList codes = new ArrayList();
        this.processService.recurseFindSubProcess(processDefinition.getCode(), codes);
        if (!codes.isEmpty() && (processDefinitionList = this.processDefinitionMapper.queryByCodes(codes)) != null) {
            for (ProcessDefinition processDefinitionTmp : processDefinitionList) {
                if (processDefinitionTmp.getReleaseState() == ReleaseState.ONLINE) continue;
                logger.warn("Subprocess definition {} of process definition {} is not {}.", new Object[]{processDefinitionTmp.getName(), processDefinition.getName(), ReleaseState.ONLINE.getDescp()});
                this.putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName());
                logger.info("not release process definition id: {} , name : {}", (Object)processDefinitionTmp.getId(), (Object)processDefinitionTmp.getName());
                return result;
            }
        }
        this.putMsg(result, Status.SUCCESS, new Object[0]);
        return result;
    }

    private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, WarningType warningType, int executorId, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) {
        org.apache.dolphinscheduler.dao.entity.Command command = new org.apache.dolphinscheduler.dao.entity.Command();
        HashMap<String, String> cmdParam = new HashMap<String, String>();
        if (commandType == null) {
            command.setCommandType(CommandType.START_PROCESS);
        } else {
            command.setCommandType(commandType);
        }
        command.setProcessDefinitionCode(processDefineCode);
        if (nodeDep != null) {
            command.setTaskDependType(nodeDep);
        }
        if (failureStrategy != null) {
            command.setFailureStrategy(failureStrategy);
        }
        if (!StringUtils.isEmpty((CharSequence)startNodeList)) {
            cmdParam.put("StartNodeList", startNodeList);
        }
        if (warningType != null) {
            command.setWarningType(warningType);
        }
        if (startParams != null && startParams.size() > 0) {
            cmdParam.put("StartParams", JSONUtils.toJsonString(startParams));
        }
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        command.setExecutorId(executorId);
        command.setWarningGroupId(Integer.valueOf(warningGroupId));
        command.setProcessInstancePriority(processInstancePriority);
        command.setWorkerGroup(workerGroup);
        command.setEnvironmentCode(environmentCode);
        command.setDryRun(dryRun);
        ProcessDefinition processDefinition = this.processService.findProcessDefinitionByCode(Long.valueOf(processDefineCode));
        if (processDefinition != null) {
            command.setProcessDefinitionVersion(processDefinition.getVersion());
        }
        command.setProcessInstanceId(0);
        if (commandType == CommandType.COMPLEMENT_DATA) {
            if (schedule == null || StringUtils.isEmpty((CharSequence)schedule)) {
                logger.error("Create {} type command error because parameter schedule is invalid.", (Object)command.getCommandType().getDescp());
                return 0;
            }
            if (!this.isValidateScheduleTime(schedule)) {
                return 0;
            }
            try {
                logger.info("Start to create {} command, processDefinitionCode:{}.", (Object)command.getCommandType().getDescp(), (Object)processDefineCode);
                return this.createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, complementDependentMode);
            }
            catch (CronParseException cronParseException) {
                return 0;
            }
        }
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        return this.processService.createCommand(command);
    }

    protected int createComplementCommandList(String scheduleTimeParam, RunMode runMode, org.apache.dolphinscheduler.dao.entity.Command command, Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) throws CronParseException {
        int createCount = 0;
        String startDate = null;
        String endDate = null;
        String dateList = null;
        int dependentProcessDefinitionCreateCount = 0;
        runMode = runMode == null ? RunMode.RUN_MODE_SERIAL : runMode;
        Map cmdParam = JSONUtils.toMap((String)command.getCommandParam());
        Map scheduleParam = JSONUtils.toMap((String)scheduleTimeParam);
        if (scheduleParam.containsKey("complementScheduleDateList")) {
            dateList = (String)scheduleParam.get("complementScheduleDateList");
            dateList = this.removeDuplicates(dateList);
        }
        if (scheduleParam.containsKey("complementStartDate") && scheduleParam.containsKey("complementEndDate")) {
            startDate = (String)scheduleParam.get("complementStartDate");
            endDate = (String)scheduleParam.get("complementEndDate");
        }
        switch (runMode) {
            case RUN_MODE_SERIAL: {
                logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                if (StringUtils.isNotEmpty((CharSequence)dateList)) {
                    cmdParam.put("complementScheduleDateList", dateList);
                    command.setCommandParam(JSONUtils.toJsonString((Object)cmdParam));
                    createCount = this.processService.createCommand(command);
                    if (createCount > 0) {
                        logger.info("Create {} command complete, processDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                    } else {
                        logger.error("Create {} command error, processDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                    }
                }
                if (startDate == null || endDate == null) break;
                cmdParam.put("complementStartDate", startDate);
                cmdParam.put("complementEndDate", endDate);
                command.setCommandParam(JSONUtils.toJsonString((Object)cmdParam));
                createCount = this.processService.createCommand(command);
                if (createCount > 0) {
                    logger.info("Create {} command complete, processDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                } else {
                    logger.error("Create {} command error, processDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                }
                List schedules = this.processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
                if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
                    logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", (Object)command.getProcessDefinitionCode());
                    break;
                }
                logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", (Object)command.getProcessDefinitionCode());
                dependentProcessDefinitionCreateCount += this.createComplementDependentCommand(schedules, command);
                break;
            }
            case RUN_MODE_PARALLEL: {
                logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                if (startDate != null && endDate != null) {
                    List schedules = this.processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
                    List listDate = CronUtils.getSelfFireDateList((ZonedDateTime)DateUtils.stringToZoneDateTime((String)startDate), (ZonedDateTime)DateUtils.stringToZoneDateTime((String)endDate), (List)schedules);
                    int listDateSize = listDate.size();
                    createCount = listDate.size();
                    if (!CollectionUtils.isEmpty((Collection)listDate)) {
                        if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
                            createCount = Math.min(createCount, expectedParallelismNumber);
                        }
                        logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", (Object)createCount);
                        int itemsPerCommand = listDateSize / createCount;
                        int remainingItems = listDateSize % createCount;
                        int startDateIndex = 0;
                        int endDateIndex = 0;
                        for (int i = 1; i <= createCount; ++i) {
                            int extra = i <= remainingItems ? 1 : 0;
                            int singleCommandItems = itemsPerCommand + extra;
                            if (i == 1) {
                                endDateIndex += singleCommandItems - 1;
                            } else {
                                startDateIndex = endDateIndex + 1;
                                endDateIndex += singleCommandItems;
                            }
                            cmdParam.put("complementStartDate", DateUtils.dateToString((ZonedDateTime)((ZonedDateTime)listDate.get(startDateIndex))));
                            cmdParam.put("complementEndDate", DateUtils.dateToString((ZonedDateTime)((ZonedDateTime)listDate.get(endDateIndex))));
                            command.setCommandParam(JSONUtils.toJsonString((Object)cmdParam));
                            logger.info("Creating command, commandInfo:{}.", (Object)command);
                            if (this.processService.createCommand(command) > 0) {
                                logger.info("Create {} command complete, processDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                            } else {
                                logger.error("Create {} command error, processDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                            }
                            if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
                                logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", (Object)command.getProcessDefinitionCode());
                                continue;
                            }
                            logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", (Object)command.getProcessDefinitionCode());
                            dependentProcessDefinitionCreateCount += this.createComplementDependentCommand(schedules, command);
                        }
                    }
                }
                if (!StringUtils.isNotEmpty((CharSequence)dateList)) break;
                List<String> listDate = Arrays.asList(dateList.split(","));
                createCount = listDate.size();
                if (CollectionUtils.isEmpty(listDate)) break;
                if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
                    createCount = Math.min(createCount, expectedParallelismNumber);
                }
                logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", (Object)createCount);
                for (List stringDate : Lists.partition(listDate, (int)createCount)) {
                    cmdParam.put("complementScheduleDateList", String.join((CharSequence)",", stringDate));
                    command.setCommandParam(JSONUtils.toJsonString((Object)cmdParam));
                    logger.info("Creating command, commandInfo:{}.", (Object)command);
                    if (this.processService.createCommand(command) > 0) {
                        logger.info("Create {} command complete, processDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                        continue;
                    }
                    logger.error("Create {} command error, processDefinitionCode:{}", (Object)command.getCommandType().getDescp(), (Object)command.getProcessDefinitionCode());
                }
                break;
            }
        }
        logger.info("create complement command count: {}, create dependent complement command count: {}", (Object)createCount, (Object)dependentProcessDefinitionCreateCount);
        return createCount;
    }

    public int createComplementDependentCommand(List<Schedule> schedules, org.apache.dolphinscheduler.dao.entity.Command command) {
        org.apache.dolphinscheduler.dao.entity.Command dependentCommand;
        int dependentProcessDefinitionCreateCount = 0;
        try {
            dependentCommand = (org.apache.dolphinscheduler.dao.entity.Command)BeanUtils.cloneBean((Object)command);
        }
        catch (Exception e) {
            logger.error("copy dependent command error: ", (Throwable)e);
            return dependentProcessDefinitionCreateCount;
        }
        List<DependentProcessDefinition> dependentProcessDefinitionList = this.getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), CronUtils.getMaxCycle((String)schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup());
        dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
        for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
            dependentCommand.setId(null);
            dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
            dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
            dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
            Map cmdParam = JSONUtils.toMap((String)dependentCommand.getCommandParam());
            cmdParam.put("StartNodeList", String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
            dependentCommand.setCommandParam(JSONUtils.toJsonString((Object)cmdParam));
            dependentProcessDefinitionCreateCount += this.processService.createCommand(dependentCommand);
        }
        return dependentProcessDefinitionCreateCount;
    }

    private List<DependentProcessDefinition> getComplementDependentDefinitionList(long processDefinitionCode, CycleEnum processDefinitionCycle, String workerGroup) {
        List dependentProcessDefinitionList = this.processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
        return this.checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle, workerGroup, processDefinitionCode);
    }

    private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(List<DependentProcessDefinition> dependentProcessDefinitionList, CycleEnum processDefinitionCycle, String workerGroup, long upstreamProcessDefinitionCode) {
        ArrayList<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<DependentProcessDefinition>();
        List processDefinitionCodeList = dependentProcessDefinitionList.stream().map(DependentProcessDefinition::getProcessDefinitionCode).collect(Collectors.toList());
        Map processDefinitionWorkerGroupMap = this.processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
        for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
            if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) != processDefinitionCycle) continue;
            if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
                dependentProcessDefinition.setWorkerGroup(workerGroup);
            }
            validDependentProcessDefinitionList.add(dependentProcessDefinition);
        }
        return validDependentProcessDefinitionList;
    }

    private boolean isValidateScheduleTime(String schedule) {
        Map scheduleResult = JSONUtils.toMap((String)schedule);
        if (scheduleResult == null) {
            return false;
        }
        if (scheduleResult.containsKey("complementScheduleDateList") && scheduleResult.get("complementScheduleDateList") == null) {
            return false;
        }
        if (scheduleResult.containsKey("complementStartDate")) {
            String startDate = (String)scheduleResult.get("complementStartDate");
            String endDate = (String)scheduleResult.get("complementEndDate");
            if (startDate == null || endDate == null) {
                return false;
            }
            try {
                ZonedDateTime start = DateUtils.stringToZoneDateTime((String)startDate);
                ZonedDateTime end = DateUtils.stringToZoneDateTime((String)endDate);
                if (start == null || end == null) {
                    return false;
                }
                if (start.isAfter(end)) {
                    logger.error("Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", (Object)start, (Object)end);
                    return false;
                }
            }
            catch (Exception ex) {
                logger.warn("Parse schedule time error, startDate: {}, endDate: {}", (Object)startDate, (Object)endDate);
                return false;
            }
        }
        return true;
    }

    private String removeDuplicates(String scheduleTimeList) {
        if (StringUtils.isNotEmpty((CharSequence)scheduleTimeList)) {
            Set dateSet = Arrays.stream(scheduleTimeList.split(",")).map(String::trim).collect(Collectors.toSet());
            return String.join((CharSequence)",", dateSet);
        }
        return null;
    }

    @Override
    public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
        ProcessInstance processInstance = this.processService.findProcessInstanceDetailById(processInstanceId.intValue()).orElse(null);
        if (processInstance == null) {
            return null;
        }
        Host host = new Host(processInstance.getHost());
        WorkflowExecutingDataRequestCommand requestCommand = new WorkflowExecutingDataRequestCommand();
        requestCommand.setProcessInstanceId(processInstanceId);
        Command command = this.stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
        if (command == null) {
            logger.error("Query executing process instance from master error, processInstanceId:{}.", (Object)processInstanceId);
            return null;
        }
        WorkflowExecutingDataResponseCommand responseCommand = (WorkflowExecutingDataResponseCommand)JSONUtils.parseObject((byte[])command.getBody(), WorkflowExecutingDataResponseCommand.class);
        return responseCommand.getWorkflowExecuteDto();
    }

    @Override
    public Map<String, Object> execStreamTaskInstance(User loginUser, long projectCode, long taskDefinitionCode, int taskDefinitionVersion, int warningGroupId, String workerGroup, Long environmentCode, Map<String, String> startParams, int dryRun) {
        Project project = this.projectMapper.queryByCode(projectCode);
        Map<String, Object> result = this.projectService.checkProjectAndAuth(loginUser, project, projectCode, "project:executors:start");
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        if (!this.checkMasterExists(result)) {
            return result;
        }
        List<Server> masterServerList = this.monitorService.getServerListFromRegistry(true);
        Host host = new Host(masterServerList.get(0).getHost(), masterServerList.get(0).getPort());
        TaskExecuteStartCommand taskExecuteStartCommand = new TaskExecuteStartCommand();
        taskExecuteStartCommand.setExecutorId(loginUser.getId().intValue());
        taskExecuteStartCommand.setExecutorName(loginUser.getUserName());
        taskExecuteStartCommand.setProjectCode(projectCode);
        taskExecuteStartCommand.setTaskDefinitionCode(taskDefinitionCode);
        taskExecuteStartCommand.setTaskDefinitionVersion(taskDefinitionVersion);
        taskExecuteStartCommand.setWorkerGroup(workerGroup);
        taskExecuteStartCommand.setWarningGroupId(warningGroupId);
        taskExecuteStartCommand.setEnvironmentCode(environmentCode);
        taskExecuteStartCommand.setStartParams(startParams);
        taskExecuteStartCommand.setDryRun(dryRun);
        Command response = this.stateEventCallbackService.sendSync(host, taskExecuteStartCommand.convert2Command());
        if (response != null) {
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        } else {
            logger.error("Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.", new Object[]{projectCode, taskDefinitionCode, taskDefinitionVersion});
            this.putMsg(result, Status.START_TASK_INSTANCE_ERROR, new Object[0]);
        }
        return result;
    }
}

