package org.apache.dolphinscheduler.server.master.processor.queue;

import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.class */
public class TaskExecuteThread {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
    private final int processInstanceId;
    private final ConcurrentLinkedQueue<TaskEvent> events = new ConcurrentLinkedQueue<>();
    private ProcessService processService;
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    private DataQualityResultOperator dataQualityResultOperator;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteThread$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$common$enums$Event = new int[Event.values().length];

        static {
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$Event[Event.DISPATCH.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$Event[Event.DELAY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$Event[Event.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$Event[Event.RESULT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public TaskExecuteThread(int i, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool, ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
        this.processInstanceId = i;
        this.processService = processService;
        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
        this.dataQualityResultOperator = dataQualityResultOperator;
    }

    public void run() {
        while (!this.events.isEmpty()) {
            TaskEvent peek = this.events.peek();
            try {
                persist(peek);
            } catch (Exception e) {
                logger.error("persist error, event:{}, error: {}", peek, e);
            } finally {
                this.events.remove(peek);
            }
        }
    }

    public String getKey() {
        return String.valueOf(this.processInstanceId);
    }

    public int eventSize() {
        return this.events.size();
    }

    public boolean isEmpty() {
        return this.events.isEmpty();
    }

    public Integer getProcessInstanceId() {
        return Integer.valueOf(this.processInstanceId);
    }

    public boolean addEvent(TaskEvent taskEvent) {
        if (taskEvent.getProcessInstanceId() == this.processInstanceId) {
            return this.events.add(taskEvent);
        }
        logger.warn("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}", new Object[]{Integer.valueOf(taskEvent.getTaskInstanceId()), Integer.valueOf(taskEvent.getProcessInstanceId()), Integer.valueOf(this.processInstanceId)});
        return false;
    }

    private void persist(TaskEvent taskEvent) {
        Event event = taskEvent.getEvent();
        int taskInstanceId = taskEvent.getTaskInstanceId();
        WorkflowExecuteThread byProcessInstanceId = this.processInstanceExecCacheManager.getByProcessInstanceId(taskEvent.getProcessInstanceId());
        TaskInstance findTaskInstanceById = (byProcessInstanceId == null || !byProcessInstanceId.checkTaskInstanceById(taskInstanceId)) ? this.processService.findTaskInstanceById(Integer.valueOf(taskInstanceId)) : byProcessInstanceId.getTaskInstance(taskInstanceId);
        switch (AnonymousClass1.$SwitchMap$org$apache$dolphinscheduler$common$enums$Event[event.ordinal()]) {
            case 1:
                handleDispatchEvent(taskEvent, findTaskInstanceById);
                return;
            case 2:
            case 3:
                handleRunningEvent(taskEvent, findTaskInstanceById);
                break;
            case 4:
                handleResultEvent(taskEvent, findTaskInstanceById);
                break;
            default:
                throw new IllegalArgumentException("invalid event type : " + event);
        }
        StateEvent stateEvent = new StateEvent();
        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
        stateEvent.setExecutionStatus(taskEvent.getState());
        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
        if (taskInstance == null) {
            logger.error("taskInstance is null");
        } else {
            if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
                return;
            }
            taskInstance.setState(ExecutionStatus.DISPATCH);
            taskInstance.setHost(taskEvent.getWorkerAddress());
            this.processService.saveTaskInstance(taskInstance);
        }
    }

    private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
        Channel channel = taskEvent.getChannel();
        if (taskInstance != null) {
            try {
                if (taskInstance.getState().typeIsFinished()) {
                    logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", Integer.valueOf(taskInstance.getId()), taskInstance.getState());
                } else {
                    taskInstance.setState(taskEvent.getState());
                    taskInstance.setStartTime(taskEvent.getStartTime());
                    taskInstance.setHost(taskEvent.getWorkerAddress());
                    taskInstance.setLogPath(taskEvent.getLogPath());
                    taskInstance.setExecutePath(taskEvent.getExecutePath());
                    taskInstance.setPid(taskEvent.getProcessId());
                    taskInstance.setAppLink(taskEvent.getAppIds());
                    this.processService.saveTaskInstance(taskInstance);
                }
            } catch (Exception e) {
                logger.error("worker ack master error", e);
                channel.writeAndFlush(new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1).convert2Command());
                return;
            }
        }
        channel.writeAndFlush(new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()).convert2Command());
    }

    private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
        Channel channel = taskEvent.getChannel();
        if (taskInstance != null) {
            try {
                this.dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
                taskInstance.setStartTime(taskEvent.getStartTime());
                taskInstance.setHost(taskEvent.getWorkerAddress());
                taskInstance.setLogPath(taskEvent.getLogPath());
                taskInstance.setExecutePath(taskEvent.getExecutePath());
                taskInstance.setPid(taskEvent.getProcessId());
                taskInstance.setAppLink(taskEvent.getAppIds());
                taskInstance.setState(taskEvent.getState());
                taskInstance.setEndTime(taskEvent.getEndTime());
                taskInstance.setVarPool(taskEvent.getVarPool());
                this.processService.changeOutParam(taskInstance);
                this.processService.saveTaskInstance(taskInstance);
            } catch (Exception e) {
                logger.error("worker response master error", e);
                channel.writeAndFlush(new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1).convert2Command());
                return;
            }
        }
        channel.writeAndFlush(new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()).convert2Command());
    }
}
