/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.processor.queue;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponsePersistThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskResponseService {
    private final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
    private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue<TaskResponseEvent>();
    @Autowired
    private ProcessService processService;
    @Autowired
    private MasterConfig masterConfig;
    private Thread taskResponseWorker;
    private Thread taskResponseEventHandler;
    private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMap;
    private final ConcurrentHashMap<String, TaskResponsePersistThread> taskResponseEventHandlerMap = new ConcurrentHashMap();
    private ListeningExecutorService listeningExecutorService;
    private ExecutorService eventExecService;
    private final ConcurrentHashMap<Integer, TaskResponsePersistThread> processTaskResponseMap = new ConcurrentHashMap();

    public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMap) {
        if (this.processInstanceMap == null) {
            this.processInstanceMap = processInstanceMap;
        }
    }

    @PostConstruct
    public void start() {
        this.eventExecService = ThreadUtils.newDaemonFixedThreadExecutor((String)"PersistEventState", (int)this.masterConfig.getMasterPersistEventStateThreads());
        this.listeningExecutorService = MoreExecutors.listeningDecorator((ExecutorService)this.eventExecService);
        this.taskResponseWorker = new TaskResponseWorker();
        this.taskResponseWorker.setName("TaskResponseWorker");
        this.taskResponseWorker.start();
        this.taskResponseEventHandler = new TaskResponseEventHandler();
        this.taskResponseEventHandler.setName("TaskResponseEventHandler");
        this.taskResponseEventHandler.start();
    }

    @PreDestroy
    public void stop() {
        try {
            this.taskResponseWorker.interrupt();
            this.taskResponseEventHandler.interrupt();
        }
        catch (Exception e) {
            this.logger.error("stop error:", (Throwable)e);
        }
        this.eventExecService.shutdown();
        long waitSec = 5L;
        boolean terminated = false;
        try {
            terminated = this.eventExecService.awaitTermination(waitSec, TimeUnit.SECONDS);
        }
        catch (InterruptedException ignore) {
            Thread.currentThread().interrupt();
        }
        if (!terminated) {
            this.logger.warn("TaskResponseService: eventExecService shutdown without terminated: {}s, increase await time", (Object)waitSec);
        }
    }

    public void addResponse(TaskResponseEvent taskResponseEvent) {
        try {
            this.eventQueue.put(taskResponseEvent);
            this.logger.debug("eventQueue size:{}", (Object)this.eventQueue.size());
        }
        catch (InterruptedException e) {
            this.logger.error("put task : {} error :{}", (Object)taskResponseEvent, (Object)e);
            Thread.currentThread().interrupt();
        }
    }

    public BlockingQueue<TaskResponseEvent> getEventQueue() {
        return this.eventQueue;
    }

    class TaskResponseEventHandler
    extends Thread {
        TaskResponseEventHandler() {
        }

        @Override
        public void run() {
            TaskResponseService.this.logger.info("event handler thread started");
            while (Stopper.isRunning()) {
                try {
                    this.eventHandler();
                    TimeUnit.MILLISECONDS.sleep(1000L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                catch (Exception e) {
                    TaskResponseService.this.logger.error("event handler thread error", (Throwable)e);
                }
            }
        }

        private void eventHandler() {
            for (final TaskResponsePersistThread taskResponsePersistThread : TaskResponseService.this.processTaskResponseMap.values()) {
                if (TaskResponseService.this.taskResponseEventHandlerMap.containsKey(taskResponsePersistThread.getKey())) continue;
                if (taskResponsePersistThread.eventSize() == 0) {
                    if (TaskResponseService.this.processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) continue;
                    TaskResponseService.this.processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
                    TaskResponseService.this.logger.info("remove process instance: {}", (Object)taskResponsePersistThread.getProcessInstanceId());
                    continue;
                }
                TaskResponseService.this.logger.info("already exists handler process size:{}", (Object)TaskResponseService.this.taskResponseEventHandlerMap.size());
                TaskResponseService.this.taskResponseEventHandlerMap.put(taskResponsePersistThread.getKey(), taskResponsePersistThread);
                ListenableFuture future = TaskResponseService.this.listeningExecutorService.submit((Runnable)taskResponsePersistThread);
                FutureCallback futureCallback = new FutureCallback(){

                    public void onSuccess(Object o) {
                        TaskResponseService.this.logger.info("handle events {} succeeded.", (Object)taskResponsePersistThread.getProcessInstanceId());
                        if (!TaskResponseService.this.processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
                            TaskResponseService.this.processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
                            TaskResponseService.this.logger.info("remove process instance: {}", (Object)taskResponsePersistThread.getProcessInstanceId());
                        }
                        TaskResponseService.this.taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
                    }

                    public void onFailure(Throwable throwable) {
                        TaskResponseService.this.logger.error("handle events failed: {}", (Object)throwable.getMessage());
                        if (!TaskResponseService.this.processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
                            TaskResponseService.this.processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
                            TaskResponseService.this.logger.info("remove process instance: {}", (Object)taskResponsePersistThread.getProcessInstanceId());
                        }
                        TaskResponseService.this.taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
                    }
                };
                Futures.addCallback((ListenableFuture)future, (FutureCallback)futureCallback, (Executor)TaskResponseService.this.listeningExecutorService);
            }
        }
    }

    class TaskResponseWorker
    extends Thread {
        TaskResponseWorker() {
        }

        @Override
        public void run() {
            while (Stopper.isRunning()) {
                try {
                    TaskResponsePersistThread taskResponsePersistThread;
                    TaskResponseEvent taskResponseEvent = (TaskResponseEvent)TaskResponseService.this.eventQueue.take();
                    if (TaskResponseService.this.processInstanceMap.containsKey(taskResponseEvent.getProcessInstanceId()) && !TaskResponseService.this.processTaskResponseMap.containsKey(taskResponseEvent.getProcessInstanceId())) {
                        taskResponsePersistThread = new TaskResponsePersistThread(TaskResponseService.this.processService, TaskResponseService.this.processInstanceMap, taskResponseEvent.getProcessInstanceId());
                        TaskResponseService.this.processTaskResponseMap.put(taskResponseEvent.getProcessInstanceId(), taskResponsePersistThread);
                    }
                    if (null == (taskResponsePersistThread = (TaskResponsePersistThread)TaskResponseService.this.processTaskResponseMap.get(taskResponseEvent.getProcessInstanceId()))) continue;
                    if (taskResponsePersistThread.addEvent(taskResponseEvent)) {
                        TaskResponseService.this.logger.debug("submit task response persist queue success, task instance id:{},process instance id:{}, state:{} ", new Object[]{taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getState()});
                        continue;
                    }
                    TaskResponseService.this.logger.error("submit task response persist queue error, task instance id:{},process instance id:{} ", (Object)taskResponseEvent.getTaskInstanceId(), (Object)taskResponseEvent.getProcessInstanceId());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                catch (Exception e) {
                    TaskResponseService.this.logger.error("handle task error", (Throwable)e);
                }
            }
            TaskResponseService.this.logger.info("StateEventResponseWorker stopped");
        }
    }
}

