/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.api.service.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ExecutorServiceImpl
extends BaseServiceImpl
implements ExecutorService {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceImpl.class);
    @Autowired
    private ProjectMapper projectMapper;
    @Autowired
    private ProjectService projectService;
    @Autowired
    private ProcessDefinitionMapper processDefinitionMapper;
    @Autowired
    private MonitorService monitorService;
    @Autowired
    private ProcessInstanceMapper processInstanceMapper;
    @Autowired
    private ProcessService processService;
    @Autowired
    StateEventCallbackService stateEventCallbackService;

    @Override
    public Map<String, Object> execProcessInstance(User loginUser, long projectCode, long processDefinitionCode, String cronTime, CommandType commandType, FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) {
        Project project = this.projectMapper.queryByCode(projectCode);
        Map<String, Object> result = this.projectService.checkProjectAndAuth(loginUser, project, projectCode);
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        if (timeout <= 0 || timeout > 86400) {
            this.putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR, new Object[0]);
            return result;
        }
        ProcessDefinition processDefinition = this.processDefinitionMapper.queryByCode(processDefinitionCode);
        result = this.checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode);
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        if (!this.checkTenantSuitable(processDefinition)) {
            logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ", (Object)processDefinition.getId(), (Object)processDefinition.getName());
            this.putMsg(result, Status.TENANT_NOT_SUITABLE, new Object[0]);
            return result;
        }
        if (!this.checkMasterExists(result)) {
            return result;
        }
        int create = this.createCommand(commandType, processDefinition.getCode(), taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber, dryRun, complementDependentMode);
        if (create > 0) {
            processDefinition.setWarningGroupId(warningGroupId);
            this.processDefinitionMapper.updateById(processDefinition);
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        } else {
            this.putMsg(result, Status.START_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return result;
    }

    private boolean checkMasterExists(Map<String, Object> result) {
        List<Server> masterServers = this.monitorService.getServerListFromRegistry(true);
        if (masterServers.isEmpty()) {
            this.putMsg(result, Status.MASTER_NOT_EXISTS, new Object[0]);
            return false;
        }
        return true;
    }

    @Override
    public Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
            this.putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
        } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
            this.putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineCode);
        } else {
            result.put("status", (Object)Status.SUCCESS);
        }
        return result;
    }

    @Override
    public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {
        Object startParamsJson;
        Project project = this.projectMapper.queryByCode(projectCode);
        Map<String, Object> result = this.projectService.checkProjectAndAuth(loginUser, project, projectCode);
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        if (!this.checkMasterExists(result)) {
            return result;
        }
        ProcessInstance processInstance = this.processService.findProcessInstanceDetailById(processInstanceId.intValue());
        if (processInstance == null) {
            this.putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
            return result;
        }
        ProcessDefinition processDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
        if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE && (result = this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode())).get("status") != Status.SUCCESS) {
            return result;
        }
        result = this.checkExecuteType(processInstance, executeType);
        if (result.get("status") != Status.SUCCESS) {
            return result;
        }
        if (!this.checkTenantSuitable(processDefinition)) {
            logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ", (Object)processDefinition.getId(), (Object)processDefinition.getName());
            this.putMsg(result, Status.TENANT_NOT_SUITABLE, new Object[0]);
        }
        Map commandMap = (Map)JSONUtils.parseObject((String)processInstance.getCommandParam(), (TypeReference)new TypeReference<Map<String, Object>>(){});
        String startParams = null;
        if (MapUtils.isNotEmpty((Map)commandMap) && executeType == ExecuteType.REPEAT_RUNNING && (startParamsJson = commandMap.get("StartParams")) != null) {
            startParams = startParamsJson.toString();
        }
        switch (executeType) {
            case REPEAT_RUNNING: {
                result = this.insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
                break;
            }
            case RECOVER_SUSPENDED_PROCESS: {
                result = this.insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
                break;
            }
            case START_FAILURE_TASK_PROCESS: {
                result = this.insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
                break;
            }
            case STOP: {
                if (processInstance.getState() == ExecutionStatus.READY_STOP) {
                    this.putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
                    break;
                }
                result = this.updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
                break;
            }
            case PAUSE: {
                if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                    this.putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
                    break;
                }
                result = this.updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
                break;
            }
            default: {
                logger.error("unknown execute type : {}", (Object)executeType);
                this.putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
            }
        }
        return result;
    }

    private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
        Tenant tenant = this.processService.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId());
        return tenant != null;
    }

    private Map<String, Object> checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        ExecutionStatus executionStatus = processInstance.getState();
        boolean checkResult = false;
        switch (executeType) {
            case STOP: 
            case PAUSE: {
                if (!executionStatus.typeIsRunning()) break;
                checkResult = true;
                break;
            }
            case REPEAT_RUNNING: {
                if (!executionStatus.typeIsFinished()) break;
                checkResult = true;
                break;
            }
            case START_FAILURE_TASK_PROCESS: {
                if (!executionStatus.typeIsFailure()) break;
                checkResult = true;
                break;
            }
            case RECOVER_SUSPENDED_PROCESS: {
                if (!executionStatus.typeIsPause() && !executionStatus.typeIsCancel()) break;
                checkResult = true;
                break;
            }
        }
        if (!checkResult) {
            this.putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString());
        } else {
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        }
        return result;
    }

    private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        processInstance.setCommandType(commandType);
        processInstance.addHistoryCmd(commandType);
        processInstance.setState(executionStatus);
        int update = this.processService.updateProcessInstance(processInstance);
        if (update > 0) {
            String host = processInstance.getHost();
            String address = host.split(":")[0];
            int port = Integer.parseInt(host.split(":")[1]);
            StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0);
            this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        } else {
            this.putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return result;
    }

    private Map<String, Object> forceStartTaskInstance(ProcessInstance processInstance, int taskId) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(taskId);
        if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
            this.putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START, new Object[0]);
            return result;
        }
        taskGroupQueue.setForceStart(Flag.YES.getCode());
        this.processService.updateTaskGroupQueue(taskGroupQueue);
        this.processService.sendStartTask2Master(processInstance, taskId, org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST);
        this.putMsg(result, Status.SUCCESS, new Object[0]);
        return result;
    }

    private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        HashMap<String, Object> cmdParam = new HashMap<String, Object>();
        cmdParam.put("ProcessInstanceId", instanceId);
        if (!StringUtils.isEmpty((String)startParams)) {
            cmdParam.put("StartParams", startParams);
        }
        Command command = new Command();
        command.setCommandType(commandType);
        command.setProcessDefinitionCode(processDefinitionCode);
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        command.setExecutorId(loginUser.getId());
        command.setProcessDefinitionVersion(processVersion);
        command.setProcessInstanceId(instanceId.intValue());
        if (!this.processService.verifyIsNeedCreateCommand(command)) {
            this.putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode);
            return result;
        }
        int create = this.processService.createCommand(command);
        if (create > 0) {
            this.putMsg(result, Status.SUCCESS, new Object[0]);
        } else {
            this.putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return result;
    }

    @Override
    public Map<String, Object> startCheckByProcessDefinedCode(long processDefinitionCode) {
        List processDefinitionList;
        HashMap<String, Object> result = new HashMap<String, Object>();
        ProcessDefinition processDefinition = this.processDefinitionMapper.queryByCode(processDefinitionCode);
        if (processDefinition == null) {
            logger.error("process definition is not found");
            this.putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "processDefinitionCode");
            return result;
        }
        ArrayList codes = new ArrayList();
        this.processService.recurseFindSubProcess(processDefinition.getCode(), codes);
        if (!codes.isEmpty() && (processDefinitionList = this.processDefinitionMapper.queryByCodes(codes)) != null) {
            for (ProcessDefinition processDefinitionTmp : processDefinitionList) {
                if (processDefinitionTmp.getReleaseState() == ReleaseState.ONLINE) continue;
                this.putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName());
                logger.info("not release process definition id: {} , name : {}", (Object)processDefinitionTmp.getId(), (Object)processDefinitionTmp.getName());
                return result;
            }
        }
        this.putMsg(result, Status.SUCCESS, new Object[0]);
        return result;
    }

    private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, WarningType warningType, int executorId, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) {
        String[] interval;
        Command command = new Command();
        HashMap<String, String> cmdParam = new HashMap<String, String>();
        if (commandType == null) {
            command.setCommandType(CommandType.START_PROCESS);
        } else {
            command.setCommandType(commandType);
        }
        command.setProcessDefinitionCode(processDefineCode);
        if (nodeDep != null) {
            command.setTaskDependType(nodeDep);
        }
        if (failureStrategy != null) {
            command.setFailureStrategy(failureStrategy);
        }
        if (!StringUtils.isEmpty((String)startNodeList)) {
            cmdParam.put("StartNodeList", startNodeList);
        }
        if (warningType != null) {
            command.setWarningType(warningType);
        }
        if (startParams != null && startParams.size() > 0) {
            cmdParam.put("StartParams", JSONUtils.toJsonString(startParams));
        }
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        command.setExecutorId(executorId);
        command.setWarningGroupId(Integer.valueOf(warningGroupId));
        command.setProcessInstancePriority(processInstancePriority);
        command.setWorkerGroup(workerGroup);
        command.setEnvironmentCode(environmentCode);
        command.setDryRun(dryRun);
        ProcessDefinition processDefinition = this.processService.findProcessDefinitionByCode(Long.valueOf(processDefineCode));
        if (processDefinition != null) {
            command.setProcessDefinitionVersion(processDefinition.getVersion());
        }
        command.setProcessInstanceId(0);
        Date start = null;
        Date end = null;
        if (!StringUtils.isEmpty((String)schedule) && (interval = schedule.split(",")).length == 2 && (start = DateUtils.getScheduleDate((String)interval[0])).after(end = DateUtils.getScheduleDate((String)interval[1]))) {
            logger.info("complement data error, wrong date start:{} and end date:{} ", (Object)start, (Object)end);
            return 0;
        }
        if (commandType == CommandType.COMPLEMENT_DATA) {
            if (start == null || end == null) {
                return 0;
            }
            return this.createComplementCommandList(start, end, runMode, command, expectedParallelismNumber, complementDependentMode);
        }
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        return this.processService.createCommand(command);
    }

    protected int createComplementCommandList(Date start, Date end, RunMode runMode, Command command, Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) {
        int createCount = 0;
        int dependentProcessDefinitionCreateCount = 0;
        runMode = runMode == null ? RunMode.RUN_MODE_SERIAL : runMode;
        Map cmdParam = JSONUtils.toMap((String)command.getCommandParam());
        switch (runMode) {
            case RUN_MODE_SERIAL: {
                if (start.after(end)) {
                    logger.warn("The startDate {} is later than the endDate {}", (Object)start, (Object)end);
                    break;
                }
                cmdParam.put("complementStartDate", DateUtils.dateToString((Date)start));
                cmdParam.put("complementEndDate", DateUtils.dateToString((Date)end));
                command.setCommandParam(JSONUtils.toJsonString((Object)cmdParam));
                createCount = this.processService.createCommand(command);
                List schedules = this.processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
                if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
                    logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip dependent complement data", (Object)command.getProcessDefinitionCode());
                    break;
                }
                dependentProcessDefinitionCreateCount += this.createComplementDependentCommand(schedules, command);
                break;
            }
            case RUN_MODE_PARALLEL: {
                if (start.after(end)) {
                    logger.warn("The startDate {} is later than the endDate {}", (Object)start, (Object)end);
                    break;
                }
                ArrayList listDate = new ArrayList();
                List schedules = this.processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
                listDate.addAll(CronUtils.getSelfFireDateList((Date)start, (Date)end, (List)schedules));
                int listDateSize = listDate.size();
                createCount = listDate.size();
                if (CollectionUtils.isEmpty(listDate)) break;
                if (expectedParallelismNumber != null && expectedParallelismNumber != 0 && listDateSize < (createCount = Math.min(listDate.size(), expectedParallelismNumber))) {
                    createCount = listDateSize;
                }
                logger.info("In parallel mode, current expectedParallelismNumber:{}", (Object)createCount);
                int itemsPerCommand = listDateSize / createCount;
                int remainingItems = listDateSize % createCount;
                int startDateIndex = 0;
                int endDateIndex = 0;
                for (int i = 1; i <= createCount; ++i) {
                    int extra = i <= remainingItems ? 1 : 0;
                    int singleCommandItems = itemsPerCommand + extra;
                    if (i == 1) {
                        endDateIndex += singleCommandItems - 1;
                    } else {
                        startDateIndex = endDateIndex + 1;
                        endDateIndex += singleCommandItems;
                    }
                    cmdParam.put("complementStartDate", DateUtils.dateToString((Date)((Date)listDate.get(startDateIndex))));
                    cmdParam.put("complementEndDate", DateUtils.dateToString((Date)((Date)listDate.get(endDateIndex))));
                    command.setCommandParam(JSONUtils.toJsonString((Object)cmdParam));
                    this.processService.createCommand(command);
                    if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
                        logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip dependent complement data", (Object)command.getProcessDefinitionCode());
                        continue;
                    }
                    dependentProcessDefinitionCreateCount += this.createComplementDependentCommand(schedules, command);
                }
                break;
            }
        }
        logger.info("create complement command count: {}, create dependent complement command count: {}", (Object)createCount, (Object)dependentProcessDefinitionCreateCount);
        return createCount;
    }

    protected int createComplementDependentCommand(List<Schedule> schedules, Command command) {
        Command dependentCommand;
        int dependentProcessDefinitionCreateCount = 0;
        try {
            dependentCommand = (Command)BeanUtils.cloneBean((Object)command);
        }
        catch (Exception e) {
            logger.error("copy dependent command error: ", (Throwable)e);
            return dependentProcessDefinitionCreateCount;
        }
        List<DependentProcessDefinition> dependentProcessDefinitionList = this.getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), CronUtils.getMaxCycle((String)schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup());
        dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
        for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
            dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
            dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
            Map cmdParam = JSONUtils.toMap((String)dependentCommand.getCommandParam());
            cmdParam.put("StartNodeList", String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
            dependentCommand.setCommandParam(JSONUtils.toJsonString((Object)cmdParam));
            dependentProcessDefinitionCreateCount += this.processService.createCommand(dependentCommand);
        }
        return dependentProcessDefinitionCreateCount;
    }

    private List<DependentProcessDefinition> getComplementDependentDefinitionList(long processDefinitionCode, CycleEnum processDefinitionCycle, String workerGroup) {
        List dependentProcessDefinitionList = this.processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
        return this.checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle, workerGroup);
    }

    private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(List<DependentProcessDefinition> dependentProcessDefinitionList, CycleEnum processDefinitionCycle, String workerGroup) {
        ArrayList<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<DependentProcessDefinition>();
        List processDefinitionCodeList = dependentProcessDefinitionList.stream().map(DependentProcessDefinition::getProcessDefinitionCode).collect(Collectors.toList());
        Map processDefinitionWorkerGroupMap = this.processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
        for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
            if (dependentProcessDefinition.getDependentCycle() != processDefinitionCycle) continue;
            if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
                dependentProcessDefinition.setWorkerGroup(workerGroup);
            }
            validDependentProcessDefinitionList.add(dependentProcessDefinition);
        }
        return validDependentProcessDefinitionList;
    }
}

