package org.apache.dolphinscheduler.server.worker.processor;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.class */
public class TaskExecuteProcessor implements NettyRequestProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
    private final WorkerConfig workerConfig;
    private final TaskCallbackService taskCallbackService;
    private AlertClientService alertClientService;
    private TaskPluginManager taskPluginManager;
    private final WorkerManagerThread workerManager;

    public TaskExecuteProcessor() {
        this.taskCallbackService = (TaskCallbackService) SpringApplicationContext.getBean(TaskCallbackService.class);
        this.workerConfig = (WorkerConfig) SpringApplicationContext.getBean(WorkerConfig.class);
        this.workerManager = (WorkerManagerThread) SpringApplicationContext.getBean(WorkerManagerThread.class);
    }

    private void setTaskCache(TaskExecutionContext taskExecutionContext) {
        TaskExecutionContextCacheManager.cacheTaskExecutionContext((TaskRequest) JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class));
    }

    public TaskExecuteProcessor(AlertClientService alertClientService, TaskPluginManager taskPluginManager) {
        this();
        this.alertClientService = alertClientService;
        this.taskPluginManager = taskPluginManager;
    }

    public void process(Channel channel, Command command) {
        Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
        TaskExecuteRequestCommand taskExecuteRequestCommand = (TaskExecuteRequestCommand) JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class);
        logger.info("received command : {}", taskExecuteRequestCommand);
        if (taskExecuteRequestCommand == null) {
            logger.error("task execute request command is null");
            return;
        }
        TaskExecutionContext taskExecutionContext = (TaskExecutionContext) JSONUtils.parseObject(taskExecuteRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class);
        if (taskExecutionContext == null) {
            logger.error("task execution context is null");
            return;
        }
        setTaskCache(taskExecutionContext);
        taskExecutionContext.setHost(NetUtils.getAddr(this.workerConfig.getListenPort()));
        if (CommonUtils.isSudoEnable() && this.workerConfig.getWorkerTenantAutoCreate()) {
            OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
        }
        ResponceCache.get().removeRecallCache(Integer.valueOf(taskExecutionContext.getTaskInstanceId()));
        this.taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
        if (this.workerManager.offer(new TaskExecuteThread(taskExecutionContext, this.taskCallbackService, this.alertClientService, this.taskPluginManager))) {
            return;
        }
        logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}", Integer.valueOf(this.workerManager.getWaitSubmitQueueSize()), Integer.valueOf(taskExecutionContext.getTaskInstanceId()));
        sendRecallCommand(taskExecutionContext, channel);
        TaskExecutionContextCacheManager.removeByTaskInstanceId(Integer.valueOf(taskExecutionContext.getTaskInstanceId()));
    }

    private void sendRecallCommand(TaskExecutionContext taskExecutionContext, Channel channel) {
        Command convert2Command = buildRecallCommand(taskExecutionContext).convert2Command();
        ResponceCache.get().cache(Integer.valueOf(taskExecutionContext.getTaskInstanceId()), convert2Command, Event.WORKER_REJECT);
        this.taskCallbackService.changeRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, convert2Command.getOpaque()));
        this.taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), convert2Command);
        logger.info("send recall command successfully, taskId:{}, opaque:{}", Integer.valueOf(taskExecutionContext.getTaskInstanceId()), Long.valueOf(convert2Command.getOpaque()));
    }

    private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) {
        TaskRecallCommand taskRecallCommand = new TaskRecallCommand();
        taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
        taskRecallCommand.setHost(taskExecutionContext.getHost());
        taskRecallCommand.setEvent(Event.WORKER_REJECT);
        taskRecallCommand.setStatus(ExecutionStatus.SUBMITTED_SUCCESS.getCode());
        return taskRecallCommand;
    }
}
