/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner.task;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.auto.service.AutoService;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.runner.task.BaseTaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

@AutoService(value={ITaskProcessor.class})
public class SubTaskProcessor
extends BaseTaskProcessor {
    private ProcessInstance subProcessInstance = null;
    private final Lock runLock = new ReentrantLock();
    private StateEventCallbackService stateEventCallbackService = (StateEventCallbackService)SpringApplicationContext.getBean(StateEventCallbackService.class);

    @Override
    public boolean submitTask() {
        this.taskInstance = this.processService.submitTaskWithRetry(this.processInstance, this.taskInstance, this.maxRetryTimes, this.commitInterval);
        if (this.taskInstance == null) {
            return false;
        }
        this.setTaskExecutionLogger();
        this.taskInstance.setLogPath(LogUtils.getTaskLogPath((Date)this.taskInstance.getFirstSubmitTime(), (Long)this.processInstance.getProcessDefinitionCode(), (int)this.processInstance.getProcessDefinitionVersion(), (int)this.taskInstance.getProcessInstanceId(), (int)this.taskInstance.getId()));
        return true;
    }

    @Override
    public boolean runTask() {
        try {
            this.runLock.lock();
            if (this.setSubWorkFlow()) {
                this.updateTaskState();
            }
        }
        catch (Exception e) {
            this.logger.error("work flow {} sub task {} exceptions", new Object[]{this.processInstance.getId(), this.taskInstance.getId(), e});
        }
        finally {
            this.runLock.unlock();
        }
        return true;
    }

    @Override
    protected boolean dispatchTask() {
        return true;
    }

    @Override
    protected boolean taskTimeout() {
        TaskTimeoutStrategy taskTimeoutStrategy = this.taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
        if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy && TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) {
            return true;
        }
        this.logger.info("sub process task {} timeout, strategy {} ", (Object)this.taskInstance.getId(), (Object)taskTimeoutStrategy.getDescp());
        this.killTask();
        return true;
    }

    private void updateTaskState() {
        this.subProcessInstance = this.processService.findSubProcessInstance(Integer.valueOf(this.processInstance.getId()), Integer.valueOf(this.taskInstance.getId()));
        this.logger.info("work flow {} task {}, sub work flow: {} state: {}", new Object[]{this.processInstance.getId(), this.taskInstance.getId(), this.subProcessInstance.getId(), this.subProcessInstance.getState().getDescp()});
        if (this.subProcessInstance != null && this.subProcessInstance.getState().typeIsFinished()) {
            this.taskInstance.setState(this.subProcessInstance.getState());
            this.taskInstance.setEndTime(new Date());
            this.dealFinish();
            this.processService.saveTaskInstance(this.taskInstance);
        }
    }

    private void dealFinish() {
        String subProcessInstanceVarPool;
        String thisTaskInstanceVarPool = this.taskInstance.getVarPool();
        if (StringUtils.isNotEmpty((String)thisTaskInstanceVarPool) && StringUtils.isNotEmpty((String)(subProcessInstanceVarPool = this.subProcessInstance.getVarPool()))) {
            List varPoolProperties = JSONUtils.toList((String)thisTaskInstanceVarPool, Property.class);
            Map taskParams = (Map)JSONUtils.parseObject((String)this.taskInstance.getTaskParams(), (TypeReference)new TypeReference<Map<String, Object>>(){});
            Object localParams = taskParams.get("localParams");
            if (localParams != null) {
                List properties = JSONUtils.toList((String)JSONUtils.toJsonString(localParams), Property.class);
                Map<String, String> subProcessParam = JSONUtils.toList((String)subProcessInstanceVarPool, Property.class).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
                List outProperties = properties.stream().filter(r -> Direct.OUT == r.getDirect()).collect(Collectors.toList());
                for (Property info : outProperties) {
                    info.setValue(subProcessParam.get(info.getProp()));
                    varPoolProperties.add(info);
                }
                this.taskInstance.setVarPool(JSONUtils.toJsonString((Object)varPoolProperties));
                this.processService.changeOutParam(this.taskInstance);
            }
        }
    }

    @Override
    protected boolean pauseTask() {
        this.pauseSubWorkFlow();
        return true;
    }

    private boolean pauseSubWorkFlow() {
        ProcessInstance subProcessInstance = this.processService.findSubProcessInstance(Integer.valueOf(this.processInstance.getId()), Integer.valueOf(this.taskInstance.getId()));
        if (subProcessInstance == null || this.taskInstance.getState().typeIsFinished() || subProcessInstance.getState().typeIsFinished()) {
            return false;
        }
        subProcessInstance.setState(ExecutionStatus.READY_PAUSE);
        this.processService.updateProcessInstance(subProcessInstance);
        this.sendToSubProcess();
        return true;
    }

    private boolean setSubWorkFlow() {
        this.logger.info("set work flow {} task {} running", (Object)this.processInstance.getId(), (Object)this.taskInstance.getId());
        if (this.subProcessInstance != null) {
            return true;
        }
        this.subProcessInstance = this.processService.findSubProcessInstance(Integer.valueOf(this.processInstance.getId()), Integer.valueOf(this.taskInstance.getId()));
        if (this.subProcessInstance == null || this.taskInstance.getState().typeIsFinished()) {
            return false;
        }
        this.taskInstance.setHost(NetUtils.getAddr((int)this.masterConfig.getListenPort()));
        this.taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
        this.taskInstance.setStartTime(new Date());
        this.processService.updateTaskInstance(this.taskInstance);
        this.logger.info("set sub work flow {} task {} state: {}", new Object[]{this.processInstance.getId(), this.taskInstance.getId(), this.taskInstance.getState()});
        return true;
    }

    @Override
    protected boolean killTask() {
        ProcessInstance subProcessInstance = this.processService.findSubProcessInstance(Integer.valueOf(this.processInstance.getId()), Integer.valueOf(this.taskInstance.getId()));
        if (subProcessInstance == null || this.taskInstance.getState().typeIsFinished() || subProcessInstance.getState().typeIsFinished()) {
            return false;
        }
        subProcessInstance.setState(ExecutionStatus.READY_STOP);
        this.processService.updateProcessInstance(subProcessInstance);
        this.sendToSubProcess();
        return true;
    }

    private void sendToSubProcess() {
        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(this.processInstance.getId(), this.taskInstance.getId(), this.subProcessInstance.getState(), this.subProcessInstance.getId(), 0);
        this.stateEventCallbackService.sendResult(new Host(this.subProcessInstance.getHost()), stateEventChangeCommand.convert2Command());
    }

    @Override
    public String getType() {
        return "SUB_PROCESS";
    }
}

