/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory;
import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class GlobalTaskDispatchWaitingQueueLooper
extends BaseDaemonThread
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GlobalTaskDispatchWaitingQueueLooper.class);
    @Autowired
    private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
    @Autowired
    private TaskDispatchFactory taskDispatchFactory;
    private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
    private final AtomicInteger DISPATCHED_CONSECUTIVE_FAILURE_TIMES = new AtomicInteger();
    private static final Integer MAX_DISPATCHED_FAILED_TIMES = 100;

    public GlobalTaskDispatchWaitingQueueLooper() {
        super("GlobalTaskDispatchWaitingQueueLooper");
    }

    public synchronized void start() {
        if (!this.RUNNING_FLAG.compareAndSet(false, true)) {
            log.error("The GlobalTaskDispatchWaitingQueueLooper already started, will not start again");
            return;
        }
        log.info("GlobalTaskDispatchWaitingQueueLooper starting...");
        super.start();
        log.info("GlobalTaskDispatchWaitingQueueLooper started...");
    }

    public void run() {
        while (this.RUNNING_FLAG.get()) {
            DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
            try {
                defaultTaskExecuteRunnable = this.globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
            }
            catch (InterruptedException e) {
                log.warn("Get waiting dispatch task failed, the current thread has been interrupted, will stop loop");
                Thread.currentThread().interrupt();
                break;
            }
            try {
                TaskDispatcher taskDispatcher = this.taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance());
                taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
                this.DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0);
            }
            catch (Exception e) {
                defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes();
                this.globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable);
                if (this.DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() > MAX_DISPATCHED_FAILED_TIMES) {
                    ThreadUtils.sleep((long)10000L);
                }
                log.error("Dispatch Task: {} failed", (Object)defaultTaskExecuteRunnable.getTaskInstance().getName(), (Object)e);
            }
        }
        log.info("GlobalTaskDispatchWaitingQueueLooper started...");
    }

    @Override
    public void close() throws Exception {
        if (this.RUNNING_FLAG.compareAndSet(true, false)) {
            log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
            log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
        }
    }
}

