/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.task.api;

import ch.qos.logback.classic.ClassicConstants;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.slf4j.Logger;

/**
 * Base class for executors that run a task's command file as an operating-system process.
 *
 * <p>{@link #run(String)} writes the command file (via the subclass hooks), starts the process,
 * and spawns two daemon threads: one reads the process output line by line into
 * {@link #logBuffer}, the other periodically hands the buffered lines to {@link #logHandler}.
 * Output lines that carry a {@code setValue(...)} directive are collected into {@link #varPool}
 * instead of being logged.
 */
public abstract class AbstractCommandExecutor {
    /**
     * Matches a "${setValue(...)}" or "#{setValue(...)}" directive and captures the text
     * between the parentheses as group 1.
     */
    protected static final Pattern SETVALUE_REGEX = Pattern.compile("[\\$#]\\{setValue\\(([^)]*)\\)}");

    /** Exit status this class reports (and recognises in the exit log) for a killed process. */
    private static final int EXIT_CODE_KILL = 137;
    /** Exit status reported when the process did not finish within the remaining timeout. */
    private static final int EXIT_CODE_FAILURE = -1;
    /** Quota value that means "no limit" when building the cgroup command. */
    private static final int QUOTA_UNLIMITED = -1;
    /** Number of buffered lines that triggers an immediate flush. */
    private static final int LOG_FLUSH_ROWS = 64;
    /** Maximum time, in milliseconds, buffered lines wait before being flushed. */
    private static final long LOG_FLUSH_INTERVAL_MILLIS = 1000L;
    /** Separator appended after every captured {@code setValue} payload. */
    private static final String VAR_POOL_SEPARATOR = "$VarPool$";

    /** Accumulates captured {@code setValue} payloads, each followed by {@code $VarPool$}. */
    protected StringBuilder varPool = new StringBuilder();
    private Process process;
    /** Receives batches of output lines, and finally a batch holding only the session marker. */
    protected Consumer<LinkedBlockingQueue<String>> logHandler;
    protected Logger logger;
    /** Output lines read from the process that have not been handed to the handler yet. */
    protected LinkedBlockingQueue<String> logBuffer;
    /**
     * Set by the output-reader thread once it has stopped reading (normally or on error).
     * Volatile because the flusher thread polls it from a different thread.
     */
    protected volatile boolean logOutputIsSuccess = false;
    /** Last regular output line read from the process; written by the output-reader thread. */
    protected volatile String taskResultString;
    protected TaskExecutionContext taskRequest;

    /**
     * Creates an executor with its own, initially empty, log buffer.
     *
     * @param logHandler  consumer that receives batches of process output lines
     * @param taskRequest execution context of the task to run
     * @param logger      logger used for diagnostics of this executor
     */
    public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler, TaskExecutionContext taskRequest, Logger logger) {
        this.logHandler = logHandler;
        this.taskRequest = taskRequest;
        this.logger = logger;
        this.logBuffer = new LinkedBlockingQueue<>();
    }

    /**
     * Creates an executor around an existing log buffer.
     *
     * <p>NOTE(review): this constructor leaves {@code logHandler}, {@code taskRequest} and
     * {@code logger} unset, so methods that use them will fail unless a subclass assigns them.
     *
     * @param logBuffer queue that will hold the process output lines
     */
    public AbstractCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
        this.logBuffer = logBuffer;
    }

    /**
     * Assembles the command line for the given command file and starts the process in the
     * task's execute path, with stderr merged into stdout.
     *
     * @param commandFile path of the command file passed to the interpreter
     * @throws IOException if the process cannot be started
     */
    private void buildProcess(String commandFile) throws IOException {
        List<String> command = new LinkedList<>();
        ProcessBuilder processBuilder = new ProcessBuilder();
        processBuilder.directory(new File(this.taskRequest.getExecutePath()));
        processBuilder.redirectErrorStream(true);
        if (OSUtils.isSudoEnable()) {
            if (SystemUtils.IS_OS_LINUX && PropertyUtils.getBoolean("task.resource.limit.state")) {
                this.generateCgroupCommand(command);
            } else {
                command.add("sudo");
                command.add("-u");
                command.add(this.taskRequest.getTenantCode());
                command.add("-i");
            }
        }
        command.add(this.commandInterpreter());
        command.add(commandFile);
        processBuilder.command(command);
        // Log before starting so the command is on record even when start() throws.
        this.printCommand(command);
        this.process = processBuilder.start();
    }

    /**
     * Appends a {@code sudo systemd-run --scope} prefix that applies the task's CPU and memory
     * limits and runs the command as the tenant user.
     *
     * <p>A quota of {@code -1} means "no limit"; a missing ({@code null}) quota is treated the
     * same way instead of failing on unboxing.
     *
     * @param command command list to append the prefix to
     */
    private void generateCgroupCommand(List<String> command) {
        Integer cpuQuota = this.taskRequest.getCpuQuota();
        Integer memoryMax = this.taskRequest.getMemoryMax();
        command.add("sudo");
        command.add("systemd-run");
        command.add("-q");
        command.add("--scope");
        command.add("-p");
        command.add(isUnlimited(cpuQuota) ? "CPUQuota=" : String.format("CPUQuota=%s%%", cpuQuota));
        command.add("-p");
        command.add(isUnlimited(memoryMax) ? "MemoryMax=infinity" : String.format("MemoryMax=%sM", memoryMax));
        command.add(String.format("--uid=%s", this.taskRequest.getTenantCode()));
    }

    /** Returns whether a quota value is absent or the explicit "no limit" value. */
    private static boolean isUnlimited(Integer quota) {
        return quota == null || quota == QUOTA_UNLIMITED;
    }

    /**
     * Runs the given command as an operating-system process and waits for it to finish or for
     * the task's remaining timeout to elapse.
     *
     * <p>Returns exit status 137 without starting anything when the task is no longer in the
     * execution-context cache, or when the cache update after the start fails (the process is
     * killed in that case). An empty command removes the task from the cache and returns the
     * default response. On timeout the process is killed and exit status -1 is reported.
     *
     * @param execCommand command text to write into the command file
     * @return response carrying the process id and exit status
     * @throws IOException          if the command file or the process cannot be created
     * @throws InterruptedException if interrupted while waiting for the process
     */
    public TaskResponse run(String execCommand) throws IOException, InterruptedException {
        TaskResponse result = new TaskResponse();
        int taskInstanceId = this.taskRequest.getTaskInstanceId();
        if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
            result.setExitStatusCode(EXIT_CODE_KILL);
            return result;
        }
        if (StringUtils.isEmpty(execCommand)) {
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
            return result;
        }
        String commandFilePath = this.buildCommandFilePath();
        this.createCommandFileIfNotExists(execCommand, commandFilePath);
        this.buildProcess(commandFilePath);
        this.parseProcessOutput(this.process);
        int processId = this.getProcessId(this.process);
        result.setProcessId(processId);
        this.taskRequest.setProcessId(processId);
        boolean cacheUpdated = TaskExecutionContextCacheManager.updateTaskExecutionContext(this.taskRequest);
        if (!cacheUpdated) {
            ProcessUtils.kill(this.taskRequest);
            result.setExitStatusCode(EXIT_CODE_KILL);
            return result;
        }
        this.logger.info("process start, process id is: {}", processId);
        long remainTime = this.getRemainTime();
        boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
        int exitCode;
        if (status) {
            exitCode = this.process.exitValue();
            result.setExitStatusCode(exitCode);
        } else {
            this.logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...", this.taskRequest.getTaskTimeout());
            ProcessUtils.kill(this.taskRequest);
            result.setExitStatusCode(EXIT_CODE_FAILURE);
            // exitValue() throws IllegalThreadStateException while the process is still alive,
            // which can happen right after the kill; fall back to the failure code for logging.
            exitCode = this.process.isAlive() ? EXIT_CODE_FAILURE : this.process.exitValue();
        }
        String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
        this.logger.info(exitLogMessage + " execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", this.taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
        return result;
    }

    /**
     * Returns the captured {@code setValue} payloads, each followed by {@code $VarPool$}.
     *
     * @return current content of the variable pool
     */
    public String getVarPool() {
        return this.varPool.toString();
    }

    /**
     * Cancels the running process, if any: flushes pending log lines, sends a plain
     * {@code kill}, and escalates to {@code kill -9} when the process is still alive.
     *
     * @throws Exception declared for callers; the kill attempts themselves log their failures
     */
    public void cancelApplication() throws Exception {
        if (this.process == null) {
            return;
        }
        this.clear();
        int processId = this.getProcessId(this.process);
        this.logger.info("cancel process: {}", processId);
        boolean alive = this.softKill(processId);
        if (alive) {
            this.hardKill(processId);
        }
    }

    /**
     * Issues a plain {@code kill} for the process without waiting for the kill command.
     *
     * @param processId id of the process to signal; {@code 0} means unknown and is skipped
     * @return whether the process is still alive right after the kill command was issued
     */
    private boolean softKill(int processId) {
        if (processId != 0 && this.process.isAlive()) {
            try {
                String cmd = String.format("kill %d", processId);
                cmd = OSUtils.getSudoCmd(this.taskRequest.getTenantCode(), cmd);
                this.logger.info("soft kill task:{}, process id:{}, cmd:{}", this.taskRequest.getTaskAppId(), processId, cmd);
                Runtime.getRuntime().exec(cmd);
            }
            catch (IOException e) {
                this.logger.info("kill attempt failed", e);
            }
        }
        return this.process.isAlive();
    }

    /**
     * Issues {@code kill -9} for the pid string that {@code ProcessUtils.getPidsStr} returns
     * for the given process.
     *
     * @param processId id of the process to kill; {@code 0} means unknown and is skipped
     */
    private void hardKill(int processId) {
        if (processId != 0 && this.process.isAlive()) {
            try {
                String cmd = String.format("kill -9 %s", ProcessUtils.getPidsStr(processId));
                cmd = OSUtils.getSudoCmd(this.taskRequest.getTenantCode(), cmd);
                this.logger.info("hard kill task:{}, process id:{}, cmd:{}", this.taskRequest.getTaskAppId(), processId, cmd);
                OSUtils.exeCmd(cmd);
            }
            catch (Exception e) {
                this.logger.error("kill attempt failed ", e);
            }
        }
    }

    /** Logs the full command line, joined with single spaces. */
    private void printCommand(List<String> commands) {
        this.logger.info("task run command: {}", String.join(" ", commands));
    }

    /**
     * Hands any pending log lines to the handler, followed by a batch that contains only the
     * logback session-finalize marker.
     */
    private void clear() {
        LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>(1);
        markerLog.add(ClassicConstants.FINALIZE_SESSION_MARKER.toString());
        this.handOffBufferedLines();
        this.logHandler.accept(markerLog);
    }

    /**
     * Moves every currently buffered line into a private batch and passes that batch to the
     * handler.
     *
     * <p>Draining first means a line the reader thread adds while the handler is running stays
     * in {@link #logBuffer} for the next flush; passing the shared buffer and clearing it
     * afterwards would silently drop such a line.
     */
    private void handOffBufferedLines() {
        LinkedBlockingQueue<String> batch = new LinkedBlockingQueue<>();
        this.logBuffer.drainTo(batch);
        if (!batch.isEmpty()) {
            this.logHandler.accept(batch);
        }
    }

    /**
     * Starts the two daemon threads that consume the process output: a reader that fills
     * {@link #logBuffer} and {@link #varPool}, and a flusher that forwards buffered lines to
     * the handler until the reader is done and the buffer is empty.
     *
     * @param process process whose (merged) output stream is read
     */
    private void parseProcessOutput(Process process) {
        String threadLoggerInfoName = this.taskRequest.getTaskLogName();
        ExecutorService getOutputLogService = this.newDaemonSingleThreadExecutor(threadLoggerInfoName);
        getOutputLogService.submit(() -> {
            try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = inReader.readLine()) != null) {
                    if (line.startsWith("${setValue(") || line.startsWith("#{setValue(")) {
                        String payload = this.findVarPool(line);
                        if (payload != null) {
                            this.varPool.append(payload);
                            this.varPool.append(VAR_POOL_SEPARATOR);
                            continue;
                        }
                        // Malformed directive: fall through and keep it as a regular log line
                        // rather than appending the text "null" to the variable pool.
                    }
                    this.logBuffer.add(line);
                    this.taskResultString = line;
                }
            }
            catch (Exception e) {
                this.logger.error(e.getMessage(), e);
            }
            finally {
                // Always release the flusher thread, whether reading ended normally or not.
                this.logOutputIsSuccess = true;
            }
        });
        getOutputLogService.shutdown();
        ExecutorService parseProcessOutputExecutorService = this.newDaemonSingleThreadExecutor(threadLoggerInfoName);
        parseProcessOutputExecutorService.submit(() -> {
            try {
                long lastFlushTime = System.currentTimeMillis();
                while (!this.logBuffer.isEmpty() || !this.logOutputIsSuccess) {
                    if (!this.logBuffer.isEmpty()) {
                        lastFlushTime = this.flush(lastFlushTime);
                        continue;
                    }
                    Thread.sleep(1000L);
                }
            }
            catch (InterruptedException e) {
                // Restore the flag only for a real interruption.
                Thread.currentThread().interrupt();
                this.logger.error(e.getMessage(), e);
            }
            catch (Exception e) {
                this.logger.error(e.getMessage(), e);
            }
            finally {
                this.clear();
            }
        });
        parseProcessOutputExecutorService.shutdown();
    }

    /**
     * Extracts the payload of a {@code setValue(...)} directive from an output line.
     *
     * @param line output line to inspect
     * @return text between the parentheses, or {@code null} when the line holds no complete directive
     */
    private String findVarPool(String line) {
        Matcher matcher = SETVALUE_REGEX.matcher(line);
        if (matcher.find()) {
            return matcher.group(1);
        }
        return null;
    }

    /**
     * Computes how many seconds of the task timeout are left, measured from the task start time.
     *
     * @return remaining time in seconds, never negative
     * @throws RuntimeException if the timeout has already elapsed
     */
    private long getRemainTime() {
        long usedTime = (System.currentTimeMillis() - this.taskRequest.getStartTime().getTime()) / 1000L;
        long remainTime = (long)this.taskRequest.getTaskTimeout() - usedTime;
        if (remainTime < 0L) {
            throw new RuntimeException("task execution time out");
        }
        return remainTime;
    }

    /**
     * Reads the process id through reflection on the implementation's {@code pid} field.
     *
     * @param process process to inspect
     * @return the process id, or {@code 0} when it cannot be read (the failure is logged)
     */
    private int getProcessId(Process process) {
        int processId = 0;
        try {
            Field f = process.getClass().getDeclaredField("pid");
            f.setAccessible(true);
            processId = f.getInt(process);
        }
        catch (Throwable e) {
            this.logger.error(e.getMessage(), e);
        }
        return processId;
    }

    /**
     * Flushes the buffered lines to the handler when at least 64 lines are pending or more than
     * one second has passed since the last flush.
     *
     * @param lastFlushTime time of the previous flush, in epoch milliseconds
     * @return the time of the most recent flush, in epoch milliseconds
     */
    private long flush(long lastFlushTime) {
        long now = System.currentTimeMillis();
        if (this.logBuffer.size() >= LOG_FLUSH_ROWS || now - lastFlushTime > LOG_FLUSH_INTERVAL_MILLIS) {
            lastFlushTime = now;
            this.handOffBufferedLines();
        }
        return lastFlushTime;
    }

    /**
     * Returns the path of the command file that {@link #run(String)} passes to the interpreter.
     *
     * @return path of the command file
     */
    protected abstract String buildCommandFilePath();

    /**
     * Hook called by {@link #run(String)} before the process is started.
     *
     * @param var1 command text, as passed to {@link #run(String)}
     * @param var2 command file path, as returned by {@link #buildCommandFilePath()}
     * @throws IOException if the file cannot be created
     */
    protected abstract void createCommandFileIfNotExists(String var1, String var2) throws IOException;

    /**
     * Creates a single-thread executor whose thread is a daemon.
     *
     * @param threadName name format passed to the thread factory
     * @return a new single-thread executor
     */
    ExecutorService newDaemonSingleThreadExecutor(String threadName) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
        return Executors.newSingleThreadExecutor(threadFactory);
    }

    /**
     * Returns the interpreter placed on the command line directly before the command file path.
     *
     * @return interpreter command
     */
    protected abstract String commandInterpreter();
}

