package org.apache.dolphinscheduler.server.master.processor.queue;

import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.class */
public class TaskResponseService {
    private final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
    private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue();

    @Autowired
    private ProcessService processService;
    private Thread taskResponseWorker;
    private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$common$enums$Event = new int[Event.values().length];

        static {
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$Event[Event.ACK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$Event[Event.RESULT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService$TaskResponseWorker.class */
    class TaskResponseWorker extends Thread {
        TaskResponseWorker() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (Stopper.isRunning()) {
                try {
                    TaskResponseService.this.persist((TaskResponseEvent) TaskResponseService.this.eventQueue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    TaskResponseService.this.logger.error("persist task error", e2);
                }
            }
            TaskResponseService.this.logger.info("StateEventResponseWorker stopped");
        }
    }

    public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> concurrentHashMap) {
        if (this.processInstanceMapper == null) {
            this.processInstanceMapper = concurrentHashMap;
        }
    }

    @PostConstruct
    public void start() {
        this.taskResponseWorker = new TaskResponseWorker();
        this.taskResponseWorker.setName("StateEventResponseWorker");
        this.taskResponseWorker.start();
    }

    @PreDestroy
    public void stop() {
        try {
            this.taskResponseWorker.interrupt();
            if (!this.eventQueue.isEmpty()) {
                ArrayList arrayList = new ArrayList(this.eventQueue.size());
                this.eventQueue.drainTo(arrayList);
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    persist((TaskResponseEvent) it.next());
                }
            }
        } catch (Exception e) {
            this.logger.error("stop error:", e);
        }
    }

    public void addResponse(TaskResponseEvent taskResponseEvent) {
        try {
            this.eventQueue.put(taskResponseEvent);
        } catch (InterruptedException e) {
            this.logger.error("put task : {} error :{}", taskResponseEvent, e);
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void persist(TaskResponseEvent taskResponseEvent) {
        Event event = taskResponseEvent.getEvent();
        Channel channel = taskResponseEvent.getChannel();
        TaskInstance findTaskInstanceById = this.processService.findTaskInstanceById(Integer.valueOf(taskResponseEvent.getTaskInstanceId()));
        switch (AnonymousClass1.$SwitchMap$org$apache$dolphinscheduler$common$enums$Event[event.ordinal()]) {
            case 1:
                if (findTaskInstanceById != null) {
                    try {
                        this.processService.changeTaskState(findTaskInstanceById, findTaskInstanceById.getState().typeIsFinished() ? findTaskInstanceById.getState() : taskResponseEvent.getState(), taskResponseEvent.getStartTime(), taskResponseEvent.getWorkerAddress(), taskResponseEvent.getExecutePath(), taskResponseEvent.getLogPath(), taskResponseEvent.getTaskInstanceId());
                    } catch (Exception e) {
                        this.logger.error("worker ack master error", e);
                        channel.writeAndFlush(new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1).convert2Command());
                        break;
                    }
                }
                channel.writeAndFlush(new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()).convert2Command());
                break;
            case 2:
                if (findTaskInstanceById != null) {
                    try {
                        this.processService.changeTaskState(findTaskInstanceById, taskResponseEvent.getState(), taskResponseEvent.getEndTime(), taskResponseEvent.getProcessId(), taskResponseEvent.getAppIds(), taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getVarPool());
                    } catch (Exception e2) {
                        this.logger.error("worker response master error", e2);
                        channel.writeAndFlush(new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1).convert2Command());
                        break;
                    }
                }
                channel.writeAndFlush(new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()).convert2Command());
                break;
            default:
                throw new IllegalArgumentException("invalid event type : " + event);
        }
        WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(Integer.valueOf(taskResponseEvent.getProcessInstanceId()));
        if (workflowExecuteThread != null) {
            StateEvent stateEvent = new StateEvent();
            stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
            stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
            stateEvent.setExecutionStatus(taskResponseEvent.getState());
            stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
            workflowExecuteThread.addStateEvent(stateEvent);
        }
    }

    public BlockingQueue<TaskResponseEvent> getEventQueue() {
        return this.eventQueue;
    }
}
