/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner.task.subworkflow;

import com.fasterxml.jackson.core.type.TypeReference;
import lombok.Generated;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowAsyncTaskExecuteFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubWorkflowLogicTask
extends BaseAsyncLogicTask<SubProcessParameters> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SubWorkflowLogicTask.class);
    public static final String TASK_TYPE = "SUB_PROCESS";
    private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    private final ProcessInstanceDao processInstanceDao;
    private final MasterRpcClient masterRpcClient;

    public SubWorkflowLogicTask(TaskExecutionContext taskExecutionContext, ProcessInstanceExecCacheManager processInstanceExecCacheManager, ProcessInstanceDao processInstanceDao, MasterRpcClient masterRpcClient) {
        super(taskExecutionContext, (AbstractParameters)JSONUtils.parseObject((String)taskExecutionContext.getTaskParams(), (TypeReference)new TypeReference<SubProcessParameters>(){}));
        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
        this.processInstanceDao = processInstanceDao;
        this.masterRpcClient = masterRpcClient;
    }

    @Override
    public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws MasterTaskExecuteException {
        return new SubWorkflowAsyncTaskExecuteFunction(this.taskExecutionContext, this.processInstanceDao);
    }

    @Override
    public void pause() throws MasterTaskExecuteException {
        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(this.taskExecutionContext.getProcessInstanceId());
        if (workflowExecuteRunnable == null) {
            log.warn("Cannot find WorkflowExecuteRunnable");
            return;
        }
        ProcessInstance subProcessInstance = this.processInstanceDao.querySubProcessInstanceByParentId(Integer.valueOf(this.taskExecutionContext.getProcessInstanceId()), Integer.valueOf(this.taskExecutionContext.getTaskInstanceId()));
        if (subProcessInstance == null) {
            log.info("SubWorkflow instance is null");
            return;
        }
        TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(this.taskExecutionContext.getTaskInstanceId()).orElse(null);
        if (taskInstance == null) {
            log.info("TaskInstance is null");
            return;
        }
        if (taskInstance.getState().isFinished()) {
            log.info("The task instance is finished, no need to pause");
            return;
        }
        subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE, "ready pause sub workflow");
        this.processInstanceDao.updateById((Object)subProcessInstance);
        try {
            this.sendToSubProcess(this.taskExecutionContext, subProcessInstance);
            log.info("Success send pause request to SubWorkflow's master: {}", (Object)subProcessInstance.getHost());
        }
        catch (RemotingException e) {
            throw new MasterTaskExecuteException(String.format("Send pause request to SubWorkflow's master: %s failed", subProcessInstance.getHost()), e);
        }
    }

    @Override
    public void kill() {
        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(this.taskExecutionContext.getProcessInstanceId());
        if (workflowExecuteRunnable == null) {
            log.warn("Cannot find WorkflowExecuteRunnable");
            return;
        }
        ProcessInstance subProcessInstance = this.processInstanceDao.querySubProcessInstanceByParentId(Integer.valueOf(this.taskExecutionContext.getProcessInstanceId()), Integer.valueOf(this.taskExecutionContext.getTaskInstanceId()));
        if (subProcessInstance == null) {
            log.info("SubWorkflow instance is null");
            return;
        }
        TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(this.taskExecutionContext.getTaskInstanceId()).orElse(null);
        if (taskInstance == null) {
            log.info("TaskInstance is null");
            return;
        }
        if (subProcessInstance.getState().isFinished()) {
            log.info("The subProcessInstance is finished, no need to pause");
            return;
        }
        subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by kill task");
        this.processInstanceDao.updateById((Object)subProcessInstance);
        try {
            this.sendToSubProcess(this.taskExecutionContext, subProcessInstance);
            log.info("Success send kill request to SubWorkflow's master: {}", (Object)subProcessInstance.getHost());
        }
        catch (RemotingException e) {
            log.error("Send kill request to SubWorkflow's master: {} failed", (Object)subProcessInstance.getHost(), (Object)e);
        }
    }

    private void sendToSubProcess(TaskExecutionContext taskExecutionContext, ProcessInstance subProcessInstance) throws RemotingException {
        WorkflowStateEventChangeRequest stateEventChangeCommand = new WorkflowStateEventChangeRequest(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId(), subProcessInstance.getState(), subProcessInstance.getId().intValue(), 0);
        Host host = new Host(subProcessInstance.getHost());
        this.masterRpcClient.send(host, stateEventChangeCommand.convert2Command());
    }
}

