package org.apache.dolphinscheduler.server.master.processor.queue;

import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.class */
public class StateEventResponseService {
    private final Logger logger = LoggerFactory.getLogger(StateEventResponseService.class);
    private final BlockingQueue<StateEvent> eventQueue = new LinkedBlockingQueue(5000);
    private Thread responseWorker;

    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$common$enums$StateEventType = new int[StateEventType.values().length];

        static {
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$StateEventType[StateEventType.TASK_STATE_CHANGE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$StateEventType[StateEventType.PROCESS_STATE_CHANGE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService$StateEventResponseWorker.class */
    class StateEventResponseWorker extends BaseDaemonThread {
        protected StateEventResponseWorker() {
            super("StateEventResponseWorker");
        }

        public void run() {
            StateEventResponseService.this.logger.info("State event loop service started");
            while (!ServerLifeCycleManager.isStopped()) {
                try {
                    try {
                        StateEvent stateEvent = (StateEvent) StateEventResponseService.this.eventQueue.take();
                        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
                        StateEventResponseService.this.persist(stateEvent);
                        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                    } catch (InterruptedException e) {
                        StateEventResponseService.this.logger.warn("State event loop service interrupted, will stop this loop", e);
                        Thread.currentThread().interrupt();
                        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                    }
                } catch (Throwable th) {
                    LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                    throw th;
                }
            }
            StateEventResponseService.this.logger.info("State event loop service stopped");
        }
    }

    @PostConstruct
    public void start() {
        this.responseWorker = new StateEventResponseWorker();
        this.responseWorker.start();
    }

    @PreDestroy
    public void stop() {
        this.responseWorker.interrupt();
        if (this.eventQueue.isEmpty()) {
            return;
        }
        ArrayList<StateEvent> arrayList = new ArrayList(this.eventQueue.size());
        this.eventQueue.drainTo(arrayList);
        for (StateEvent stateEvent : arrayList) {
            try {
                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
                persist(stateEvent);
                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
            } catch (Throwable th) {
                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                throw th;
            }
        }
    }

    public void addStateChangeEvent(StateEvent stateEvent) {
        try {
            this.eventQueue.put(stateEvent);
        } catch (InterruptedException e) {
            this.logger.error("Put state event : {} error", stateEvent, e);
            Thread.currentThread().interrupt();
        }
    }

    private void writeResponse(StateEvent stateEvent) {
        Channel channel = stateEvent.getChannel();
        if (channel != null) {
            channel.writeAndFlush(new StateEventResponseCommand(stateEvent.getKey()).convert2Command());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void persist(StateEvent stateEvent) {
        try {
            if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
                this.logger.warn("Persist event into workflow execute thread error, cannot find the workflow instance from cache manager, event: {}", stateEvent);
                writeResponse(stateEvent);
                return;
            }
            WorkflowExecuteRunnable byProcessInstanceId = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
            switch (AnonymousClass1.$SwitchMap$org$apache$dolphinscheduler$common$enums$StateEventType[stateEvent.getType().ordinal()]) {
                case 1:
                    byProcessInstanceId.refreshTaskInstance(stateEvent.getTaskInstanceId());
                    break;
                case 2:
                    byProcessInstanceId.refreshProcessInstance(stateEvent.getProcessInstanceId());
                    break;
            }
            this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
            writeResponse(stateEvent);
        } catch (Exception e) {
            this.logger.error("Persist event queue error, event: {}", stateEvent, e);
        }
    }

    public void addEvent2WorkflowExecute(StateEvent stateEvent) {
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }
}
