/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Generated;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicCommandUtils;
import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicOutput;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AsyncTaskExecuteFunction} used by {@link DynamicLogicTask}.
 *
 * <p>Each status check loads the sub workflow instances that belong to the dynamic task,
 * reports an aggregate status for them and, while some of them are still unfinished, inserts
 * {@link Command}s for waiting instances so that up to
 * {@code degreeOfParallelism - runningCount} additional instances are started per check.
 */
public class DynamicAsyncTaskExecuteFunction implements AsyncTaskExecuteFunction {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DynamicAsyncTaskExecuteFunction.class);

    /** Value returned by {@link #getAsyncTaskStateCheckInterval()}. */
    private static final Duration TASK_EXECUTE_STATE_CHECK_INTERVAL = Duration.ofSeconds(10L);

    /** Prefix of the name of the output property written to the var pool. */
    private static final String OUTPUT_KEY = "dynamic.out";

    /** Key inside a sub workflow instance's command param that holds its dynamic parameters as JSON. */
    private static final String DYNAMIC_PARAMS_KEY = "dynamicParams";

    /** Parent process instance that owns the dynamic task. */
    private final ProcessInstance processInstance;
    private final TaskInstance taskInstance;
    private final SubWorkflowService subWorkflowService;
    private final CommandMapper commandMapper;
    private final int degreeOfParallelism;
    private final DynamicLogicTask logicTask;

    /**
     * Creates the execute function for one dynamic task instance.
     *
     * @param taskExecutionContext accepted for signature compatibility; not used by this class
     * @param processInstance      parent process instance of the dynamic task
     * @param taskInstance         task instance of the dynamic task
     * @param dynamicLogicTask     logic task queried for cancellation and updated with the output var pool
     * @param commandMapper        mapper used to insert the commands that start sub workflow instances
     * @param subWorkflowService   service used to load and filter the sub workflow instances
     * @param degreeOfParallelism  number that the count of running sub workflow instances is compared
     *                             against when deciding how many more to start
     */
    public DynamicAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionContext, ProcessInstance processInstance, TaskInstance taskInstance, DynamicLogicTask dynamicLogicTask, CommandMapper commandMapper, SubWorkflowService subWorkflowService, int degreeOfParallelism) {
        this.processInstance = processInstance;
        this.taskInstance = taskInstance;
        this.logicTask = dynamicLogicTask;
        this.degreeOfParallelism = degreeOfParallelism;
        this.commandMapper = commandMapper;
        this.subWorkflowService = subWorkflowService;
    }

    /**
     * Computes the aggregate status of the dynamic task from its sub workflow instances.
     *
     * <ul>
     *   <li>All instances finished and all succeeded: the output property is written and
     *       {@code SUCCESS} is returned.</li>
     *   <li>All instances finished but some did not succeed: {@code FAILED}.</li>
     *   <li>Not all finished and the logic task is cancelled: {@code FAILED}.</li>
     *   <li>Otherwise waiting instances are started if capacity allows and {@code RUNNING}
     *       is returned.</li>
     * </ul>
     *
     * <p>NOTE(review): when no sub workflow instance exists the "all finished" comparison is
     * {@code 0 == 0}; presumably the success filter then returns an empty list as well and the
     * task reports {@code SUCCESS} with an empty output - confirm this is intended.
     *
     * @return the aggregate execution status
     */
    @Override
    @NonNull
    public AsyncTaskExecuteFunction.AsyncTaskExecutionStatus getAsyncTaskExecutionStatus() {
        List<ProcessInstance> allSubProcessInstance = getAllSubProcessInstance();
        int totalSubProcessInstanceCount = allSubProcessInstance.size();
        List<ProcessInstance> finishedSubProcessInstance =
                subWorkflowService.filterFinishProcessInstances(allSubProcessInstance);

        if (finishedSubProcessInstance.size() == totalSubProcessInstanceCount) {
            log.info("all sub process instance finish");
            int successCount = subWorkflowService.filterSuccessProcessInstances(finishedSubProcessInstance).size();
            log.info("success sub process instance count: {}", successCount);
            if (successCount == totalSubProcessInstanceCount) {
                log.info("all sub process instance success");
                // Build the output from the same list the success decision was based on
                // instead of querying the sub workflow instances a second time.
                setOutputParameters(allSubProcessInstance);
                return AsyncTaskExecuteFunction.AsyncTaskExecutionStatus.SUCCESS;
            }
            int failedCount = totalSubProcessInstanceCount - successCount;
            log.info("failed sub process instance count: {}", failedCount);
            return AsyncTaskExecuteFunction.AsyncTaskExecutionStatus.FAILED;
        }

        // Some instances are still unfinished: a cancelled task fails without starting more.
        if (logicTask.isCancel()) {
            return AsyncTaskExecuteFunction.AsyncTaskExecutionStatus.FAILED;
        }

        int runningCount = subWorkflowService.filterRunningProcessInstances(allSubProcessInstance).size();
        int startCount = degreeOfParallelism - runningCount;
        if (startCount > 0) {
            log.info("There are {} sub process instances that can be started", startCount);
            startSubProcessInstances(allSubProcessInstance, startCount);
        }
        return AsyncTaskExecuteFunction.AsyncTaskExecutionStatus.RUNNING;
    }

    /**
     * Serializes one {@link DynamicOutput} per sub workflow instance into a single OUT property
     * named {@code dynamic.out(<task name>)}, appends it to the properties parsed from the task
     * instance's var pool and stores the result as the var pool of the logic task's parameters.
     *
     * @param allSubProcessInstance sub workflow instances to collect outputs from, in the order
     *                              that determines their 1-based {@code mappedTimes}
     */
    private void setOutputParameters(List<ProcessInstance> allSubProcessInstance) {
        log.info("set varPool");
        List<DynamicOutput> dynamicOutputs = new ArrayList<>(allSubProcessInstance.size());
        int mappedTimes = 1;
        for (ProcessInstance subProcessInstance : allSubProcessInstance) {
            dynamicOutputs.add(buildDynamicOutput(subProcessInstance, mappedTimes++));
        }

        Property property = new Property();
        property.setProp(String.format("%s(%s)", OUTPUT_KEY, taskInstance.getName()));
        property.setDirect(Direct.OUT);
        property.setType(DataType.VARCHAR);
        property.setValue(JSONUtils.toJsonString(dynamicOutputs));

        // Copy into a fresh ArrayList before appending; the parsed list is not modified in place.
        List<Property> taskPropertyList =
                new ArrayList<>(JSONUtils.toList(taskInstance.getVarPool(), Property.class));
        taskPropertyList.add(property);
        logicTask.getTaskParameters().setVarPool(JSONUtils.toJsonString(taskPropertyList));
        log.info("set property: {}", property);
    }

    /**
     * Builds the output entry of a single sub workflow instance: the dynamic parameters read from
     * its command param plus its workflow output parameters flattened to a prop-to-value map.
     *
     * @param subProcessInstance sub workflow instance to describe
     * @param mappedTimes        1-based position of the instance among all sub workflow instances
     * @return the populated output entry
     */
    private DynamicOutput buildDynamicOutput(ProcessInstance subProcessInstance, int mappedTimes) {
        DynamicOutput dynamicOutput = new DynamicOutput();

        // The command param is JSON whose "dynamicParams" entry is itself a JSON object string.
        Map<String, String> commandParam = JSONUtils.toMap(subProcessInstance.getCommandParam());
        Map<String, String> dynamicParams = JSONUtils.toMap(commandParam.get(DYNAMIC_PARAMS_KEY));
        dynamicOutput.setDynParams(dynamicParams);

        Map<String, String> outputValueMap = new HashMap<>();
        List<Property> outputParameters = subWorkflowService.getWorkflowOutputParameters(subProcessInstance);
        for (Property outputParameter : outputParameters) {
            outputValueMap.put(outputParameter.getProp(), outputParameter.getValue());
        }
        dynamicOutput.setOutputValue(outputValueMap);
        dynamicOutput.setMappedTimes(mappedTimes);
        return dynamicOutput;
    }

    /**
     * Inserts a start command for at most {@code startCount} of the sub workflow instances that
     * the service reports as waiting to run.
     *
     * @param allSubProcessInstance all sub workflow instances of the dynamic task
     * @param startCount            upper bound on the number of instances to start
     */
    private void startSubProcessInstances(List<ProcessInstance> allSubProcessInstance, int startCount) {
        List<ProcessInstance> waitingProcessInstances =
                subWorkflowService.filterWaitToRunProcessInstances(allSubProcessInstance);
        int launchCount = Math.min(startCount, waitingProcessInstances.size());
        for (int i = 0; i < launchCount; ++i) {
            ProcessInstance subProcessInstance = waitingProcessInstances.get(i);
            Map<String, String> parameters = JSONUtils.toMap(
                    DynamicCommandUtils.getDataFromCommandParam(subProcessInstance.getCommandParam(), DYNAMIC_PARAMS_KEY));
            Command command = DynamicCommandUtils.createCommand(
                    processInstance,
                    subProcessInstance.getProcessDefinitionCode(),
                    subProcessInstance.getProcessDefinitionVersion(),
                    parameters);
            // Point the command at the existing sub workflow instance.
            command.setProcessInstanceId(subProcessInstance.getId());
            commandMapper.insert(command);
            log.info("start sub process instance, sub process instance id: {}, command: {}", subProcessInstance.getId(), command);
        }
    }

    /**
     * Loads the dynamic sub workflow instances for the parent process instance id and the task
     * code of this task instance.
     *
     * @return the list returned by the sub workflow service
     */
    public List<ProcessInstance> getAllSubProcessInstance() {
        return subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode());
    }

    /**
     * Returns the fixed state check interval of this function.
     *
     * @return a constant interval of 10 seconds
     */
    @Override
    @NonNull
    public Duration getAsyncTaskStateCheckInterval() {
        return TASK_EXECUTE_STATE_CHECK_INTERVAL;
    }
}

