/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class WorkflowExecuteThreadPool
extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThreadPool.class);
    @Autowired
    private MasterConfig masterConfig;
    @Autowired
    private ProcessService processService;
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private StateEventCallbackService stateEventCallbackService;
    @Autowired
    private StateWheelExecuteThread stateWheelExecuteThread;
    private ConcurrentHashMap<String, WorkflowExecuteThread> multiThreadFilterMap = new ConcurrentHashMap();

    @PostConstruct
    private void init() {
        this.setDaemon(true);
        this.setThreadNamePrefix("Workflow-Execute-Thread-");
        this.setMaxPoolSize(this.masterConfig.getExecThreads());
        this.setCorePoolSize(this.masterConfig.getExecThreads());
    }

    public void submitStateEvent(StateEvent stateEvent) {
        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
        if (workflowExecuteThread == null) {
            logger.warn("workflowExecuteThread is null, stateEvent:{}", (Object)stateEvent);
            return;
        }
        workflowExecuteThread.addStateEvent(stateEvent);
    }

    public void startWorkflow(WorkflowExecuteThread workflowExecuteThread) {
        this.submit(workflowExecuteThread::startProcess);
    }

    public void executeEvent(final WorkflowExecuteThread workflowExecuteThread) {
        if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
            return;
        }
        if (this.multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
            return;
        }
        this.multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
        final int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
        ListenableFuture future = this.submitListenable(workflowExecuteThread::handleEvents);
        future.addCallback(new ListenableFutureCallback(){

            public void onFailure(Throwable ex) {
                logger.error("handle events {} failed", (Object)processInstanceId, (Object)ex);
                WorkflowExecuteThreadPool.this.multiThreadFilterMap.remove(workflowExecuteThread.getKey());
            }

            public void onSuccess(Object result) {
                try {
                    if (workflowExecuteThread.workFlowFinish()) {
                        WorkflowExecuteThreadPool.this.stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
                        WorkflowExecuteThreadPool.this.processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
                        WorkflowExecuteThreadPool.this.notifyProcessChanged(workflowExecuteThread.getProcessInstance());
                        logger.info("process instance {} finished.", (Object)processInstanceId);
                    }
                }
                catch (Exception e) {
                    logger.error("handle events {} success, but notify changed error", (Object)processInstanceId, (Object)e);
                }
                WorkflowExecuteThreadPool.this.multiThreadFilterMap.remove(workflowExecuteThread.getKey());
            }
        });
    }

    private void notifyProcessChanged(ProcessInstance finishProcessInstance) {
        if (Flag.NO == finishProcessInstance.getIsSubProcess()) {
            return;
        }
        Map fatherMaps = this.processService.notifyProcessList(finishProcessInstance.getId());
        for (ProcessInstance processInstance : fatherMaps.keySet()) {
            String address = NetUtils.getAddr((int)this.masterConfig.getListenPort());
            if (processInstance.getHost().equalsIgnoreCase(address)) {
                this.notifyMyself(processInstance, (TaskInstance)fatherMaps.get(processInstance));
                continue;
            }
            this.notifyProcess(finishProcessInstance, processInstance, (TaskInstance)fatherMaps.get(processInstance));
        }
    }

    private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
        logger.info("notify process {} task {} state change", (Object)processInstance.getId(), (Object)taskInstance.getId());
        if (!this.processInstanceExecCacheManager.contains(processInstance.getId())) {
            return;
        }
        StateEvent stateEvent = new StateEvent();
        stateEvent.setTaskInstanceId(taskInstance.getId());
        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
        stateEvent.setProcessInstanceId(processInstance.getId());
        stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        this.submitStateEvent(stateEvent);
    }

    private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance, TaskInstance taskInstance) {
        String host = processInstance.getHost();
        if (StringUtils.isEmpty((String)host)) {
            logger.error("process {} host is empty, cannot notify task {} now", (Object)processInstance.getId(), (Object)taskInstance.getId());
            return;
        }
        String address = host.split(":")[0];
        int port = Integer.parseInt(host.split(":")[1]);
        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId());
        this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
    }
}

