/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.processor;

import io.netty.channel.Channel;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.command.task.TaskDispatchRequest;
import org.apache.dolphinscheduler.remote.command.task.TaskDispatchResponse;
import org.apache.dolphinscheduler.remote.processor.MasterRpcProcessor;
import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableFactoryBuilder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import org.apache.dolphinscheduler.server.master.runner.message.MasterMessageSenderManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MasterTaskDispatchProcessor
implements MasterRpcProcessor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MasterTaskDispatchProcessor.class);
    @Autowired
    private MasterTaskExecuteRunnableFactoryBuilder masterTaskExecuteRunnableFactoryBuilder;
    @Autowired
    private MasterMessageSenderManager masterMessageSenderManager;
    @Autowired
    private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(Channel channel, Message message) {
        TaskDispatchRequest taskDispatchRequest = (TaskDispatchRequest)JSONUtils.parseObject((byte[])message.getBody(), TaskDispatchRequest.class);
        log.info("Receive task dispatch request, command: {}", (Object)taskDispatchRequest);
        TaskExecutionContext taskExecutionContext = taskDispatchRequest.getTaskExecutionContext();
        taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath((TaskExecutionContext)taskExecutionContext));
        try {
            MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable;
            LogUtils.setWorkflowAndTaskInstanceIDMDC((Integer)taskExecutionContext.getProcessInstanceId(), (Integer)taskExecutionContext.getTaskInstanceId());
            LogUtils.setTaskInstanceLogFullPathMDC((String)taskExecutionContext.getLogPath());
            MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext);
            long remainTime = DateUtils.getRemainTime((Date)DateUtils.timeStampToDate((long)taskExecutionContext.getFirstSubmitTime()), (long)TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()));
            if (remainTime > 0L) {
                log.info("Current taskInstance: {} is choose delay execution, delay time: {}ms, remainTime: {}ms", new Object[]{taskExecutionContext.getTaskName(), TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime});
                taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
                this.masterMessageSenderManager.getMasterTaskExecuteResultMessageSender().sendMessage(taskExecutionContext);
            }
            if (this.masterDelayTaskExecuteRunnableDelayQueue.submitMasterDelayTaskExecuteRunnable(masterDelayTaskExecuteRunnable = this.masterTaskExecuteRunnableFactoryBuilder.createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType()).createWorkerTaskExecuteRunnable(taskExecutionContext))) {
                log.info("Submit task: {} to MasterDelayTaskExecuteRunnableDelayQueue success", (Object)taskExecutionContext.getTaskName());
                this.sendDispatchSuccessResult(channel, message, taskExecutionContext);
            } else {
                log.error("Submit task: {} to MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue size: {} is full", (Object)taskExecutionContext.getTaskName(), (Object)this.masterDelayTaskExecuteRunnableDelayQueue.size());
                this.sendDispatchRejectResult(channel, message, taskExecutionContext);
            }
        }
        catch (Exception ex) {
            log.error("Handle task dispatch request error, command: {}", (Object)taskDispatchRequest, (Object)ex);
            MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
            this.sendDispatchFailedResult(channel, message, taskExecutionContext, ex);
        }
        finally {
            LogUtils.removeWorkflowAndTaskInstanceIdMDC();
            LogUtils.removeTaskInstanceLogFullPathMDC();
        }
    }

    private void sendDispatchSuccessResult(Channel channel, Message dispatchRequest, TaskExecutionContext taskExecutionContext) {
        TaskDispatchResponse taskDispatchResponse = TaskDispatchResponse.success((Integer)taskExecutionContext.getTaskInstanceId());
        channel.writeAndFlush((Object)taskDispatchResponse.convert2Command(dispatchRequest.getOpaque()));
    }

    private void sendDispatchRejectResult(Channel channel, Message dispatchRequest, TaskExecutionContext taskExecutionContext) {
        TaskDispatchResponse taskDispatchResponse = TaskDispatchResponse.failed((Integer)taskExecutionContext.getTaskInstanceId(), (String)"Task dispatch queue is full");
        channel.writeAndFlush((Object)taskDispatchResponse.convert2Command(dispatchRequest.getOpaque()));
    }

    private void sendDispatchFailedResult(Channel channel, Message dispatchRequest, TaskExecutionContext taskExecutionContext, Throwable throwable) {
        TaskDispatchResponse taskDispatchResponse = TaskDispatchResponse.failed((Integer)taskExecutionContext.getTaskInstanceId(), (String)throwable.getMessage());
        channel.writeAndFlush((Object)taskDispatchResponse.convert2Command(dispatchRequest.getOpaque()));
    }

    public MessageType getCommandType() {
        return MessageType.TASK_DISPATCH_REQUEST;
    }
}

