package org.apache.dolphinscheduler.server.master.runner;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.class */
public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThreadPool.class);

    @Autowired
    private MasterConfig masterConfig;

    @Autowired
    private ProcessService processService;

    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    @Autowired
    private StateEventCallbackService stateEventCallbackService;

    @Autowired
    private StateWheelExecuteThread stateWheelExecuteThread;
    private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap<>();

    @PostConstruct
    private void init() {
        setDaemon(true);
        setThreadNamePrefix("WorkflowExecuteThread-");
        setMaxPoolSize(this.masterConfig.getExecThreads());
        setCorePoolSize(this.masterConfig.getExecThreads());
    }

    public void submitStateEvent(StateEvent stateEvent) {
        WorkflowExecuteRunnable byProcessInstanceId = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
        if (byProcessInstanceId == null) {
            logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}", stateEvent);
        } else {
            byProcessInstanceId.addStateEvent(stateEvent);
            logger.info("Submit state event success, stateEvent: {}", stateEvent);
        }
    }

    public void executeEvent(final WorkflowExecuteRunnable workflowExecuteRunnable) {
        if (!workflowExecuteRunnable.isStart() || workflowExecuteRunnable.eventSize() == 0) {
            return;
        }
        if (this.multiThreadFilterMap.containsKey(workflowExecuteRunnable.getKey())) {
            logger.warn("The workflow has been executed by another thread");
            return;
        }
        this.multiThreadFilterMap.put(workflowExecuteRunnable.getKey(), workflowExecuteRunnable);
        final int id = workflowExecuteRunnable.getProcessInstance().getId();
        Objects.requireNonNull(workflowExecuteRunnable);
        submitListenable(workflowExecuteRunnable::handleEvents).addCallback(new ListenableFutureCallback() { // from class: org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool.1
            public void onFailure(Throwable th) {
                LoggerUtils.setWorkflowInstanceIdMDC(id);
                try {
                    WorkflowExecuteThreadPool.logger.error("Workflow instance events handle failed", th);
                    WorkflowExecuteThreadPool.this.multiThreadFilterMap.remove(workflowExecuteRunnable.getKey());
                } finally {
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            }

            public void onSuccess(Object obj) {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteRunnable.getProcessInstance().getId());
                    if (workflowExecuteRunnable.workFlowFinish()) {
                        WorkflowExecuteThreadPool.this.stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteRunnable.getProcessInstance().getId());
                        WorkflowExecuteThreadPool.this.processInstanceExecCacheManager.removeByProcessInstanceId(id);
                        WorkflowExecuteThreadPool.this.notifyProcessChanged(workflowExecuteRunnable.getProcessInstance());
                        WorkflowExecuteThreadPool.logger.info("Workflow instance is finished.");
                    }
                } catch (Exception e) {
                    WorkflowExecuteThreadPool.logger.error("Workflow instance is finished, but notify changed error", e);
                } finally {
                    WorkflowExecuteThreadPool.this.multiThreadFilterMap.remove(workflowExecuteRunnable.getKey());
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyProcessChanged(ProcessInstance processInstance) {
        if (Flag.NO == processInstance.getIsSubProcess()) {
            return;
        }
        for (Map.Entry entry : this.processService.notifyProcessList(processInstance.getId()).entrySet()) {
            ProcessInstance processInstance2 = (ProcessInstance) entry.getKey();
            TaskInstance taskInstance = (TaskInstance) entry.getValue();
            if (processInstance2.getHost().equalsIgnoreCase(NetUtils.getAddr(this.masterConfig.getListenPort()))) {
                notifyMyself(processInstance2, taskInstance);
            } else {
                notifyProcess(processInstance, processInstance2, taskInstance);
            }
        }
    }

    private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        if (processInstance == null) {
            throw new NullPointerException("processInstance is marked non-null but is null");
        }
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        if (!this.processInstanceExecCacheManager.contains(processInstance.getId())) {
            logger.warn("The execute cache manager doesn't contains this workflow instance");
            return;
        }
        StateEvent stateEvent = new StateEvent();
        stateEvent.setTaskInstanceId(taskInstance.getId());
        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
        stateEvent.setProcessInstanceId(processInstance.getId());
        stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        submitStateEvent(stateEvent);
    }

    private void notifyProcess(ProcessInstance processInstance, ProcessInstance processInstance2, TaskInstance taskInstance) {
        String host = processInstance2.getHost();
        if (StringUtils.isEmpty(host)) {
            logger.error("process {} host is empty, cannot notify task {} now", Integer.valueOf(processInstance2.getId()), Integer.valueOf(taskInstance.getId()));
        } else {
            this.stateEventCallbackService.sendResult(new Host(host), new StateEventChangeCommand(processInstance.getId(), 0, processInstance.getState(), processInstance2.getId(), taskInstance.getId()).convert2Command());
        }
    }
}
