package org.apache.dolphinscheduler.server.master.event;

import java.util.Optional;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Handles task events of type {@link TaskEventType#RESULT}.
 *
 * <p>For each event the handler looks up the owning workflow runnable and the task instance in
 * the in-memory cache, copies the result fields carried by the event onto that task instance,
 * persists it through {@link ProcessService}, acknowledges the worker, and submits a
 * {@link StateEventType#TASK_STATE_CHANGE} state event to the workflow thread pool.
 */
@Component
public class TaskResultEventHandler implements TaskEventHandler {

    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;

    @Autowired
    private DataQualityResultOperator dataQualityResultOperator;

    @Autowired
    private ProcessService processService;

    @Autowired
    private MasterConfig masterConfig;

    /**
     * Applies a task result event to the cached task instance and notifies the workflow.
     *
     * <p>The event is acknowledged to the worker and then rejected with a
     * {@link TaskEventHandleError} when the workflow runnable is not cached, when the task
     * instance is not cached, or when the task instance is already in a finished state.
     *
     * @param taskEvent the result event received from a worker
     * @throws TaskEventHandleError if the event is rejected for one of the reasons above, or if
     *     any step of applying, persisting, acknowledging or publishing the result fails
     * @throws TaskEventHandleException declared by {@link TaskEventHandler}; not raised directly
     *     by this method
     */
    @Override
    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
        int taskInstanceId = taskEvent.getTaskInstanceId();

        WorkflowExecuteRunnable workflowExecuteRunnable =
                this.processInstanceExecCacheManager.getByProcessInstanceId(taskEvent.getProcessInstanceId());
        if (workflowExecuteRunnable == null) {
            throw ackAndReject(taskEvent,
                    "Handle task result event error, cannot find related workflow instance from cache, will discard this event");
        }

        Optional<TaskInstance> cachedTaskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
        if (!cachedTaskInstance.isPresent()) {
            throw ackAndReject(taskEvent,
                    "Handle task result event error, cannot find the taskInstance from cache, will discard this event");
        }

        TaskInstance taskInstance = cachedTaskInstance.get();
        if (taskInstance.getState().isFinished()) {
            throw ackAndReject(taskEvent,
                    "Handle task result event error, the task instance is already finished, will discard this event");
        }

        this.dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);

        // Snapshot the cached instance before mutating it so it can be restored if anything below fails.
        TaskInstance snapshot = new TaskInstance();
        TaskInstanceUtils.copyTaskInstance(taskInstance, snapshot);
        try {
            applyResult(taskEvent, taskInstance);
            this.processService.changeOutParam(taskInstance);
            this.processService.updateTaskInstance(taskInstance);
            sendAckToWorker(taskEvent);
            this.workflowExecuteThreadPool.submitStateEvent(
                    TaskStateEvent.builder()
                            .processInstanceId(taskEvent.getProcessInstanceId())
                            .taskInstanceId(Integer.valueOf(taskInstanceId))
                            .status(taskEvent.getState())
                            .type(StateEventType.TASK_STATE_CHANGE)
                            .build());
        } catch (Exception e) {
            // NOTE(review): this also catches failures of the ack and of the state-event submission,
            // i.e. after the update call has already returned; the message below only mentions the
            // db step. Confirm whether restoring the cached instance is intended in those cases.
            TaskInstanceUtils.copyTaskInstance(snapshot, taskInstance);
            throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", e);
        }
    }

    /**
     * Sends a {@link TaskExecuteAckCommand} for the given event over the event's channel.
     *
     * <p>The command is built from the event's task instance id and worker address, the master
     * address taken from {@link MasterConfig}, and the current system time in milliseconds.
     *
     * @param taskEvent the event whose channel receives the acknowledgement
     */
    public void sendAckToWorker(TaskEvent taskEvent) {
        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(
                true,
                taskEvent.getTaskInstanceId(),
                this.masterConfig.getMasterAddress(),
                taskEvent.getWorkerAddress(),
                System.currentTimeMillis());
        taskEvent.getChannel().writeAndFlush(ackCommand.convert2Command());
    }

    /**
     * Returns the event type this handler is responsible for.
     *
     * @return {@link TaskEventType#RESULT}
     */
    @Override
    public TaskEventType getHandleEventType() {
        return TaskEventType.RESULT;
    }

    /**
     * Acknowledges the event to the worker and builds the error used to reject it.
     *
     * <p>The acknowledgement is sent before the error is created, so a failure while sending
     * propagates instead of the returned error.
     *
     * @param taskEvent the event being rejected
     * @param message the error message
     * @return the error for the caller to throw
     */
    private TaskEventHandleError ackAndReject(TaskEvent taskEvent, String message) {
        sendAckToWorker(taskEvent);
        return new TaskEventHandleError(message);
    }

    /**
     * Copies the result fields carried by the event onto the task instance.
     *
     * @param taskEvent the source of the values
     * @param taskInstance the task instance to update in place
     */
    private static void applyResult(TaskEvent taskEvent, TaskInstance taskInstance) {
        taskInstance.setStartTime(taskEvent.getStartTime());
        taskInstance.setHost(taskEvent.getWorkerAddress());
        taskInstance.setLogPath(taskEvent.getLogPath());
        taskInstance.setExecutePath(taskEvent.getExecutePath());
        taskInstance.setPid(taskEvent.getProcessId());
        taskInstance.setAppLink(taskEvent.getAppIds());
        taskInstance.setState(taskEvent.getState());
        taskInstance.setEndTime(taskEvent.getEndTime());
        taskInstance.setVarPool(taskEvent.getVarPool());
    }
}
