package org.apache.dolphinscheduler.server.worker.processor;

import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.class */
/**
 * Netty request processor that handles {@code TASK_EXECUTE_REQUEST} commands on the worker.
 *
 * <p>For each request it deserializes the {@link TaskExecutionContext}, caches it, fills in
 * host / start time / log path, prepares the local working directory, acknowledges the
 * request through the {@link TaskCallbackService} and finally submits a
 * {@link TaskExecuteThread} to the worker executor.
 */
public class TaskExecuteProcessor implements NettyRequestProcessor {
    /** Processor-level logger (as opposed to the per-task logger built in {@link #process}). */
    private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
    private final TaskCallbackService taskCallbackService = (TaskCallbackService) SpringApplicationContext.getBean(TaskCallbackService.class);
    private final WorkerConfig workerConfig = (WorkerConfig) SpringApplicationContext.getBean(WorkerConfig.class);
    /** Executor the task threads are submitted to; sized from {@code workerConfig.getWorkerExecThreads()}. */
    private final ExecutorService workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads());
    private final TaskExecutionContextCacheManager taskExecutionContextCacheManager = (TaskExecutionContextCacheManager) SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);

    /**
     * Stores the given execution context in the task execution context cache.
     *
     * @param taskExecutionContext context to cache
     */
    private void setTaskCache(TaskExecutionContext taskExecutionContext) {
        this.taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
    }

    /**
     * Handles a task execute request received on {@code channel}.
     *
     * <p>Returns without doing anything (after logging an error) when the command body or the
     * embedded task execution context deserializes to {@code null}.
     *
     * @param channel channel the command arrived on; registered as the task's remote channel
     * @param command command to process; its type must be {@code TASK_EXECUTE_REQUEST}
     */
    public void process(Channel channel, Command command) {
        Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
        TaskExecuteRequestCommand taskExecuteRequestCommand = (TaskExecuteRequestCommand) FastJsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class);
        this.logger.info("received command : {}", taskExecuteRequestCommand);
        if (taskExecuteRequestCommand == null) {
            this.logger.error("task execute request command is null");
            return;
        }
        TaskExecutionContext taskExecutionContext = (TaskExecutionContext) JSONObject.parseObject(taskExecuteRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class);
        if (taskExecutionContext == null) {
            this.logger.error("task execution context is null");
            return;
        }
        setTaskCache(taskExecutionContext);

        // Dedicated logger for this task instance, named from the process/task ids.
        Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId("TASK", taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()));
        taskExecutionContext.setHost(OSUtils.getAddr(this.workerConfig.getListenPort()));
        taskExecutionContext.setStartTime(new Date());
        taskExecutionContext.setLogPath(getTaskLogPath(taskExecutionContext));

        String execLocalPath = getExecLocalPath(taskExecutionContext);
        this.logger.info("task instance  local execute path : {} ", execLocalPath);

        // The task logger is exposed through the thread-local only while the work dir is created.
        FileUtils.taskLoggerThreadLocal.set(taskLogger);
        try {
            FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
        } catch (Throwable th) {
            String errorLog = String.format("create execLocalPath : %s", execLocalPath);
            LoggerUtils.logError(Optional.ofNullable(this.logger), errorLog, th);
            LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, th);
            this.taskExecutionContextCacheManager.removeByTaskInstanceId(Integer.valueOf(taskExecutionContext.getTaskInstanceId()));
            // NOTE(review): processing deliberately continues (ack + submit) after this failure,
            // matching the previous behavior; presumably the task thread reports the failure -
            // confirm before changing.
        } finally {
            // Always clear the thread-local so the handler thread never keeps a stale task logger.
            FileUtils.taskLoggerThreadLocal.remove();
        }

        this.taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
        doAck(taskExecutionContext);
        this.workerExecService.submit(new TaskExecuteThread(taskExecutionContext, this.taskCallbackService, taskLogger));
    }

    /**
     * Builds the ack command for the task, records it in the {@link ResponceCache} under
     * {@link Event#ACK} and sends it through the callback service.
     *
     * @param taskExecutionContext context of the task being acknowledged
     */
    private void doAck(TaskExecutionContext taskExecutionContext) {
        TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
        // convert2Command() is intentionally invoked once per consumer, as before.
        ResponceCache.get().cache(Integer.valueOf(taskExecutionContext.getTaskInstanceId()), ackCommand.convert2Command(), Event.ACK);
        this.taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
    }

    /**
     * Computes the log file path of the task:
     * {@code <logBase>/<processDefineId>/<processInstanceId>/<taskInstanceId>.log}.
     * A log base that does not start with {@code "/"} is resolved against the
     * {@code user.dir} system property.
     *
     * @param taskExecutionContext context supplying the ids used in the path
     * @return the task log path
     */
    private String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
        // NOTE(review): getAppender/getDiscriminator are not slf4j Logger methods; presumably
        // the original source cast to logback types here - confirm against the build classpath.
        String logBase = LoggerFactory.getILoggerFactory().getLogger("ROOT").getAppender("TASKLOGFILE").getDiscriminator().getLogBase();
        String taskLogSuffix = "/" + taskExecutionContext.getProcessDefineId() + "/" + taskExecutionContext.getProcessInstanceId() + "/" + taskExecutionContext.getTaskInstanceId() + ".log";
        if (logBase.startsWith("/")) {
            return logBase + taskLogSuffix;
        }
        return System.getProperty("user.dir") + "/" + logBase + taskLogSuffix;
    }

    /**
     * Builds the ack command reporting the task as running, copying task instance id, log path,
     * host and start time from the context. The execute path is set to {@code null} for
     * {@code SQL} and {@code PROCEDURE} tasks and copied from the context otherwise.
     *
     * @param taskExecutionContext context to build the ack from
     * @return the populated ack command
     */
    private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
        ackCommand.setLogPath(taskExecutionContext.getLogPath());
        ackCommand.setHost(taskExecutionContext.getHost());
        ackCommand.setStartTime(taskExecutionContext.getStartTime());
        // Constant-first equals: a missing task type must not raise a NullPointerException.
        boolean withoutExecutePath = TaskType.SQL.name().equals(taskExecutionContext.getTaskType())
                || TaskType.PROCEDURE.name().equals(taskExecutionContext.getTaskType());
        if (withoutExecutePath) {
            ackCommand.setExecutePath((String) null);
        } else {
            ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
        }
        return ackCommand;
    }

    /**
     * Resolves the local execution directory of the task from its project, process definition,
     * process instance and task instance ids.
     *
     * @param taskExecutionContext context supplying the ids
     * @return the directory returned by {@code FileUtils.getProcessExecDir}
     */
    private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
        return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
    }
}
