package org.apache.dolphinscheduler.server.master.processor.queue;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.class */
public class TaskResponseService {

    @Autowired
    private ProcessService processService;

    @Autowired
    private MasterConfig masterConfig;
    private Thread taskResponseWorker;
    private Thread taskResponseEventHandler;
    private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMap;
    private ListeningExecutorService listeningExecutorService;
    private ExecutorService eventExecService;
    private final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
    private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue();
    private final ConcurrentHashMap<String, TaskResponsePersistThread> taskResponseEventHandlerMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, TaskResponsePersistThread> processTaskResponseMap = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService$TaskResponseEventHandler.class */
    class TaskResponseEventHandler extends Thread {
        TaskResponseEventHandler() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            TaskResponseService.this.logger.info("event handler thread started");
            while (Stopper.isRunning()) {
                try {
                    eventHandler();
                    TimeUnit.MILLISECONDS.sleep(1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e2) {
                    TaskResponseService.this.logger.error("event handler thread error", e2);
                }
            }
        }

        private void eventHandler() {
            for (final TaskResponsePersistThread taskResponsePersistThread : TaskResponseService.this.processTaskResponseMap.values()) {
                if (!TaskResponseService.this.taskResponseEventHandlerMap.containsKey(taskResponsePersistThread.getKey())) {
                    if (taskResponsePersistThread.eventSize() != 0) {
                        TaskResponseService.this.logger.info("already exists handler process size:{}", Integer.valueOf(TaskResponseService.this.taskResponseEventHandlerMap.size()));
                        TaskResponseService.this.taskResponseEventHandlerMap.put(taskResponsePersistThread.getKey(), taskResponsePersistThread);
                        Futures.addCallback(TaskResponseService.this.listeningExecutorService.submit(taskResponsePersistThread), new FutureCallback() { // from class: org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService.TaskResponseEventHandler.1
                            public void onSuccess(Object obj) {
                                TaskResponseService.this.logger.info("handle events {} succeeded.", taskResponsePersistThread.getProcessInstanceId());
                                if (!TaskResponseService.this.processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
                                    TaskResponseService.this.processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
                                    TaskResponseService.this.logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId());
                                }
                                TaskResponseService.this.taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
                            }

                            public void onFailure(Throwable th) {
                                TaskResponseService.this.logger.error("handle events failed: {}", th.getMessage());
                                if (!TaskResponseService.this.processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
                                    TaskResponseService.this.processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
                                    TaskResponseService.this.logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId());
                                }
                                TaskResponseService.this.taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
                            }
                        }, TaskResponseService.this.listeningExecutorService);
                    } else if (!TaskResponseService.this.processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
                        TaskResponseService.this.processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
                        TaskResponseService.this.logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId());
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService$TaskResponseWorker.class */
    class TaskResponseWorker extends Thread {
        TaskResponseWorker() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (Stopper.isRunning()) {
                try {
                    TaskResponseEvent taskResponseEvent = (TaskResponseEvent) TaskResponseService.this.eventQueue.take();
                    if (TaskResponseService.this.processInstanceMap.containsKey(Integer.valueOf(taskResponseEvent.getProcessInstanceId())) && !TaskResponseService.this.processTaskResponseMap.containsKey(Integer.valueOf(taskResponseEvent.getProcessInstanceId()))) {
                        TaskResponseService.this.processTaskResponseMap.put(Integer.valueOf(taskResponseEvent.getProcessInstanceId()), new TaskResponsePersistThread(TaskResponseService.this.processService, TaskResponseService.this.processInstanceMap, Integer.valueOf(taskResponseEvent.getProcessInstanceId())));
                    }
                    TaskResponsePersistThread taskResponsePersistThread = (TaskResponsePersistThread) TaskResponseService.this.processTaskResponseMap.get(Integer.valueOf(taskResponseEvent.getProcessInstanceId()));
                    if (null != taskResponsePersistThread) {
                        if (taskResponsePersistThread.addEvent(taskResponseEvent)) {
                            TaskResponseService.this.logger.debug("submit task response persist queue success, task instance id:{},process instance id:{}, state:{} ", new Object[]{Integer.valueOf(taskResponseEvent.getTaskInstanceId()), Integer.valueOf(taskResponseEvent.getProcessInstanceId()), taskResponseEvent.getState()});
                        } else {
                            TaskResponseService.this.logger.error("submit task response persist queue error, task instance id:{},process instance id:{} ", Integer.valueOf(taskResponseEvent.getTaskInstanceId()), Integer.valueOf(taskResponseEvent.getProcessInstanceId()));
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    TaskResponseService.this.logger.error("handle task error", e2);
                }
            }
            TaskResponseService.this.logger.info("StateEventResponseWorker stopped");
        }
    }

    public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> concurrentHashMap) {
        if (this.processInstanceMap == null) {
            this.processInstanceMap = concurrentHashMap;
        }
    }

    @PostConstruct
    public void start() {
        this.eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("PersistEventState", this.masterConfig.getMasterPersistEventStateThreads());
        this.listeningExecutorService = MoreExecutors.listeningDecorator(this.eventExecService);
        this.taskResponseWorker = new TaskResponseWorker();
        this.taskResponseWorker.setName("TaskResponseWorker");
        this.taskResponseWorker.start();
        this.taskResponseEventHandler = new TaskResponseEventHandler();
        this.taskResponseEventHandler.setName("TaskResponseEventHandler");
        this.taskResponseEventHandler.start();
    }

    @PreDestroy
    public void stop() {
        try {
            this.taskResponseWorker.interrupt();
            this.taskResponseEventHandler.interrupt();
        } catch (Exception e) {
            this.logger.error("stop error:", e);
        }
        this.eventExecService.shutdown();
        boolean z = false;
        try {
            z = this.eventExecService.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
        if (z) {
            return;
        }
        this.logger.warn("TaskResponseService: eventExecService shutdown without terminated: {}s, increase await time", 5L);
    }

    public void addResponse(TaskResponseEvent taskResponseEvent) {
        try {
            this.eventQueue.put(taskResponseEvent);
            this.logger.debug("eventQueue size:{}", Integer.valueOf(this.eventQueue.size()));
        } catch (InterruptedException e) {
            this.logger.error("put task : {} error :{}", taskResponseEvent, e);
            Thread.currentThread().interrupt();
        }
    }

    public BlockingQueue<TaskResponseEvent> getEventQueue() {
        return this.eventQueue;
    }
}
