/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.processor;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskExecuteProcessor
implements NettyRequestProcessor {
    private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
    private final ExecutorService workerExecService;
    private final WorkerConfig workerConfig;
    private final TaskCallbackService taskCallbackService = (TaskCallbackService)SpringApplicationContext.getBean(TaskCallbackService.class);
    private TaskExecutionContextCacheManager taskExecutionContextCacheManager;

    public TaskExecuteProcessor() {
        this.workerConfig = (WorkerConfig)SpringApplicationContext.getBean(WorkerConfig.class);
        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor((String)"Worker-Execute-Thread", (int)this.workerConfig.getWorkerExecThreads());
        this.taskExecutionContextCacheManager = (TaskExecutionContextCacheManager)SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
    }

    private void setTaskCache(TaskExecutionContext taskExecutionContext) {
        TaskExecutionContext preTaskCache = new TaskExecutionContext();
        preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        this.taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
    }

    public void process(Channel channel, Command command) {
        Preconditions.checkArgument((CommandType.TASK_EXECUTE_REQUEST == command.getType() ? 1 : 0) != 0, (Object)String.format("invalid command type : %s", command.getType()));
        TaskExecuteRequestCommand taskRequestCommand = (TaskExecuteRequestCommand)FastJsonSerializer.deserialize((byte[])command.getBody(), TaskExecuteRequestCommand.class);
        this.logger.info("received command : {}", (Object)taskRequestCommand);
        if (taskRequestCommand == null) {
            this.logger.error("task execute request command is null");
            return;
        }
        String contextJson = taskRequestCommand.getTaskExecutionContext();
        TaskExecutionContext taskExecutionContext = (TaskExecutionContext)JSONObject.parseObject((String)contextJson, TaskExecutionContext.class);
        if (taskExecutionContext == null) {
            this.logger.error("task execution context is null");
            return;
        }
        this.setTaskCache(taskExecutionContext);
        Logger taskLogger = LoggerFactory.getLogger((String)LoggerUtils.buildTaskId((String)"TASK", (int)taskExecutionContext.getProcessDefineId(), (int)taskExecutionContext.getProcessInstanceId(), (int)taskExecutionContext.getTaskInstanceId()));
        taskExecutionContext.setHost(OSUtils.getAddr((int)this.workerConfig.getListenPort()));
        taskExecutionContext.setStartTime(new Date());
        taskExecutionContext.setLogPath(this.getTaskLogPath(taskExecutionContext));
        String execLocalPath = this.getExecLocalPath(taskExecutionContext);
        this.logger.info("task instance  local execute path : {} ", (Object)execLocalPath);
        FileUtils.taskLoggerThreadLocal.set(taskLogger);
        try {
            FileUtils.createWorkDirAndUserIfAbsent((String)execLocalPath, (String)taskExecutionContext.getTenantCode());
        }
        catch (Throwable ex) {
            String errorLog = String.format("create execLocalPath : %s", execLocalPath);
            LoggerUtils.logError(Optional.ofNullable(this.logger), (String)errorLog, (Throwable)ex);
            LoggerUtils.logError(Optional.ofNullable(taskLogger), (String)errorLog, (Throwable)ex);
            this.taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        }
        FileUtils.taskLoggerThreadLocal.remove();
        this.taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
        this.doAck(taskExecutionContext);
        this.workerExecService.submit(new TaskExecuteThread(taskExecutionContext, this.taskCallbackService, taskLogger));
    }

    private void doAck(TaskExecutionContext taskExecutionContext) {
        TaskExecuteAckCommand ackCommand = this.buildAckCommand(taskExecutionContext);
        ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
        this.taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
    }

    private String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
        String baseLog = ((TaskLogDiscriminator)((SiftingAppender)((LoggerContext)LoggerFactory.getILoggerFactory()).getLogger("ROOT").getAppender("TASKLOGFILE")).getDiscriminator()).getLogBase();
        if (baseLog.startsWith("/")) {
            return baseLog + "/" + taskExecutionContext.getProcessDefineId() + "/" + taskExecutionContext.getProcessInstanceId() + "/" + taskExecutionContext.getTaskInstanceId() + ".log";
        }
        return System.getProperty("user.dir") + "/" + baseLog + "/" + taskExecutionContext.getProcessDefineId() + "/" + taskExecutionContext.getProcessInstanceId() + "/" + taskExecutionContext.getTaskInstanceId() + ".log";
    }

    private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
        ackCommand.setLogPath(taskExecutionContext.getLogPath());
        ackCommand.setHost(taskExecutionContext.getHost());
        ackCommand.setStartTime(taskExecutionContext.getStartTime());
        if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) {
            ackCommand.setExecutePath(null);
        } else {
            ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
        }
        return ackCommand;
    }

    private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
        return FileUtils.getProcessExecDir((int)taskExecutionContext.getProjectId(), (int)taskExecutionContext.getProcessDefineId(), (int)taskExecutionContext.getProcessInstanceId(), (int)taskExecutionContext.getTaskInstanceId());
    }
}

