/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.StreamTaskInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class EventExecuteService
extends BaseDaemonThread {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(EventExecuteService.class);
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private StreamTaskInstanceExecCacheManager streamTaskInstanceExecCacheManager;
    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
    @Autowired
    private StreamTaskExecuteThreadPool streamTaskExecuteThreadPool;

    protected EventExecuteService() {
        super("EventServiceStarted");
    }

    public synchronized void start() {
        log.info("Master Event execute service starting");
        super.start();
        log.info("Master Event execute service started");
    }

    public void run() {
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                this.workflowEventHandler();
                this.streamTaskEventHandler();
                TimeUnit.MILLISECONDS.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                log.warn("Master event service interrupted, will exit this loop", (Throwable)interruptedException);
                Thread.currentThread().interrupt();
                break;
            }
            catch (Exception e) {
                log.error("Master event execute service error", (Throwable)e);
            }
        }
    }

    private void workflowEventHandler() {
        for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
            try {
                LogUtils.setWorkflowInstanceIdMDC((Integer)workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance().getId());
                this.workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
            }
            finally {
                LogUtils.removeWorkflowInstanceIdMDC();
            }
        }
    }

    private void streamTaskEventHandler() {
        for (StreamTaskExecuteRunnable streamTaskExecuteRunnable : this.streamTaskInstanceExecCacheManager.getAll()) {
            try {
                LogUtils.setTaskInstanceIdMDC((Integer)streamTaskExecuteRunnable.getTaskInstance().getId());
                this.streamTaskExecuteThreadPool.executeEvent(streamTaskExecuteRunnable);
            }
            finally {
                LogUtils.removeWorkflowInstanceIdMDC();
            }
        }
    }
}

