package org.apache.dolphinscheduler.server.master.processor.queue;

import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.class */
/**
 * Drains the queue of {@link TaskResponseEvent}s belonging to a single process instance.
 *
 * <p>For every queued event the task state is persisted through {@link ProcessService}
 * (ACK / RESULT) or the active task processor is asked to stop (ACTION_STOP), a reply command
 * is written to the event's channel, and finally the owning {@link WorkflowExecuteThread}
 * (if registered in the process-instance map) is notified with a
 * {@link StateEventType#TASK_STATE_CHANGE} state event.
 */
public class TaskResponsePersistThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(TaskResponsePersistThread.class);

    /** Pending events; filled through {@link #addEvent} and drained by {@link #run()}. */
    private final ConcurrentLinkedQueue<TaskResponseEvent> events = new ConcurrentLinkedQueue<>();

    /** Id of the only process instance whose events this object accepts. */
    private final Integer processInstanceId;

    private final ProcessService processService;

    /** Process instance id -> workflow execute thread, supplied by the creator of this object. */
    private final ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;

    /**
     * @param processService    service used to look up task instances and persist their state
     * @param concurrentHashMap process instance id -> workflow execute thread lookup
     * @param num               id of the process instance this object handles events for
     */
    public TaskResponsePersistThread(ProcessService processService, ConcurrentHashMap<Integer, WorkflowExecuteThread> concurrentHashMap, Integer num) {
        this.processService = processService;
        this.processInstanceMapper = concurrentHashMap;
        this.processInstanceId = num;
    }

    /**
     * Processes queued events until the queue is empty.
     *
     * <p>An event is only removed from the queue after it has been handled (successfully or
     * not), so a failing event is logged and dropped rather than retried.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (!this.events.isEmpty()) {
            TaskResponseEvent current = this.events.peek();
            try {
                if (!persist(current)) {
                    logger.error("persist meta error, task id:{}, instance id:{}", Integer.valueOf(current.getTaskInstanceId()), Integer.valueOf(current.getProcessInstanceId()));
                }
            } catch (Exception e) {
                logger.error("persist error, task id:{}, instance id:{}, error: {}", new Object[]{Integer.valueOf(current.getTaskInstanceId()), Integer.valueOf(current.getProcessInstanceId()), e});
            } finally {
                // Always drop the event, otherwise a permanently failing event would spin this loop forever.
                this.events.remove(current);
            }
        }
    }

    /**
     * Handles one event: dispatches on its type, then notifies the workflow thread.
     *
     * @param taskResponseEvent the event to handle
     * @return {@code false} when persisting the task state failed, {@code true} otherwise
     * @throws IllegalArgumentException if the event type is not ACK, RESULT or ACTION_STOP
     */
    private boolean persist(TaskResponseEvent taskResponseEvent) {
        Event event = taskResponseEvent.getEvent();
        Channel channel = taskResponseEvent.getChannel();
        // May be null when no task instance with that id is found.
        TaskInstance taskInstance = this.processService.findTaskInstanceById(Integer.valueOf(taskResponseEvent.getTaskInstanceId()));

        boolean persisted;
        switch (event) {
            case ACK:
                persisted = persistAck(taskResponseEvent, taskInstance, channel);
                break;
            case RESULT:
                persisted = persistResult(taskResponseEvent, taskInstance, channel);
                break;
            case ACTION_STOP:
                stopTask(taskResponseEvent, channel);
                persisted = true;
                break;
            default:
                throw new IllegalArgumentException("invalid event type : " + event);
        }
        // The workflow thread is notified even when persisting failed.
        notifyWorkflow(taskResponseEvent);
        return persisted;
    }

    /**
     * Persists a worker ACK and replies with a {@link DBTaskAckCommand}.
     *
     * <p>A task instance that is already in a finished state keeps that state; otherwise the
     * state carried by the event is stored. When the task instance is missing nothing is
     * persisted and a success reply is still sent.
     *
     * @return {@code false} only when persisting threw
     */
    private boolean persistAck(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance, Channel channel) {
        if (taskInstance != null) {
            try {
                this.processService.changeTaskState(taskInstance, taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState(), taskResponseEvent.getStartTime(), taskResponseEvent.getWorkerAddress(), taskResponseEvent.getExecutePath(), taskResponseEvent.getLogPath(), taskResponseEvent.getTaskInstanceId());
                logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}", new Object[]{true, taskInstance.getState(), taskResponseEvent.getState(), Integer.valueOf(taskInstance.getId()), taskInstance.getHost()});
            } catch (Exception e) {
                logger.error("worker ack master error", e);
                channel.writeAndFlush(new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance.getId()).convert2Command());
                return false;
            }
        }
        channel.writeAndFlush(new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()).convert2Command());
        logReply("worker ack master success, taskInstance id:{},taskInstance host:{}", taskResponseEvent, taskInstance);
        return true;
    }

    /**
     * Persists a worker RESULT and replies with a {@link DBTaskResponseCommand}.
     *
     * <p>When the task instance is missing nothing is persisted and a success reply is sent.
     *
     * @return the value reported by {@code changeTaskState}, {@code false} when it threw,
     *         {@code true} when there was no task instance to update
     */
    private boolean persistResult(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance, Channel channel) {
        boolean changed = true;
        if (taskInstance != null) {
            try {
                changed = this.processService.changeTaskState(taskInstance, taskResponseEvent.getState(), taskResponseEvent.getEndTime(), taskResponseEvent.getProcessId(), taskResponseEvent.getAppIds(), taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getVarPool());
                logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}", new Object[]{Boolean.valueOf(changed), taskInstance.getState(), taskResponseEvent.getState(), Integer.valueOf(taskInstance.getId()), taskInstance.getHost()});
            } catch (Exception e) {
                logger.error("worker response master error", e);
                channel.writeAndFlush(new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1).convert2Command());
                return false;
            }
        }
        if (changed) {
            channel.writeAndFlush(new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()).convert2Command());
            logReply("worker response master success, taskInstance id:{},taskInstance host:{}", taskResponseEvent, taskInstance);
        } else {
            channel.writeAndFlush(new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId()).convert2Command());
            logReply("worker response master failure, taskInstance id:{},taskInstance host:{}", taskResponseEvent, taskInstance);
        }
        return changed;
    }

    /**
     * Asks the active task processor (if any) to persist a STOP action and, when the event
     * carries a channel, replies with a successful {@link TaskKillAckCommand}.
     */
    private void stopTask(TaskResponseEvent taskResponseEvent, Channel channel) {
        WorkflowExecuteThread workflow = this.processInstanceMapper.get(Integer.valueOf(taskResponseEvent.getProcessInstanceId()));
        if (workflow != null) {
            ITaskProcessor taskProcessor = workflow.getActiveTaskProcessorMaps().get(Integer.valueOf(taskResponseEvent.getTaskInstanceId()));
            if (taskProcessor != null) {
                taskProcessor.persist(TaskAction.STOP);
                logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", Integer.valueOf(taskResponseEvent.getTaskInstanceId()), Integer.valueOf(taskResponseEvent.getProcessInstanceId()));
            }
        }
        if (channel != null) {
            channel.writeAndFlush(new TaskKillAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()).convert2Command());
        }
    }

    /** Pushes a TASK_STATE_CHANGE state event to the workflow thread of the event's process instance, if registered. */
    private void notifyWorkflow(TaskResponseEvent taskResponseEvent) {
        WorkflowExecuteThread workflow = this.processInstanceMapper.get(Integer.valueOf(taskResponseEvent.getProcessInstanceId()));
        if (workflow == null) {
            return;
        }
        StateEvent stateEvent = new StateEvent();
        stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
        stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
        stateEvent.setExecutionStatus(taskResponseEvent.getState());
        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
        workflow.addStateEvent(stateEvent);
    }

    /**
     * Debug-logs the outcome of a reply without dereferencing a missing task instance.
     *
     * <p>Previously these log statements called {@code getId()}/{@code getHost()} on a possibly
     * null task instance, which threw after the reply had already been sent and skipped the
     * workflow notification.
     */
    private static void logReply(String format, TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
        if (taskInstance != null) {
            logger.debug(format, Integer.valueOf(taskInstance.getId()), taskInstance.getHost());
        } else {
            logger.debug(format, Integer.valueOf(taskResponseEvent.getTaskInstanceId()), "unknown (task instance not found)");
        }
    }

    /**
     * Queues an event for processing.
     *
     * @param taskResponseEvent the event to queue
     * @return {@code true} if the event was queued; {@code false} if it belongs to a different
     *         process instance and was discarded
     */
    public boolean addEvent(TaskResponseEvent taskResponseEvent) {
        if (taskResponseEvent.getProcessInstanceId() != this.processInstanceId.intValue()) {
            logger.info("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}", new Object[]{Integer.valueOf(taskResponseEvent.getTaskInstanceId()), Integer.valueOf(taskResponseEvent.getProcessInstanceId()), this.processInstanceId});
            return false;
        }
        return this.events.add(taskResponseEvent);
    }

    /** @return the number of events currently queued */
    public int eventSize() {
        return this.events.size();
    }

    /** @return {@code true} when no events are queued */
    public boolean isEmpty() {
        return this.events.isEmpty();
    }

    /** @return the id of the process instance this object handles */
    public Integer getProcessInstanceId() {
        return this.processInstanceId;
    }

    /** @return the process instance id rendered as a string */
    public String getKey() {
        return String.valueOf(this.processInstanceId);
    }
}
