/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.processor.queue;

import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.Generated;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.StateEventResponse;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class StateEventResponseService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StateEventResponseService.class);
    private final BlockingQueue<StateEvent> eventQueue = new LinkedBlockingQueue<StateEvent>(5000);
    private Thread responseWorker;
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;

    @PostConstruct
    public void start() {
        this.responseWorker = new StateEventResponseWorker();
        this.responseWorker.start();
    }

    @PreDestroy
    public void stop() {
        this.responseWorker.interrupt();
        if (!this.eventQueue.isEmpty()) {
            ArrayList remainEvents = new ArrayList(this.eventQueue.size());
            this.eventQueue.drainTo(remainEvents);
            for (StateEvent event : remainEvents) {
                LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC((Integer)event.getProcessInstanceId(), (Integer)event.getTaskInstanceId());
                Throwable throwable = null;
                try {
                    this.persist(event);
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (mdcAutoClosableContext == null) continue;
                    if (throwable != null) {
                        try {
                            mdcAutoClosableContext.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    mdcAutoClosableContext.close();
                }
            }
        }
    }

    public void addStateChangeEvent(StateEvent stateEvent) {
        try {
            this.eventQueue.put(stateEvent);
        }
        catch (InterruptedException e) {
            log.error("Put state event : {} error", (Object)stateEvent, (Object)e);
            Thread.currentThread().interrupt();
        }
    }

    private void writeResponse(StateEvent stateEvent) {
        Channel channel = stateEvent.getChannel();
        if (channel != null) {
            StateEventResponse command = new StateEventResponse(stateEvent.getKey());
            channel.writeAndFlush((Object)command.convert2Command());
        }
    }

    private void persist(StateEvent stateEvent) {
        try {
            if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
                log.warn("Persist event into workflow execute thread error, cannot find the workflow instance from cache manager, event: {}", (Object)stateEvent);
                this.writeResponse(stateEvent);
                return;
            }
            WorkflowExecuteRunnable workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
            switch (stateEvent.getType()) {
                case TASK_STATE_CHANGE: {
                    workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());
                    break;
                }
                case PROCESS_STATE_CHANGE: {
                    workflowExecuteThread.refreshProcessInstance(stateEvent.getProcessInstanceId());
                    break;
                }
            }
            this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
            this.writeResponse(stateEvent);
        }
        catch (Exception e) {
            log.error("Persist event queue error, event: {}", (Object)stateEvent, (Object)e);
        }
    }

    public void addEvent2WorkflowExecute(StateEvent stateEvent) {
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    class StateEventResponseWorker
    extends BaseDaemonThread {
        protected StateEventResponseWorker() {
            super("StateEventResponseWorker");
        }

        public void run() {
            log.info("State event loop service started");
            while (!ServerLifeCycleManager.isStopped()) {
                StateEvent stateEvent;
                try {
                    stateEvent = (StateEvent)StateEventResponseService.this.eventQueue.take();
                }
                catch (InterruptedException e) {
                    log.warn("State event loop service interrupted, will stop loop");
                    Thread.currentThread().interrupt();
                    break;
                }
                LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC((Integer)stateEvent.getProcessInstanceId(), (Integer)stateEvent.getTaskInstanceId());
                Throwable throwable = null;
                try {
                    StateEventResponseService.this.persist(stateEvent);
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (mdcAutoClosableContext == null) continue;
                    if (throwable != null) {
                        try {
                            mdcAutoClosableContext.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    mdcAutoClosableContext.close();
                }
            }
            log.info("State event loop service stopped");
        }
    }
}

