/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import com.google.common.base.Strings;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.Generated;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class WorkflowExecuteThreadPool
extends ThreadPoolTaskExecutor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(WorkflowExecuteThreadPool.class);
    @Autowired
    private MasterConfig masterConfig;
    @Autowired
    private ProcessService processService;
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private StateEventCallbackService stateEventCallbackService;
    @Autowired
    private StateWheelExecuteThread stateWheelExecuteThread;
    private ConcurrentHashMap<Integer, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap();

    @PostConstruct
    private void init() {
        this.setDaemon(true);
        this.setThreadNamePrefix("WorkflowExecuteThread-");
        this.setMaxPoolSize(this.masterConfig.getExecThreads());
        this.setCorePoolSize(this.masterConfig.getExecThreads());
    }

    public void submitStateEvent(StateEvent stateEvent) {
        WorkflowExecuteRunnable workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
        if (workflowExecuteThread == null) {
            log.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}", (Object)stateEvent);
            return;
        }
        workflowExecuteThread.addStateEvent(stateEvent);
        log.info("Submit state event success, stateEvent: {}", (Object)stateEvent);
    }

    public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
        if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
            return;
        }
        IWorkflowExecuteContext workflowExecuteRunnableContext = workflowExecuteThread.getWorkflowExecuteContext();
        final Integer workflowInstanceId = workflowExecuteRunnableContext.getWorkflowInstance().getId();
        if (this.multiThreadFilterMap.containsKey(workflowInstanceId)) {
            log.debug("The workflow has been executed by another thread");
            return;
        }
        this.multiThreadFilterMap.put(workflowInstanceId, workflowExecuteThread);
        ListenableFuture future = this.submitListenable(workflowExecuteThread::handleEvents);
        future.addCallback(new ListenableFutureCallback(){

            public void onFailure(Throwable ex) {
                LogUtils.setWorkflowInstanceIdMDC((Integer)workflowInstanceId);
                try {
                    log.error("Workflow instance events handle failed", ex);
                    WorkflowExecuteThreadPool.this.notifyProcessChanged(workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
                    WorkflowExecuteThreadPool.this.multiThreadFilterMap.remove(workflowInstanceId);
                }
                finally {
                    LogUtils.removeWorkflowInstanceIdMDC();
                }
            }

            public void onSuccess(Object result) {
                try {
                    LogUtils.setWorkflowInstanceIdMDC((Integer)workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance().getId());
                    if (workflowExecuteThread.workFlowFinish() && workflowExecuteThread.eventSize() == 0) {
                        WorkflowExecuteThreadPool.this.stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance().getId());
                        WorkflowExecuteThreadPool.this.processInstanceExecCacheManager.removeByProcessInstanceId(workflowInstanceId);
                        WorkflowExecuteThreadPool.this.notifyProcessChanged(workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
                        log.info("Workflow instance is finished.");
                    }
                }
                catch (Exception e) {
                    log.error("Workflow instance is finished, but notify changed error", (Throwable)e);
                }
                finally {
                    WorkflowExecuteThreadPool.this.multiThreadFilterMap.remove(workflowInstanceId);
                    LogUtils.removeWorkflowInstanceIdMDC();
                }
            }
        });
    }

    private void notifyProcessChanged(ProcessInstance finishProcessInstance) {
        if (Flag.NO == finishProcessInstance.getIsSubProcess()) {
            return;
        }
        Map fatherMaps = this.processService.notifyProcessList(finishProcessInstance.getId().intValue());
        for (Map.Entry entry : fatherMaps.entrySet()) {
            ProcessInstance processInstance = (ProcessInstance)entry.getKey();
            TaskInstance taskInstance = (TaskInstance)entry.getValue();
            this.crossWorkflowParameterPassing(finishProcessInstance, taskInstance);
            String address = NetUtils.getAddr((int)this.masterConfig.getListenPort());
            LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC((Integer)processInstance.getId(), (Integer)taskInstance.getId());
            Throwable throwable = null;
            try {
                if (processInstance.getHost().equalsIgnoreCase(address)) {
                    log.info("Process host is local master, will notify it");
                    this.notifyMyself(processInstance, taskInstance);
                    continue;
                }
                log.info("Process host is remote master, will notify it");
                this.notifyProcess(finishProcessInstance, processInstance, taskInstance);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (mdcAutoClosableContext == null) continue;
                if (throwable != null) {
                    try {
                        mdcAutoClosableContext.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                mdcAutoClosableContext.close();
            }
        }
    }

    private void crossWorkflowParameterPassing(ProcessInstance finishProcessInstance, TaskInstance taskInstance) {
        try {
            MasterTaskExecuteRunnable masterTaskExecuteRunnable = MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId());
            masterTaskExecuteRunnable.getILogicTask().getTaskParameters().setVarPool(finishProcessInstance.getVarPool());
            log.info("Cross workflow parameter passing success, finishProcessInstanceId: {}, taskInstanceId: {}", (Object)finishProcessInstance.getId(), (Object)taskInstance.getId());
        }
        catch (Exception ex) {
            log.error("Cross workflow parameter passing error, finishProcessInstanceId: {}, taskInstanceId: {}", new Object[]{finishProcessInstance.getId(), taskInstance.getId(), ex});
        }
    }

    private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        if (!this.processInstanceExecCacheManager.contains(processInstance.getId())) {
            log.warn("The execute cache manager doesn't contains this workflow instance");
            return;
        }
        TaskStateEvent stateEvent = TaskStateEvent.builder().processInstanceId(processInstance.getId()).taskInstanceId(taskInstance.getId()).type(StateEventType.TASK_STATE_CHANGE).status(TaskExecutionStatus.RUNNING_EXECUTION).build();
        this.submitStateEvent(stateEvent);
    }

    private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance, TaskInstance taskInstance) {
        String processInstanceHost = processInstance.getHost();
        if (Strings.isNullOrEmpty((String)processInstanceHost)) {
            log.error("Process {} host is empty, cannot notify task {} now, taskId: {}", new Object[]{processInstance.getName(), taskInstance.getName(), taskInstance.getId()});
            return;
        }
        WorkflowStateEventChangeRequest workflowStateEventChangeRequest = new WorkflowStateEventChangeRequest(finishProcessInstance.getId().intValue(), 0, finishProcessInstance.getState(), processInstance.getId().intValue(), taskInstance.getId().intValue());
        Host host = new Host(processInstanceHost);
        this.stateEventCallbackService.sendResult(host, workflowStateEventChangeRequest.convert2Command());
    }
}

