package org.apache.dolphinscheduler.server.master.runner.task;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.auto.service.AutoService;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

@AutoService({ITaskProcessor.class})
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.class */
public class SubTaskProcessor extends BaseTaskProcessor {
    private ProcessInstance subProcessInstance = null;
    private final Lock runLock = new ReentrantLock();
    private StateEventCallbackService stateEventCallbackService = (StateEventCallbackService) SpringApplicationContext.getBean(StateEventCallbackService.class);

    @Override // org.apache.dolphinscheduler.server.master.runner.task.BaseTaskProcessor
    public boolean submitTask() {
        this.taskInstance = this.processService.submitTaskWithRetry(this.processInstance, this.taskInstance, this.maxRetryTimes, this.commitInterval);
        if (this.taskInstance == null) {
            return false;
        }
        setTaskExecutionLogger();
        this.taskInstance.setLogPath(LogUtils.getTaskLogPath(this.taskInstance.getFirstSubmitTime(), this.processInstance.getProcessDefinitionCode(), this.processInstance.getProcessDefinitionVersion(), this.taskInstance.getProcessInstanceId(), this.taskInstance.getId().intValue()));
        return true;
    }

    @Override // org.apache.dolphinscheduler.server.master.runner.task.BaseTaskProcessor
    public boolean runTask() {
        try {
            this.runLock.lock();
            if (setSubWorkFlow()) {
                updateTaskState();
            }
            return true;
        } catch (Exception e) {
            this.logger.error("work flow {} sub task {} exceptions", new Object[]{this.processInstance.getId(), this.taskInstance.getId(), e});
            return true;
        } finally {
            this.runLock.unlock();
        }
    }

    @Override // org.apache.dolphinscheduler.server.master.runner.task.BaseTaskProcessor
    protected boolean resubmitTask() {
        return true;
    }

    @Override // org.apache.dolphinscheduler.server.master.runner.task.BaseTaskProcessor
    protected boolean dispatchTask() {
        return true;
    }

    @Override // org.apache.dolphinscheduler.server.master.runner.task.BaseTaskProcessor
    protected boolean taskTimeout() {
        TaskTimeoutStrategy timeoutNotifyStrategy = this.taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
        if (TaskTimeoutStrategy.FAILED != timeoutNotifyStrategy && TaskTimeoutStrategy.WARNFAILED != timeoutNotifyStrategy) {
            return true;
        }
        this.logger.info("sub process task {} timeout, strategy {} ", this.taskInstance.getId(), timeoutNotifyStrategy.getDescp());
        killTask();
        return true;
    }

    private void updateTaskState() {
        this.subProcessInstance = this.processService.findSubProcessInstance(this.processInstance.getId(), this.taskInstance.getId());
        this.logger.info("work flow {} task {}, sub work flow: {} state: {}", new Object[]{this.processInstance.getId(), this.taskInstance.getId(), this.subProcessInstance.getId(), this.subProcessInstance.getState()});
        if (this.subProcessInstance == null || !this.subProcessInstance.getState().isFinished()) {
            return;
        }
        this.taskInstance.setState(TaskExecutionStatus.of(this.subProcessInstance.getState().getCode()));
        this.taskInstance.setEndTime(new Date());
        dealFinish();
        this.processService.saveTaskInstance(this.taskInstance);
    }

    private void dealFinish() {
        String varPool = this.taskInstance.getVarPool();
        if (StringUtils.isNotEmpty(varPool)) {
            String varPool2 = this.subProcessInstance.getVarPool();
            if (StringUtils.isNotEmpty(varPool2)) {
                List list = JSONUtils.toList(varPool, Property.class);
                Object obj = ((Map) JSONUtils.parseObject(this.taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() { // from class: org.apache.dolphinscheduler.server.master.runner.task.SubTaskProcessor.1
                })).get("localParams");
                if (obj != null) {
                    List list2 = JSONUtils.toList(JSONUtils.toJsonString(obj), Property.class);
                    Map map = (Map) JSONUtils.toList(varPool2, Property.class).stream().collect(Collectors.toMap((v0) -> {
                        return v0.getProp();
                    }, (v0) -> {
                        return v0.getValue();
                    }));
                    for (Property property : (List) list2.stream().filter(property2 -> {
                        return Direct.OUT == property2.getDirect();
                    }).collect(Collectors.toList())) {
                        property.setValue((String) map.get(property.getProp()));
                        list.add(property);
                    }
                    this.taskInstance.setVarPool(JSONUtils.toJsonString(list));
                    this.processService.changeOutParam(this.taskInstance);
                }
            }
        }
    }

    @Override // org.apache.dolphinscheduler.server.master.runner.task.BaseTaskProcessor
    protected boolean pauseTask() {
        pauseSubWorkFlow();
        return true;
    }

    private boolean pauseSubWorkFlow() {
        ProcessInstance findSubProcessInstance = this.processService.findSubProcessInstance(this.processInstance.getId(), this.taskInstance.getId());
        if (findSubProcessInstance == null || this.taskInstance.getState().isFinished()) {
            return false;
        }
        findSubProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE, "ready pause sub workflow");
        this.processInstanceDao.updateProcessInstance(findSubProcessInstance);
        sendToSubProcess();
        return true;
    }

    private boolean setSubWorkFlow() {
        this.logger.info("set work flow {} task {} running", this.processInstance.getId(), this.taskInstance.getId());
        if (this.subProcessInstance != null) {
            return true;
        }
        this.subProcessInstance = this.processService.findSubProcessInstance(this.processInstance.getId(), this.taskInstance.getId());
        if (this.subProcessInstance == null || this.taskInstance.getState().isFinished()) {
            return false;
        }
        this.taskInstance.setHost(NetUtils.getAddr(this.masterConfig.getListenPort()));
        this.taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
        this.taskInstance.setStartTime(new Date());
        this.processService.updateTaskInstance(this.taskInstance);
        this.logger.info("set sub work flow {} task {} state: {}", new Object[]{this.processInstance.getId(), this.taskInstance.getId(), this.taskInstance.getState()});
        return true;
    }

    @Override // org.apache.dolphinscheduler.server.master.runner.task.BaseTaskProcessor
    protected boolean killTask() {
        ProcessInstance findSubProcessInstance = this.processService.findSubProcessInstance(this.processInstance.getId(), this.taskInstance.getId());
        if (findSubProcessInstance == null || this.taskInstance.getState().isFinished()) {
            return false;
        }
        findSubProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by kill task");
        this.processInstanceDao.updateProcessInstance(findSubProcessInstance);
        sendToSubProcess();
        return true;
    }

    private void sendToSubProcess() {
        WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(this.processInstance.getId().intValue(), this.taskInstance.getId().intValue(), this.subProcessInstance.getState(), this.subProcessInstance.getId().intValue(), 0);
        this.stateEventCallbackService.sendResult(new Host(this.subProcessInstance.getHost()), workflowStateEventChangeCommand.convert2Command());
    }

    @Override // org.apache.dolphinscheduler.server.master.runner.task.BaseTaskProcessor, org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor
    public String getType() {
        return "SUB_PROCESS";
    }
}
