/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.processor;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import java.util.Date;
import java.util.Optional;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker-side handler for {@code TASK_EXECUTE_REQUEST} commands.
 *
 * <p>For each request it deserializes the {@link TaskExecutionContext}, caches it,
 * prepares the local execution directory, registers the reply channel, sends an
 * acknowledgement and finally hands a {@link TaskExecuteThread} to the
 * {@link WorkerManagerThread}.
 */
public class TaskExecuteProcessor implements NettyRequestProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);

    private final WorkerConfig workerConfig;
    private final TaskCallbackService taskCallbackService;
    private final WorkerManagerThread workerManager;

    /** Only set by the two-argument constructor; {@code null} otherwise. */
    private AlertClientService alertClientService;

    /** Only set by the two-argument constructor; {@code null} otherwise. */
    private TaskPluginManager taskPluginManager;

    /**
     * Creates a processor whose collaborators are looked up from the Spring
     * application context. {@code alertClientService} and {@code taskPluginManager}
     * are left {@code null}.
     */
    public TaskExecuteProcessor() {
        this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
        this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
        this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
    }

    /**
     * Creates a processor with the given alert client and task plugin manager; the
     * remaining collaborators are looked up from the Spring application context.
     *
     * @param alertClientService alert client passed on to every {@link TaskExecuteThread}
     * @param taskPluginManager  plugin manager passed on to every {@link TaskExecuteThread}
     */
    public TaskExecuteProcessor(AlertClientService alertClientService, TaskPluginManager taskPluginManager) {
        this();
        this.alertClientService = alertClientService;
        this.taskPluginManager = taskPluginManager;
    }

    /**
     * Converts the execution context into a {@link TaskRequest} via a JSON round trip
     * and stores it in {@link TaskExecutionContextCacheManager}.
     *
     * @param taskExecutionContext context of the task that is about to be executed
     */
    private void setTaskCache(TaskExecutionContext taskExecutionContext) {
        TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class);
        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskRequest);
    }

    /**
     * Handles one task execute request received over Netty.
     *
     * <p>The method returns without doing anything further when the command body or
     * the embedded execution context cannot be deserialized.
     *
     * @param channel channel the request arrived on; kept so callbacks can be sent back
     * @param command the received command, which must be of type
     *                {@link CommandType#TASK_EXECUTE_REQUEST}
     * @throws IllegalArgumentException if the command has a different type
     */
    public void process(Channel channel, Command command) {
        Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
                String.format("invalid command type : %s", command.getType()));

        TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class);
        logger.info("received command : {}", taskRequestCommand);
        if (taskRequestCommand == null) {
            logger.error("task execute request command is null");
            return;
        }

        String contextJson = taskRequestCommand.getTaskExecutionContext();
        TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
        if (taskExecutionContext == null) {
            logger.error("task execution context is null");
            return;
        }

        // Cache first: the cached TaskRequest is a snapshot taken before host, log path
        // and execute path are filled in below.
        this.setTaskCache(taskExecutionContext);
        taskExecutionContext.setHost(NetUtils.getAddr(this.workerConfig.getListenPort()));
        taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));

        String execLocalPath = this.getExecLocalPath(taskExecutionContext);
        logger.info("task instance local execute path : {}", execLocalPath);
        taskExecutionContext.setExecutePath(execLocalPath);

        try {
            FileUtils.createWorkDirIfAbsent(execLocalPath);
            if (CommonUtils.isSudoEnable() && this.workerConfig.getWorkerTenantAutoCreate()) {
                OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
            }
        } catch (Throwable ex) {
            String errorLog = String.format("create execLocalPath : %s", execLocalPath);
            LoggerUtils.logError(Optional.of(logger), errorLog, ex);
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            // NOTE(review): processing continues after this failure, so the task is still
            // acknowledged and submitted although its cache entry was removed - confirm
            // that this best-effort behavior is intended.
        }
        FileUtils.taskLoggerThreadLocal.remove();

        this.taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
                new NettyRemoteChannel(channel, command.getOpaque()));

        // The log message below reports the remaining time in seconds; the * 60 presumably
        // converts a delay configured in minutes - verify against DateUtils.getRemainTime.
        long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getDelayTime() * 60L);
        if (remainTime > 0L) {
            logger.info("delay the execution of task instance {}, delay time: {} s",
                    taskExecutionContext.getTaskInstanceId(), remainTime);
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
            taskExecutionContext.setStartTime(null);
        } else {
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
            taskExecutionContext.setStartTime(new Date());
        }

        this.doAck(taskExecutionContext);

        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, this.taskCallbackService,
                this.alertClientService, this.taskPluginManager);
        if (!this.workerManager.offer(taskExecuteThread)) {
            // The task was already acknowledged but will not run, so this must stand out
            // in the worker log rather than being hidden at info level.
            logger.warn("submit task to manager error, queue is full, queue size is {}",
                    this.workerManager.getDelayQueueSize());
        }
    }

    /**
     * Builds the acknowledgement for the task, stores it in {@link ResponceCache}
     * under {@link Event#ACK} and sends it through the callback service.
     *
     * @param taskExecutionContext context of the task being acknowledged
     */
    private void doAck(TaskExecutionContext taskExecutionContext) {
        TaskExecuteAckCommand ackCommand = this.buildAckCommand(taskExecutionContext);
        // convert2Command() is intentionally invoked once per consumer, exactly as before;
        // NOTE(review): whether both calls could share one Command instance depends on
        // convert2Command(), which is not visible here.
        ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
        this.taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
    }

    /**
     * Creates the acknowledgement command from the execution context.
     *
     * <p>The execute path is reported as {@code null} for SQL and PROCEDURE tasks.
     * As a side effect the log path computed for the acknowledgement is written back
     * to the context.
     *
     * @param taskExecutionContext context to read status, host, start time and paths from
     * @return the populated acknowledgement command
     */
    private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
        ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
        ackCommand.setHost(taskExecutionContext.getHost());
        ackCommand.setStartTime(taskExecutionContext.getStartTime());

        String taskType = taskExecutionContext.getTaskType();
        boolean sqlOrProcedure = TaskType.SQL.getDesc().equalsIgnoreCase(taskType)
                || TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskType);
        ackCommand.setExecutePath(sqlOrProcedure ? null : taskExecutionContext.getExecutePath());

        taskExecutionContext.setLogPath(ackCommand.getLogPath());
        ackCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
        return ackCommand;
    }

    /**
     * Resolves the local execution directory for the task from its project code,
     * process definition code and version, process instance id and task instance id.
     *
     * @param taskExecutionContext context supplying the identifiers
     * @return the directory path returned by {@link FileUtils#getProcessExecDir}
     */
    private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
        return FileUtils.getProcessExecDir(
                taskExecutionContext.getProjectCode(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
    }
}

