/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.event;

import java.util.Optional;
import lombok.Generated;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceExecutionEventAckListener;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionFinishEventAck;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleException;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskResultEventHandler
implements TaskEventHandler {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TaskResultEventHandler.class);
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
    @Autowired
    private DataQualityResultOperator dataQualityResultOperator;
    @Autowired
    private ProcessService processService;
    @Autowired
    private TaskInstanceDao taskInstanceDao;
    @Autowired
    private MasterConfig masterConfig;

    @Override
    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
        int taskInstanceId = taskEvent.getTaskInstanceId();
        int processInstanceId = taskEvent.getProcessInstanceId();
        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
        if (workflowExecuteRunnable == null) {
            this.sendAckToWorker(taskEvent);
            throw new TaskEventHandleError("Handle task result event error, cannot find related workflow instance from cache, will discard this event");
        }
        Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
        if (!taskInstanceOptional.isPresent()) {
            this.sendAckToWorker(taskEvent);
            throw new TaskEventHandleError("Handle task result event error, cannot find the taskInstance from cache, will discord this event");
        }
        TaskInstance taskInstance = taskInstanceOptional.get();
        if (taskInstance.getState().isFinished()) {
            this.sendAckToWorker(taskEvent);
            throw new TaskEventHandleError("Handle task result event error, the task instance is already finished, will discord this event");
        }
        this.dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
        TaskInstance oldTaskInstance = new TaskInstance();
        TaskInstanceUtils.copyTaskInstance((TaskInstance)taskInstance, (TaskInstance)oldTaskInstance);
        try {
            taskInstance.setStartTime(taskEvent.getStartTime());
            taskInstance.setHost(taskEvent.getWorkerAddress());
            taskInstance.setLogPath(taskEvent.getLogPath());
            taskInstance.setExecutePath(taskEvent.getExecutePath());
            taskInstance.setPid(taskEvent.getProcessId());
            taskInstance.setAppLink(taskEvent.getAppIds());
            taskInstance.setState(taskEvent.getState());
            taskInstance.setEndTime(taskEvent.getEndTime());
            taskInstance.setVarPool(taskEvent.getVarPool());
            this.processService.changeOutParam(taskInstance);
            this.taskInstanceDao.updateById((Object)taskInstance);
        }
        catch (Exception ex) {
            TaskInstanceUtils.copyTaskInstance((TaskInstance)oldTaskInstance, (TaskInstance)taskInstance);
            throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex);
        }
        this.sendAckToWorker(taskEvent);
        TaskStateEvent stateEvent = TaskStateEvent.builder().processInstanceId(taskEvent.getProcessInstanceId()).taskInstanceId(taskEvent.getTaskInstanceId()).status(taskEvent.getState()).type(StateEventType.TASK_STATE_CHANGE).build();
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    public void sendAckToWorker(TaskEvent taskEvent) {
        try {
            ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = (ITaskInstanceExecutionEventAckListener)SingletonJdkDynamicRpcClientProxyFactory.getProxyClient((String)taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class);
            instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck(TaskInstanceExecutionFinishEventAck.success((int)taskEvent.getTaskInstanceId()));
        }
        catch (Exception e) {
            log.warn("send ack to worker error, taskInstanceId: {}", (Object)taskEvent.getTaskInstanceId(), (Object)e);
        }
    }

    @Override
    public TaskEventType getHandleEventType() {
        return TaskEventType.RESULT;
    }
}

