package org.apache.dolphinscheduler.server.master.runner;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/EventExecuteService.class */
public class EventExecuteService extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class);

    @Autowired
    private ProcessService processService;

    @Autowired
    private MasterConfig masterConfig;
    private ExecutorService eventExecService;
    private StateEventCallbackService stateEventCallbackService;
    private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
    private ConcurrentHashMap<String, WorkflowExecuteThread> eventHandlerMap = new ConcurrentHashMap<>();
    ListeningExecutorService listeningExecutorService;

    public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> concurrentHashMap) {
        this.eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", this.masterConfig.getMasterExecThreads());
        this.processInstanceExecMaps = concurrentHashMap;
        this.listeningExecutorService = MoreExecutors.listeningDecorator(this.eventExecService);
        this.stateEventCallbackService = (StateEventCallbackService) SpringApplicationContext.getBean(StateEventCallbackService.class);
    }

    @Override // java.lang.Thread
    public synchronized void start() {
        super.setName("EventServiceStarted");
        super.start();
    }

    public void close() {
        this.eventExecService.shutdown();
        logger.info("event service stopped...");
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        logger.info("Event service started");
        while (Stopper.isRunning()) {
            try {
                eventHandler();
                TimeUnit.MILLISECONDS.sleep(1000L);
            } catch (Exception e) {
                logger.error("Event service thread error", e);
            }
        }
    }

    private void eventHandler() {
        for (final WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecMaps.values()) {
            if (workflowExecuteThread.eventSize() != 0 && !StringUtils.isEmpty(workflowExecuteThread.getKey()) && workflowExecuteThread.isStart() && !this.eventHandlerMap.containsKey(workflowExecuteThread.getKey())) {
                final int id = workflowExecuteThread.getProcessInstance().getId();
                logger.info("handle process instance : {} , events count:{}", Integer.valueOf(id), Integer.valueOf(workflowExecuteThread.eventSize()));
                logger.info("already exists handler process size:{}", Integer.valueOf(this.eventHandlerMap.size()));
                this.eventHandlerMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
                Futures.addCallback(this.listeningExecutorService.submit(workflowExecuteThread), new FutureCallback() { // from class: org.apache.dolphinscheduler.server.master.runner.EventExecuteService.1
                    public void onSuccess(Object obj) {
                        if (workflowExecuteThread.workFlowFinish()) {
                            EventExecuteService.this.processInstanceExecMaps.remove(Integer.valueOf(id));
                            notifyProcessChanged();
                            EventExecuteService.logger.info("process instance {} finished.", Integer.valueOf(id));
                        }
                        if (workflowExecuteThread.getProcessInstance().getId() != id) {
                            EventExecuteService.this.processInstanceExecMaps.remove(Integer.valueOf(id));
                            EventExecuteService.this.processInstanceExecMaps.put(Integer.valueOf(workflowExecuteThread.getProcessInstance().getId()), workflowExecuteThread);
                        }
                        EventExecuteService.this.eventHandlerMap.remove(workflowExecuteThread.getKey());
                    }

                    private void notifyProcessChanged() {
                        if (Flag.NO == workflowExecuteThread.getProcessInstance().getIsSubProcess()) {
                            return;
                        }
                        Map notifyProcessList = EventExecuteService.this.processService.notifyProcessList(id);
                        for (ProcessInstance processInstance : notifyProcessList.keySet()) {
                            if (processInstance.getHost().equalsIgnoreCase(NetUtils.getAddr(EventExecuteService.this.masterConfig.getListenPort()))) {
                                notifyMyself(processInstance, (TaskInstance) notifyProcessList.get(processInstance));
                            } else {
                                notifyProcess(processInstance, (TaskInstance) notifyProcessList.get(processInstance));
                            }
                        }
                    }

                    private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
                        EventExecuteService.logger.info("notify process {} task {} state change", Integer.valueOf(processInstance.getId()), Integer.valueOf(taskInstance.getId()));
                        if (EventExecuteService.this.processInstanceExecMaps.containsKey(Integer.valueOf(processInstance.getId()))) {
                            WorkflowExecuteThread workflowExecuteThread2 = (WorkflowExecuteThread) EventExecuteService.this.processInstanceExecMaps.get(Integer.valueOf(processInstance.getId()));
                            StateEvent stateEvent = new StateEvent();
                            stateEvent.setTaskInstanceId(taskInstance.getId());
                            stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
                            stateEvent.setProcessInstanceId(processInstance.getId());
                            stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
                            workflowExecuteThread2.addStateEvent(stateEvent);
                        }
                    }

                    private void notifyProcess(ProcessInstance processInstance, TaskInstance taskInstance) {
                        String host = processInstance.getHost();
                        if (StringUtils.isEmpty(host)) {
                            EventExecuteService.logger.info("process {} host is empty, cannot notify task {} now.", Integer.valueOf(processInstance.getId()), Integer.valueOf(taskInstance.getId()));
                            return;
                        }
                        String str = host.split(":")[0];
                        int parseInt = Integer.parseInt(host.split(":")[1]);
                        EventExecuteService.logger.info("notify process {} task {} state change, host:{}", new Object[]{Integer.valueOf(processInstance.getId()), Integer.valueOf(taskInstance.getId()), host});
                        EventExecuteService.this.stateEventCallbackService.sendResult(str, parseInt, new StateEventChangeCommand(id, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId()).convert2Command());
                    }

                    public void onFailure(Throwable th) {
                        EventExecuteService.logger.info("handle events {} failed.", Integer.valueOf(id));
                        EventExecuteService.logger.info("handle events failed.", th);
                    }
                }, this.listeningExecutorService);
            }
        }
    }
}
