/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import lombok.Generated;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleError;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleException;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Background thread that takes {@link WorkflowEvent}s from the {@link WorkflowEventQueue} one at a
 * time and dispatches each to the {@link WorkflowEventHandler} registered for its
 * {@link WorkflowEventType}.
 *
 * <p>Outcome of handling an event:
 * <ul>
 *   <li>{@link WorkflowEventHandleException} or any other {@link Exception}: the event is put back
 *       on the queue and the loop pauses before continuing;</li>
 *   <li>{@link WorkflowEventHandleError}: the event is dropped;</li>
 *   <li>no handler registered for the event type: the event is dropped, because the handler map is
 *       only populated in {@link #init()} and a retry could never succeed.</li>
 * </ul>
 */
@Component
public class WorkflowEventLooper extends BaseDaemonThread implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(WorkflowEventLooper.class);

    /** Pause applied after re-queuing a failed event (passed to ThreadUtils.sleep; presumably milliseconds). */
    private static final long RETRY_INTERVAL = 1000L;

    @Autowired
    private WorkflowEventQueue workflowEventQueue;
    @Autowired
    private List<WorkflowEventHandler> workflowEventHandlerList;

    /** Handler lookup table keyed by event type; filled once in {@link #init()}. */
    private final Map<WorkflowEventType, WorkflowEventHandler> workflowEventHandlerMap = new HashMap<>();

    /** True between a successful {@link #start()} and {@link #close()}; the loop in {@link #run()} polls it. */
    private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);

    protected WorkflowEventLooper() {
        super("WorkflowEventLooper");
    }

    /**
     * Indexes the injected handlers by the event type each one reports via
     * {@code getHandleWorkflowEventType()}. If two handlers report the same type the later one
     * wins, and a warning is logged so the misconfiguration is visible.
     */
    @PostConstruct
    public void init() {
        for (WorkflowEventHandler handler : this.workflowEventHandlerList) {
            WorkflowEventHandler previous =
                    this.workflowEventHandlerMap.put(handler.getHandleWorkflowEventType(), handler);
            if (previous != null && previous != handler) {
                log.warn("Duplicate WorkflowEventHandler for event type {}: {} is replaced by {}",
                        handler.getHandleWorkflowEventType(),
                        previous.getClass().getName(),
                        handler.getClass().getName());
            }
        }
    }

    /**
     * Starts the looper thread. A call while the running flag is already set is logged and ignored.
     * If the underlying thread fails to start, the running flag is reset so the object does not
     * report itself as running without a live loop.
     */
    public synchronized void start() {
        if (!this.RUNNING_FLAG.compareAndSet(false, true)) {
            log.error("WorkflowEventLooper thread has already started, will not start again");
            return;
        }
        log.info("WorkflowEventLooper starting...");
        boolean started = false;
        try {
            super.start();
            started = true;
        } finally {
            if (!started) {
                // Roll back: the loop never began, so close() must not think there is something to stop.
                this.RUNNING_FLAG.set(false);
            }
        }
        log.info("WorkflowEventLooper started...");
    }

    /**
     * Event loop: runs until the running flag is cleared or the thread is interrupted while
     * taking an event from the queue.
     */
    public void run() {
        while (this.RUNNING_FLAG.get()) {
            WorkflowEvent workflowEvent;
            try {
                workflowEvent = this.workflowEventQueue.poolEvent();
            } catch (InterruptedException e) {
                log.warn("WorkflowEventLooper thread is interrupted, will close this loop");
                // Restore the interrupt status for whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                break;
            }
            try {
                LogUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
                log.info("Begin to handle WorkflowEvent: {}", workflowEvent);
                WorkflowEventHandler workflowEventHandler =
                        this.workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
                if (workflowEventHandler == null) {
                    // Without this guard the NullPointerException below would be treated as a
                    // retryable failure and the event would be re-queued forever.
                    log.error("Cannot find WorkflowEventHandler for event type {}, will drop this event: {}",
                            workflowEvent.getWorkflowEventType(), workflowEvent);
                    continue;
                }
                workflowEventHandler.handleWorkflowEvent(workflowEvent);
                log.info("Success handle WorkflowEvent: {}", workflowEvent);
            } catch (WorkflowEventHandleException workflowEventHandleException) {
                log.error("Handle workflow event failed, will retry again: {}", workflowEvent,
                        workflowEventHandleException);
                retryLater(workflowEvent);
            } catch (WorkflowEventHandleError workflowEventHandleError) {
                log.error("Handle workflow event error, will drop this event: {}", workflowEvent,
                        workflowEventHandleError);
            } catch (Exception unknownException) {
                log.error("Handle workflow event failed, get a unknown exception, will retry again: {}",
                        workflowEvent, unknownException);
                retryLater(workflowEvent);
            } finally {
                // Always clear the MDC entry, including on the drop/continue path above.
                LogUtils.removeWorkflowInstanceIdMDC();
            }
        }
    }

    /** Puts the event back on the queue and pauses before the loop takes the next event. */
    private void retryLater(WorkflowEvent workflowEvent) {
        this.workflowEventQueue.addEvent(workflowEvent);
        ThreadUtils.sleep(RETRY_INTERVAL);
    }

    /**
     * Stops the loop: clears the running flag and interrupts this thread so that a pending queue
     * take is aborted. A call when the looper is not running is logged and ignored.
     */
    @Override
    public void close() throws Exception {
        if (!this.RUNNING_FLAG.compareAndSet(true, false)) {
            log.info("WorkflowEventLooper thread is not start, no need to close");
            return;
        }
        log.info("WorkflowEventLooper is closing...");
        this.interrupt();
        log.info("WorkflowEventLooper closed...");
    }
}

