/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.task.api;

import io.fabric8.kubernetes.client.dsl.LogWatch;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils;
import org.slf4j.Logger;

/**
 * Base class that launches a task as an external process through a shell interceptor,
 * relays the process (and, when a K8s context is present, pod) output to a log handler,
 * and collects {@code setValue(...)} directives printed by the process into a var pool.
 *
 * <p>Three background daemon threads cooperate through {@link #logBuffer}: one reads the
 * process output, one optionally reads the pod log, and one periodically hands the buffered
 * lines to {@link #logHandler}.
 */
public abstract class AbstractCommandExecutor {
    /** Matches {@code ${setValue(...)}} / {@code #{setValue(...)}} and captures the argument. */
    protected static final Pattern SETVALUE_REGEX = Pattern.compile("[\\$#]\\{setValue\\((.*?)\\)}");

    /** Exit status reported when the task was (or is treated as) killed. */
    private static final int EXIT_CODE_KILL = 137;
    /** Exit status reported when the process timed out or the K8s application did not succeed. */
    private static final int EXIT_CODE_FAILURE = -1;
    /** Separator appended after every value collected into {@link #varPool}. */
    private static final String VAR_POOL_DELIMITER = "$VarPool$";

    protected StringBuilder varPool = new StringBuilder();
    private Process process;
    protected Consumer<LinkedBlockingQueue<String>> logHandler;
    protected Logger logger;
    /** Shared line buffer; always seeded with one empty string, so "has content" means size &gt; 1. */
    protected LinkedBlockingQueue<String> logBuffer;
    // Both flags are written by a reader thread and polled by the log-output thread, so they
    // must be volatile for the write to be guaranteed visible to the polling loop.
    protected volatile boolean processLogOutputIsSuccess = false;
    protected volatile boolean podLogOutputIsFinished = false;
    protected TaskExecutionContext taskRequest;
    protected Future<?> taskOutputFuture;
    protected Future<?> podLogOutputFuture;

    /**
     * Creates an executor bound to one task request.
     *
     * @param logHandler  receives the shared log buffer whenever it contains new lines
     * @param taskRequest the task execution context; when non-null its log buffer is enabled
     * @param logger      logger used for all diagnostics of this executor
     */
    public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler, TaskExecutionContext taskRequest, Logger logger) {
        this.logHandler = logHandler;
        this.taskRequest = taskRequest;
        this.logger = logger;
        this.logBuffer = new LinkedBlockingQueue<>();
        this.logBuffer.add("");
        if (this.taskRequest != null) {
            this.taskRequest.setLogBufferEnable(true);
        }
    }

    /**
     * Configures the shell interceptor from the task request, launches the process, waits for
     * it (bounded by the remaining task timeout) and returns the outcome.
     *
     * @param iShellInterceptorBuilder builder that is configured and used to create the process
     * @param taskCallBack             optional callback notified once the process id is known; may be null
     * @return the response carrying the process id and exit status code; the exit status is
     *         137 when the task is no longer in the cache, -1 on timeout or K8s failure,
     *         otherwise the process exit value
     * @throws Exception if building/launching fails, waiting is interrupted, or the task
     *                   timeout has already elapsed when the wait is about to start
     */
    public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder, TaskCallBack taskCallBack) throws Exception {
        TaskResponse result = new TaskResponse();
        int taskInstanceId = this.taskRequest.getTaskInstanceId();
        if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
            this.logger.warn("Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed", taskInstanceId);
            result.setExitStatusCode(EXIT_CODE_KILL);
            return result;
        }

        iShellInterceptorBuilder = iShellInterceptorBuilder.shellDirectory(this.taskRequest.getExecutePath()).shellName(this.taskRequest.getTaskAppId());
        if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
            ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
        }
        if (StringUtils.isNotBlank(this.taskRequest.getEnvironmentConfig())) {
            iShellInterceptorBuilder.appendCustomEnvScript(this.taskRequest.getEnvironmentConfig());
        }
        if (this.taskRequest.getK8sTaskExecutionContext() != null) {
            iShellInterceptorBuilder.k8sConfigYaml(this.taskRequest.getK8sTaskExecutionContext().getConfigYaml());
        }
        iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
        if ("default".equals(this.taskRequest.getTenantCode())) {
            iShellInterceptorBuilder.runUser(TenantConstants.BOOTSTRAPT_SYSTEM_USER);
        } else {
            iShellInterceptorBuilder.runUser(this.taskRequest.getTenantCode());
        }
        if (this.taskRequest.getCpuQuota() != null) {
            iShellInterceptorBuilder.cpuQuota(this.taskRequest.getCpuQuota());
        }
        if (this.taskRequest.getMemoryMax() != null) {
            iShellInterceptorBuilder.memoryQuota(this.taskRequest.getMemoryMax());
        }

        // Chain build() and execute(): holding the interceptor in an Object-typed local
        // (as the decompiler emitted) cannot compile because Object has no execute().
        this.process = iShellInterceptorBuilder.build().execute();
        this.parseProcessOutput(this.process);
        this.collectPodLogIfNeeded();

        int processId = this.getProcessId(this.process);
        result.setProcessId(processId);
        this.taskRequest.setProcessId(processId);
        boolean contextUpdated = TaskExecutionContextCacheManager.updateTaskExecutionContext(this.taskRequest);
        if (!contextUpdated) {
            result.setExitStatusCode(EXIT_CODE_KILL);
            this.cancelApplication();
            return result;
        }
        this.logger.info("process start, process id is: {}", processId);

        long remainTime;
        try {
            remainTime = this.getRemainTime();
        } catch (RuntimeException e) {
            // The process is already running at this point; do not leave it behind when the
            // timeout has elapsed before we even start waiting.
            this.cancelApplication();
            throw e;
        }
        if (null != taskCallBack) {
            taskCallBack.updateTaskInstanceInfo(taskInstanceId);
        }

        boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
        TaskExecutionStatus kubernetesStatus = ProcessUtils.getApplicationStatus(this.taskRequest.getK8sTaskExecutionContext(), this.taskRequest.getTaskAppId());

        if (this.taskOutputFuture != null) {
            try {
                this.taskOutputFuture.get();
            } catch (ExecutionException e) {
                this.logger.error("Handle task log error", e);
            }
        }
        if (this.podLogOutputFuture != null) {
            try {
                this.podLogOutputFuture.get();
                ProcessUtils.cancelApplication(this.taskRequest);
            } catch (ExecutionException e) {
                this.logger.error("Handle pod log error", e);
            }
        }

        if (status && kubernetesStatus.isSuccess()) {
            result.setExitStatusCode(this.process.exitValue());
        } else {
            this.logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...", this.taskRequest.getTaskTimeout());
            result.setExitStatusCode(EXIT_CODE_FAILURE);
            this.cancelApplication();
        }

        // exitValue() throws IllegalThreadStateException while the process is alive, which can
        // still be the case right after a forced destroy; report the failure code instead.
        int exitCode = this.process.isAlive() ? EXIT_CODE_FAILURE : this.process.exitValue();
        String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
        this.logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
                exitLogMessage, this.taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
        return result;
    }

    /**
     * @return the values collected from {@code setValue(...)} output lines so far, each
     *         followed by the {@code $VarPool$} delimiter
     */
    public String getVarPool() {
        return this.varPool.toString();
    }

    /**
     * Stops the launched process, if any: requests a normal destroy, waits up to five seconds,
     * then destroys it forcibly if it has not exited.
     *
     * @throws InterruptedException if interrupted while waiting for the process to exit
     */
    public void cancelApplication() throws InterruptedException {
        if (this.process == null) {
            return;
        }
        this.logger.info("Begin to kill process process, pid is : {}", this.taskRequest.getProcessId());
        this.process.destroy();
        if (!this.process.waitFor(5L, TimeUnit.SECONDS)) {
            this.process.destroyForcibly();
        }
        this.logger.info("Success kill task: {}, pid: {}", this.taskRequest.getTaskAppId(), this.taskRequest.getProcessId());
    }

    /** Logs the given command tokens joined by single spaces. */
    private void printCommand(List<String> commands) {
        this.logger.info("task run command: {}", String.join(" ", commands));
    }

    /**
     * Starts a daemon thread that copies the pod log into {@link #logBuffer} when the task has
     * a K8s execution context; otherwise immediately marks pod log collection as finished.
     */
    private void collectPodLogIfNeeded() {
        if (null == this.taskRequest.getK8sTaskExecutionContext()) {
            this.podLogOutputIsFinished = true;
            return;
        }
        ScheduledExecutorService collectPodLogExecutorService = ThreadUtils.newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + this.taskRequest.getTaskName());
        this.podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
            // Fixed delay before attaching the watcher.
            // NOTE(review): presumably gives the pod time to start - confirm.
            ThreadUtils.sleep(5000L);
            try (LogWatch watcher = ProcessUtils.getPodLogWatcher(this.taskRequest.getK8sTaskExecutionContext(), this.taskRequest.getTaskAppId(), "")) {
                if (watcher == null) {
                    throw new RuntimeException("The driver pod does not exist.");
                }
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        this.logBuffer.add(String.format("[K8S-pod-log-%s]: %s", this.taskRequest.getTaskName(), line));
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // Always release the log-output loop, even when the watcher failed.
                this.podLogOutputIsFinished = true;
            }
        });
        collectPodLogExecutorService.shutdown();
    }

    /**
     * Starts two daemon threads: one reads the process output line by line, routing
     * {@code setValue(...)} directives into {@link #varPool} and everything else into
     * {@link #logBuffer}; the other hands the buffer to {@link #logHandler} until both the
     * process output and the pod log are finished and the buffer is drained.
     *
     * @param process the launched process whose standard output is consumed
     */
    private void parseProcessOutput(Process process) {
        ScheduledExecutorService getOutputLogService = ThreadUtils.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + this.taskRequest.getTaskName());
        getOutputLogService.submit(() -> {
            try (
                    LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setTaskInstanceLogFullPathMDC(this.taskRequest.getLogPath());
                    BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = inReader.readLine()) != null) {
                    String varPoolValue = null;
                    if (line.startsWith("${setValue(") || line.startsWith("#{setValue(")) {
                        varPoolValue = this.findVarPool(line);
                    }
                    if (varPoolValue != null) {
                        this.varPool.append(varPoolValue).append(VAR_POOL_DELIMITER);
                    } else {
                        // Ordinary output, or a directive prefix without a well-formed
                        // setValue(...): log it rather than appending "null" to the var pool.
                        this.logBuffer.add(line);
                    }
                }
            } catch (Exception e) {
                this.logger.error("Parse var pool error", e);
            } finally {
                // Set on every exit path so the log-output loop below can always terminate.
                this.processLogOutputIsSuccess = true;
            }
        });
        getOutputLogService.shutdown();

        ScheduledExecutorService parseProcessOutputExecutorService = ThreadUtils.newSingleDaemonScheduledExecutorService("TaskInstanceLogOutput-thread-" + this.taskRequest.getTaskName());
        this.taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
            try (LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setTaskInstanceLogFullPathMDC(this.taskRequest.getLogPath())) {
                while (this.logBuffer.size() > 1 || !this.processLogOutputIsSuccess || !this.podLogOutputIsFinished) {
                    if (this.logBuffer.size() > 1) {
                        // NOTE(review): a line added by a reader thread between accept() and
                        // clear() is dropped; fixing this needs knowledge of how handlers
                        // consume the queue, which is not visible here.
                        this.logHandler.accept(this.logBuffer);
                        this.logBuffer.clear();
                        this.logBuffer.add("");
                        continue;
                    }
                    Thread.sleep(1000L);
                }
            } catch (Exception e) {
                this.logger.error("Output task log error", e);
            }
        });
        parseProcessOutputExecutorService.shutdown();
    }

    /**
     * Extracts the argument of the first {@code setValue(...)} directive in a line.
     *
     * @param line a process output line
     * @return the captured argument, or {@code null} when the line holds no complete directive
     */
    private String findVarPool(String line) {
        Matcher matcher = SETVALUE_REGEX.matcher(line);
        if (matcher.find()) {
            return matcher.group(1);
        }
        return null;
    }

    /**
     * Computes how many seconds of the task timeout are left, based on the task start time.
     *
     * @return the remaining time in seconds (zero or positive)
     * @throws RuntimeException if the timeout has already elapsed
     */
    private long getRemainTime() {
        long usedTime = (System.currentTimeMillis() - this.taskRequest.getStartTime()) / 1000L;
        long remainTime = (long) this.taskRequest.getTaskTimeout() - usedTime;
        if (remainTime < 0L) {
            throw new RuntimeException("task execution time out");
        }
        return remainTime;
    }

    /**
     * Resolves the OS process id of a launched process.
     *
     * <p>The public {@code Process.pid()} method (available since Java 9) is tried first via
     * reflection so the class still compiles on Java 8; if it is unavailable the private
     * {@code pid} field of the concrete process class is read instead.
     *
     * @param process the launched process
     * @return the process id, or 0 when it cannot be determined
     */
    private int getProcessId(Process process) {
        try {
            Object pid = Process.class.getMethod("pid").invoke(process);
            return ((Number) pid).intValue();
        } catch (ReflectiveOperationException | RuntimeException ignored) {
            // Java 8 runtime or pid() unsupported by this implementation: use the field below.
        }
        int processId = 0;
        try {
            Field f = process.getClass().getDeclaredField("pid");
            f.setAccessible(true);
            processId = f.getInt(process);
        } catch (Exception e) {
            this.logger.error("Get task pid failed", e);
        }
        return processId;
    }
}

