package org.apache.dolphinscheduler.server.master.runner.task.dynamic;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.class */
public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DynamicLogicTask.class);
    public static final String TASK_TYPE = "DYNAMIC";
    private final ProcessInstanceDao processInstanceDao;
    private final SubWorkflowService subWorkflowService;
    private final ProcessDefinitionMapper processDefineMapper;
    private final CommandMapper commandMapper;
    private final ProcessService processService;
    private ProcessInstance processInstance;
    private TaskInstance taskInstance;
    private boolean haveBeenCanceled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$common$enums$CommandType = new int[CommandType.values().length];

        static {
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$CommandType[CommandType.REPEAT_RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$CommandType[CommandType.START_FAILURE_TASK_PROCESS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$CommandType[CommandType.RECOVER_TOLERANCE_FAULT_PROCESS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public DynamicLogicTask(TaskExecutionContext taskExecutionContext, ProcessInstanceDao processInstanceDao, TaskInstanceDao taskInstanceDao, SubWorkflowService subWorkflowService, ProcessService processService, ProcessDefinitionMapper processDefinitionMapper, CommandMapper commandMapper) {
        super(taskExecutionContext, (AbstractParameters) JSONUtils.parseObject(taskExecutionContext.getTaskParams(), new TypeReference<DynamicParameters>() { // from class: org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask.1
        }));
        this.haveBeenCanceled = false;
        this.processInstanceDao = processInstanceDao;
        this.subWorkflowService = subWorkflowService;
        this.processService = processService;
        this.processDefineMapper = processDefinitionMapper;
        this.commandMapper = commandMapper;
        this.processInstance = (ProcessInstance) processInstanceDao.queryById(Integer.valueOf(taskExecutionContext.getProcessInstanceId()));
        this.taskInstance = (TaskInstance) taskInstanceDao.queryById(Integer.valueOf(taskExecutionContext.getTaskInstanceId()));
    }

    @Override // org.apache.dolphinscheduler.server.master.runner.task.IAsyncLogicTask
    public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws MasterTaskExecuteException {
        List<Map<String, String>> generateParameterGroup = generateParameterGroup();
        if (generateParameterGroup.size() > this.taskParameters.getMaxNumOfSubWorkflowInstances()) {
            log.warn("the number of sub process instances [{}] exceeds the maximum limit [{}]", Integer.valueOf(generateParameterGroup.size()), Integer.valueOf(this.taskParameters.getMaxNumOfSubWorkflowInstances()));
            generateParameterGroup = generateParameterGroup.subList(0, this.taskParameters.getMaxNumOfSubWorkflowInstances());
        }
        List<ProcessInstance> allDynamicSubWorkflow = this.subWorkflowService.getAllDynamicSubWorkflow(this.processInstance.getId().intValue(), this.taskInstance.getTaskCode());
        if (CollectionUtils.isEmpty(allDynamicSubWorkflow)) {
            generateSubWorkflowInstance(generateParameterGroup);
        } else {
            resetProcessInstanceStatus(allDynamicSubWorkflow);
        }
        return new DynamicAsyncTaskExecuteFunction(this.taskExecutionContext, this.processInstance, this.taskInstance, this, this.commandMapper, this.subWorkflowService, this.taskParameters.getDegreeOfParallelism());
    }

    public void resetProcessInstanceStatus(List<ProcessInstance> list) {
        switch (AnonymousClass2.$SwitchMap$org$apache$dolphinscheduler$common$enums$CommandType[this.processInstance.getCommandType().ordinal()]) {
            case 1:
                list.forEach(processInstance -> {
                    processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
                    this.processInstanceDao.updateById(processInstance);
                });
                return;
            case 2:
            case 3:
                this.subWorkflowService.filterFailedProcessInstances(list).forEach(processInstance2 -> {
                    processInstance2.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
                    this.processInstanceDao.updateById(processInstance2);
                });
                return;
            default:
                return;
        }
    }

    public void generateSubWorkflowInstance(List<Map<String, String>> list) throws MasterTaskExecuteException {
        ArrayList<ProcessInstance> arrayList = new ArrayList();
        ProcessDefinition queryByCode = this.processDefineMapper.queryByCode(this.taskParameters.getProcessDefinitionCode());
        for (Map<String, String> map : list) {
            String jsonString = JSONUtils.toJsonString(map);
            Command createCommand = DynamicCommandUtils.createCommand(this.processInstance, Long.valueOf(queryByCode.getCode()), Integer.valueOf(queryByCode.getVersion()), map);
            createCommand.setId(-1);
            DynamicCommandUtils.addDataToCommandParam(createCommand, "dynamicParams", jsonString);
            ProcessInstance createSubProcessInstance = createSubProcessInstance(createCommand);
            createSubProcessInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
            this.processInstanceDao.insert(createSubProcessInstance);
            createCommand.setProcessInstanceId(createSubProcessInstance.getId().intValue());
            arrayList.add(createSubProcessInstance);
        }
        ArrayList arrayList2 = new ArrayList();
        for (ProcessInstance processInstance : arrayList) {
            RelationSubWorkflow relationSubWorkflow = new RelationSubWorkflow();
            relationSubWorkflow.setParentWorkflowInstanceId(Long.valueOf(this.processInstance.getId().intValue()));
            relationSubWorkflow.setParentTaskCode(Long.valueOf(this.taskInstance.getTaskCode()));
            relationSubWorkflow.setSubWorkflowInstanceId(Long.valueOf(processInstance.getId().intValue()));
            arrayList2.add(relationSubWorkflow);
        }
        log.info("Expected number of runs : {}, actual number of runs : {}", Integer.valueOf(list.size()), Integer.valueOf(arrayList.size()));
        log.info("insert {} relation sub workflow", Integer.valueOf(this.subWorkflowService.batchInsertRelationSubWorkflow(arrayList2)));
    }

    public ProcessInstance createSubProcessInstance(Command command) throws MasterTaskExecuteException {
        try {
            ProcessInstance constructProcessInstance = this.processService.constructProcessInstance(command, this.processInstance.getHost());
            constructProcessInstance.setIsSubProcess(Flag.YES);
            constructProcessInstance.setVarPool(this.taskExecutionContext.getVarPool());
            return constructProcessInstance;
        } catch (Exception e) {
            log.error("create sub process instance error", e);
            throw new MasterTaskExecuteException(e.getMessage());
        }
    }

    public List<Map<String, String>> generateParameterGroup() {
        List<DynamicInputParameter> dynamicInputParameters = getDynamicInputParameters();
        Set set = (Set) Arrays.stream(StringUtils.split(this.taskParameters.getFilterCondition(), ",")).map((v0) -> {
            return v0.trim();
        }).collect(Collectors.toSet());
        ArrayList arrayList = new ArrayList();
        for (DynamicInputParameter dynamicInputParameter : dynamicInputParameters) {
            ArrayList arrayList2 = new ArrayList();
            for (String str : (List) ((List) Arrays.stream(StringUtils.split(dynamicInputParameter.getValue(), dynamicInputParameter.getSeparator())).map((v0) -> {
                return v0.trim();
            }).collect(Collectors.toList())).stream().filter(str2 -> {
                return !set.contains(str2);
            }).collect(Collectors.toList())) {
                DynamicInputParameter dynamicInputParameter2 = new DynamicInputParameter();
                dynamicInputParameter2.setName(dynamicInputParameter.getName());
                dynamicInputParameter2.setValue(str);
                arrayList2.add(dynamicInputParameter2);
            }
            arrayList.add(arrayList2);
        }
        List<Map<String, String>> list = (List) Lists.cartesianProduct(arrayList).stream().map(list2 -> {
            return (Map) list2.stream().collect(Collectors.toMap((v0) -> {
                return v0.getName();
            }, (v0) -> {
                return v0.getValue();
            }));
        }).collect(Collectors.toList());
        log.info("parameter group size: {}", Integer.valueOf(list.size()));
        if (CollectionUtils.isNotEmpty(list)) {
            Iterator<Map<String, String>> it = list.iterator();
            while (it.hasNext()) {
                log.info("parameter group: {}", it.next());
            }
        }
        return list;
    }

    private List<DynamicInputParameter> getDynamicInputParameters() {
        List<DynamicInputParameter> listParameters = this.taskParameters.getListParameters();
        if (CollectionUtils.isNotEmpty(listParameters)) {
            for (DynamicInputParameter dynamicInputParameter : listParameters) {
                dynamicInputParameter.setValue(ParameterUtils.convertParameterPlaceholders(dynamicInputParameter.getValue(), ParameterUtils.convert(this.taskExecutionContext.getPrepareParamsMap())));
            }
        }
        return listParameters;
    }

    @Override // org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask, org.apache.dolphinscheduler.server.master.runner.task.ILogicTask
    public void kill() {
        try {
            changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP);
        } catch (MasterTaskExecuteException e) {
            log.error("kill {} error", this.taskInstance.getName(), e);
        }
    }

    private void changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus workflowExecutionStatus) throws MasterTaskExecuteException {
        this.haveBeenCanceled = true;
        for (ProcessInstance processInstance : this.subWorkflowService.filterRunningProcessInstances(this.subWorkflowService.getAllDynamicSubWorkflow(this.processInstance.getId().intValue(), this.taskInstance.getTaskCode()))) {
            processInstance.setState(workflowExecutionStatus);
            this.processInstanceDao.updateById(processInstance);
            if (processInstance.getState().isFinished()) {
                log.info("The process instance [{}] is finished, no need to stop", processInstance.getId());
                return;
            }
            try {
                sendToSubProcess(this.taskExecutionContext, processInstance);
                log.info("Success send [{}] request to SubWorkflow's master: {}", workflowExecutionStatus, processInstance.getHost());
            } catch (Exception e) {
                throw new MasterTaskExecuteException(String.format("Send stop request to SubWorkflow's master: %s failed", processInstance.getHost()), e);
            }
        }
    }

    private void sendToSubProcess(TaskExecutionContext taskExecutionContext, ProcessInstance processInstance) {
        ((ITaskInstanceExecutionEventListener) SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(processInstance.getHost(), ITaskInstanceExecutionEventListener.class)).onWorkflowInstanceInstanceStateChange(new WorkflowInstanceStateChangeEvent(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId(), processInstance.getState(), processInstance.getId().intValue(), 0));
    }

    public boolean isCancel() {
        return this.haveBeenCanceled;
    }
}
