/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner.execute;

import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncMasterTaskDelayQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskCallbackFunction;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecutionContext;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterAsyncTaskExecutorThreadPool;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Daemon thread that drains {@link AsyncMasterTaskDelayQueue} and hands every polled task to
 * {@link MasterAsyncTaskExecutorThreadPool} so its asynchronous status can be checked.
 *
 * <p>Lifecycle: {@link #start()} flips the shared running flag and launches the thread;
 * {@link #close()} clears the flag and interrupts the thread so that a pending poll returns
 * and the loop in {@link #run()} terminates.
 */
@Component
public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AsyncMasterTaskDelayQueueLooper.class);
    @Autowired
    private AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue;
    @Autowired
    private MasterAsyncTaskExecutorThreadPool masterAsyncTaskExecutorThreadPool;
    /** Guards against double start / double close and drives the loop condition in {@link #run()}. */
    private static final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);

    public AsyncMasterTaskDelayQueueLooper() {
        super("AsyncMasterTaskDelayQueueLooper");
    }

    /**
     * Starts the looper thread. Calls made while the running flag is already set only log
     * a message and return.
     */
    public synchronized void start() {
        if (!RUNNING_FLAG.compareAndSet(false, true)) {
            log.info("The AsyncMasterTaskDelayQueueLooper has already been started, will not start again");
            return;
        }
        log.info("AsyncMasterTaskDelayQueueLooper starting...");
        super.start();
        log.info("AsyncMasterTaskDelayQueueLooper started...");
    }

    /**
     * Main loop: polls the delay queue and dispatches each task until the running flag is
     * cleared or the thread is interrupted. A failure while dispatching a single task is
     * logged and does not terminate the loop.
     */
    public void run() {
        while (RUNNING_FLAG.get()) {
            AsyncTaskExecutionContext asyncTaskExecutionContext;
            try {
                asyncTaskExecutionContext = asyncMasterTaskDelayQueue.pollAsyncTask();
            } catch (InterruptedException e) {
                // Restore the interrupt status for whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                if (RUNNING_FLAG.get()) {
                    log.error("AsyncConditionTaskLooper has been interrupted, will break this loop", e);
                } else {
                    // close() cleared the flag before interrupting: this is an orderly shutdown.
                    log.info("AsyncMasterTaskDelayQueueLooper has been interrupted by close, will break this loop");
                }
                break;
            }
            dispatchAsyncTask(asyncTaskExecutionContext);
        }
        log.info("AsyncMasterTaskDelayQueueLooper closed...");
    }

    /**
     * Stops the looper. Clears the running flag and interrupts the looper thread so that a
     * poll that is currently waiting returns immediately instead of keeping the thread
     * parked until another task shows up.
     */
    @Override
    public void close() throws Exception {
        if (!RUNNING_FLAG.compareAndSet(true, false)) {
            log.warn("The AsyncMasterTaskDelayQueueLooper is not started, will not close");
            return;
        }
        log.info("AsyncMasterTaskDelayQueueLooper closing...");
        super.interrupt();
    }

    /**
     * Submits one polled task to the executor thread pool, with the workflow/task MDC set
     * for the duration of the call. Tasks whose context is no longer registered in
     * {@link MasterTaskExecutionContextHolder} are dropped with a warning.
     */
    private void dispatchAsyncTask(AsyncTaskExecutionContext asyncTaskExecutionContext) {
        try {
            TaskExecutionContext taskExecutionContext = asyncTaskExecutionContext.getTaskExecutionContext();
            LogUtils.setWorkflowAndTaskInstanceIDMDC(
                    taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
            LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
            if (MasterTaskExecutionContextHolder.getTaskExecutionContext(taskExecutionContext.getTaskInstanceId()) == null) {
                log.warn("Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed, will stop the async master task");
                return;
            }
            masterAsyncTaskExecutorThreadPool.getThreadPool()
                    .execute(() -> checkAsyncTaskStatus(asyncTaskExecutionContext, taskExecutionContext));
        } catch (Exception ex) {
            // Keep the looper alive: one broken task (or a rejected submission) must not
            // stop every other async master task from being polled.
            log.error("Failed to dispatch the async master task, will continue with the next one", ex);
        } finally {
            LogUtils.removeTaskInstanceLogFullPathMDC();
            LogUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }

    /**
     * Runs on the executor thread pool: queries the task's asynchronous status and either
     * re-enqueues the task (RUNNING) or fires the matching callback (SUCCESS / FAILED).
     * Any other status results in no action. Exceptions are forwarded to
     * {@code executeThrowing}.
     */
    private void checkAsyncTaskStatus(AsyncTaskExecutionContext asyncTaskExecutionContext,
                                      TaskExecutionContext taskExecutionContext) {
        AsyncTaskExecuteFunction asyncTaskExecuteFunction = asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
        AsyncTaskCallbackFunction asyncTaskCallbackFunction = asyncTaskExecutionContext.getAsyncTaskCallbackFunction();
        try {
            LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
            LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
            AsyncTaskExecuteFunction.AsyncTaskExecutionStatus asyncTaskExecutionStatus =
                    asyncTaskExecuteFunction.getAsyncTaskExecutionStatus();
            switch (asyncTaskExecutionStatus) {
                case RUNNING:
                    // Not finished yet: put it back so it is polled again later.
                    asyncMasterTaskDelayQueue.addAsyncTask(asyncTaskExecutionContext);
                    break;
                case SUCCESS:
                    asyncTaskCallbackFunction.executeSuccess();
                    break;
                case FAILED:
                    asyncTaskCallbackFunction.executeFailed();
                    break;
            }
        } catch (Exception ex) {
            asyncTaskCallbackFunction.executeThrowing(ex);
        } finally {
            LogUtils.removeTaskInstanceLogFullPathMDC();
            LogUtils.removeTaskInstanceIdMDC();
        }
    }
}

