/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicAsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicCommandUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicLogicTask
extends BaseAsyncLogicTask<DynamicParameters> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DynamicLogicTask.class);
    public static final String TASK_TYPE = "DYNAMIC";
    private final ProcessInstanceDao processInstanceDao;
    private final SubWorkflowService subWorkflowService;
    private final ProcessDefinitionMapper processDefineMapper;
    private final CommandMapper commandMapper;
    private final ProcessService processService;
    private ProcessInstance processInstance;
    private TaskInstance taskInstance;
    private final MasterRpcClient masterRpcClient;
    private boolean haveBeenCanceled = false;

    public DynamicLogicTask(TaskExecutionContext taskExecutionContext, ProcessInstanceDao processInstanceDao, TaskInstanceDao taskInstanceDao, SubWorkflowService subWorkflowService, ProcessService processService, MasterRpcClient masterRpcClient, ProcessDefinitionMapper processDefineMapper, CommandMapper commandMapper) {
        super(taskExecutionContext, (AbstractParameters)JSONUtils.parseObject((String)taskExecutionContext.getTaskParams(), (TypeReference)new TypeReference<DynamicParameters>(){}));
        this.processInstanceDao = processInstanceDao;
        this.subWorkflowService = subWorkflowService;
        this.processService = processService;
        this.masterRpcClient = masterRpcClient;
        this.processDefineMapper = processDefineMapper;
        this.commandMapper = commandMapper;
        this.processInstance = (ProcessInstance)processInstanceDao.queryById((Serializable)Integer.valueOf(taskExecutionContext.getProcessInstanceId()));
        this.taskInstance = (TaskInstance)taskInstanceDao.queryById((Serializable)Integer.valueOf(taskExecutionContext.getTaskInstanceId()));
    }

    @Override
    public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws MasterTaskExecuteException {
        List existsSubProcessInstanceList;
        List<Map<String, String>> parameterGroup = this.generateParameterGroup();
        if (parameterGroup.size() > ((DynamicParameters)this.taskParameters).getMaxNumOfSubWorkflowInstances()) {
            log.warn("the number of sub process instances [{}] exceeds the maximum limit [{}]", (Object)parameterGroup.size(), (Object)((DynamicParameters)this.taskParameters).getMaxNumOfSubWorkflowInstances());
            parameterGroup = parameterGroup.subList(0, ((DynamicParameters)this.taskParameters).getMaxNumOfSubWorkflowInstances());
        }
        if (CollectionUtils.isEmpty((Collection)(existsSubProcessInstanceList = this.subWorkflowService.getAllDynamicSubWorkflow((long)this.processInstance.getId().intValue(), this.taskInstance.getTaskCode())))) {
            this.generateSubWorkflowInstance(parameterGroup);
        } else {
            this.resetProcessInstanceStatus(existsSubProcessInstanceList);
        }
        return new DynamicAsyncTaskExecuteFunction(this.taskExecutionContext, this.processInstance, this.taskInstance, this, this.commandMapper, this.subWorkflowService, ((DynamicParameters)this.taskParameters).getDegreeOfParallelism());
    }

    public void resetProcessInstanceStatus(List<ProcessInstance> existsSubProcessInstanceList) {
        switch (this.processInstance.getCommandType()) {
            case REPEAT_RUNNING: {
                existsSubProcessInstanceList.forEach(processInstance -> {
                    processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
                    this.processInstanceDao.updateById(processInstance);
                });
                break;
            }
            case START_FAILURE_TASK_PROCESS: 
            case RECOVER_TOLERANCE_FAULT_PROCESS: {
                List failedProcessInstances = this.subWorkflowService.filterFailedProcessInstances(existsSubProcessInstanceList);
                failedProcessInstances.forEach(processInstance -> {
                    processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
                    this.processInstanceDao.updateById(processInstance);
                });
            }
        }
    }

    public void generateSubWorkflowInstance(List<Map<String, String>> parameterGroup) throws MasterTaskExecuteException {
        ArrayList<ProcessInstance> processInstanceList = new ArrayList<ProcessInstance>();
        ProcessDefinition subProcessDefinition = this.processDefineMapper.queryByCode(((DynamicParameters)this.taskParameters).getProcessDefinitionCode());
        for (Map<String, String> map : parameterGroup) {
            String dynamicStartParams = JSONUtils.toJsonString(map);
            Command command = DynamicCommandUtils.createCommand(this.processInstance, subProcessDefinition.getCode(), subProcessDefinition.getVersion(), map);
            command.setId(Integer.valueOf(-1));
            DynamicCommandUtils.addDataToCommandParam(command, "dynamicParams", dynamicStartParams);
            ProcessInstance subProcessInstance = this.createSubProcessInstance(command);
            subProcessInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
            this.processInstanceDao.insert((Object)subProcessInstance);
            command.setProcessInstanceId(subProcessInstance.getId().intValue());
            processInstanceList.add(subProcessInstance);
        }
        ArrayList<RelationSubWorkflow> relationSubWorkflowList = new ArrayList<RelationSubWorkflow>();
        for (ProcessInstance subProcessInstance : processInstanceList) {
            RelationSubWorkflow relationSubWorkflow = new RelationSubWorkflow();
            relationSubWorkflow.setParentWorkflowInstanceId(Long.valueOf(this.processInstance.getId().intValue()));
            relationSubWorkflow.setParentTaskCode(Long.valueOf(this.taskInstance.getTaskCode()));
            relationSubWorkflow.setSubWorkflowInstanceId(Long.valueOf(subProcessInstance.getId().intValue()));
            relationSubWorkflowList.add(relationSubWorkflow);
        }
        log.info("Expected number of runs : {}, actual number of runs : {}", (Object)parameterGroup.size(), (Object)processInstanceList.size());
        int n = this.subWorkflowService.batchInsertRelationSubWorkflow(relationSubWorkflowList);
        log.info("insert {} relation sub workflow", (Object)n);
    }

    public ProcessInstance createSubProcessInstance(Command command) throws MasterTaskExecuteException {
        ProcessInstance subProcessInstance;
        try {
            subProcessInstance = this.processService.constructProcessInstance(command, this.processInstance.getHost());
            subProcessInstance.setIsSubProcess(Flag.YES);
            subProcessInstance.setVarPool(this.taskExecutionContext.getVarPool());
        }
        catch (Exception e) {
            log.error("create sub process instance error", (Throwable)e);
            throw new MasterTaskExecuteException(e.getMessage());
        }
        return subProcessInstance;
    }

    public List<Map<String, String>> generateParameterGroup() {
        List<DynamicInputParameter> dynamicInputParameters = this.getDynamicInputParameters();
        Set filterStrings = Arrays.stream(StringUtils.split((String)((DynamicParameters)this.taskParameters).getFilterCondition(), (String)",")).map(String::trim).collect(Collectors.toSet());
        ArrayList allParameters = new ArrayList();
        for (DynamicInputParameter dynamicInputParameter : dynamicInputParameters) {
            ArrayList<DynamicInputParameter> singleParameters = new ArrayList<DynamicInputParameter>();
            String string = dynamicInputParameter.getValue();
            String separator = dynamicInputParameter.getSeparator();
            List valueList = Arrays.stream(StringUtils.split((String)string, (String)separator)).map(String::trim).collect(Collectors.toList());
            valueList = valueList.stream().filter(v -> !filterStrings.contains(v)).collect(Collectors.toList());
            for (String v2 : valueList) {
                DynamicInputParameter singleParameter = new DynamicInputParameter();
                singleParameter.setName(dynamicInputParameter.getName());
                singleParameter.setValue(v2);
                singleParameters.add(singleParameter);
            }
            allParameters.add(singleParameters);
        }
        List cartesianProduct = Lists.cartesianProduct(allParameters);
        List<Map<String, String>> parameterGroup = cartesianProduct.stream().map(inputParameterList -> inputParameterList.stream().collect(Collectors.toMap(DynamicInputParameter::getName, DynamicInputParameter::getValue))).collect(Collectors.toList());
        log.info("parameter group size: {}", (Object)parameterGroup.size());
        if (CollectionUtils.isNotEmpty(parameterGroup)) {
            for (Map map : parameterGroup) {
                log.info("parameter group: {}", (Object)map);
            }
        }
        return parameterGroup;
    }

    private List<DynamicInputParameter> getDynamicInputParameters() {
        List dynamicInputParameters = ((DynamicParameters)this.taskParameters).getListParameters();
        if (CollectionUtils.isNotEmpty((Collection)dynamicInputParameters)) {
            for (DynamicInputParameter dynamicInputParameter : dynamicInputParameters) {
                String value = dynamicInputParameter.getValue();
                Map paramsMap = this.taskExecutionContext.getPrepareParamsMap();
                value = ParameterUtils.convertParameterPlaceholders((String)value, (Map)ParameterUtils.convert((Map)paramsMap));
                dynamicInputParameter.setValue(value);
            }
        }
        return dynamicInputParameters;
    }

    @Override
    public void kill() {
        try {
            this.changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP);
        }
        catch (MasterTaskExecuteException e) {
            log.error("kill {} error", (Object)this.taskInstance.getName(), (Object)e);
        }
    }

    private void changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus) throws MasterTaskExecuteException {
        this.haveBeenCanceled = true;
        List existsSubProcessInstanceList = this.subWorkflowService.getAllDynamicSubWorkflow((long)this.processInstance.getId().intValue(), this.taskInstance.getTaskCode());
        List runningSubProcessInstanceList = this.subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList);
        for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) {
            subProcessInstance.setState(stopStatus);
            this.processInstanceDao.updateById((Object)subProcessInstance);
            if (subProcessInstance.getState().isFinished()) {
                log.info("The process instance [{}] is finished, no need to stop", (Object)subProcessInstance.getId());
                return;
            }
            try {
                this.sendToSubProcess(this.taskExecutionContext, subProcessInstance);
                log.info("Success send [{}] request to SubWorkflow's master: {}", (Object)stopStatus, (Object)subProcessInstance.getHost());
            }
            catch (RemotingException e) {
                throw new MasterTaskExecuteException(String.format("Send stop request to SubWorkflow's master: %s failed", subProcessInstance.getHost()), e);
            }
        }
    }

    private void sendToSubProcess(TaskExecutionContext taskExecutionContext, ProcessInstance subProcessInstance) throws RemotingException {
        WorkflowStateEventChangeRequest stateEventChangeCommand = new WorkflowStateEventChangeRequest(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId(), subProcessInstance.getState(), subProcessInstance.getId().intValue(), 0);
        Host host = new Host(subProcessInstance.getHost());
        this.masterRpcClient.send(host, stateEventChangeCommand.convert2Command());
    }

    public boolean isCancel() {
        return this.haveBeenCanceled;
    }
}

