/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.task;

import ch.qos.logback.classic.ClassicConstants;
import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinNT;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;

public abstract class AbstractCommandExecutor {
    protected static final Pattern APPLICATION_REGEX = Pattern.compile("application_\\d+_\\d+");
    private Process process;
    protected Consumer<List<String>> logHandler;
    protected Logger logger;
    protected final List<String> logBuffer;
    protected TaskExecutionContext taskExecutionContext;
    protected boolean logOutputIsSuccess = false;
    private TaskExecutionContextCacheManager taskExecutionContextCacheManager;

    public AbstractCommandExecutor(Consumer<List<String>> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) {
        this.logHandler = logHandler;
        this.taskExecutionContext = taskExecutionContext;
        this.logger = logger;
        this.logBuffer = Collections.synchronizedList(new ArrayList());
        this.taskExecutionContextCacheManager = (TaskExecutionContextCacheManager)SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
    }

    private void buildProcess(String commandFile) throws IOException {
        LinkedList<String> command = new LinkedList<String>();
        if (OSUtils.isWindows()) {
            throw new RuntimeException("not support windows !");
        }
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        processBuilder.directory(new File(this.taskExecutionContext.getExecutePath()));
        processBuilder.redirectErrorStream(true);
        command.add("sudo");
        command.add("-u");
        command.add(this.taskExecutionContext.getTenantCode());
        command.add(this.commandInterpreter());
        command.addAll(this.commandOptions());
        command.add(commandFile);
        processBuilder.command(command);
        this.process = processBuilder.start();
        this.printCommand(command);
    }

    public CommandExecuteResult run(String execCommand) throws Exception {
        CommandExecuteResult result = new CommandExecuteResult();
        int taskInstanceId = this.taskExecutionContext.getTaskInstanceId();
        if (null == this.taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
            result.setExitStatusCode(137);
            return result;
        }
        if (StringUtils.isEmpty((CharSequence)execCommand)) {
            this.taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
            return result;
        }
        String commandFilePath = this.buildCommandFilePath();
        this.createCommandFileIfNotExists(execCommand, commandFilePath);
        this.buildProcess(commandFilePath);
        this.parseProcessOutput(this.process);
        Integer processId = this.getProcessId(this.process);
        result.setProcessId(processId);
        this.taskExecutionContext.setProcessId(processId);
        boolean updateTaskExecutionContextStatus = this.taskExecutionContextCacheManager.updateTaskExecutionContext(this.taskExecutionContext);
        if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
            ProcessUtils.kill(this.taskExecutionContext);
            result.setExitStatusCode(137);
            return result;
        }
        this.logger.info("process start, process id is: {}", (Object)processId);
        long remainTime = this.getRemaintime();
        boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
        this.logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}", new Object[]{this.taskExecutionContext.getExecutePath(), processId, result.getExitStatusCode()});
        if (status) {
            List<String> appIds = this.getAppIds(this.taskExecutionContext.getLogPath());
            result.setAppIds(String.join((CharSequence)",", appIds));
            result.setExitStatusCode(this.process.exitValue());
            if (this.process.exitValue() == 0) {
                result.setExitStatusCode(this.isSuccessOfYarnState(appIds) ? 0 : -1);
            }
        } else {
            this.logger.error("process has failure , exitStatusCode : {} , ready to kill ...", (Object)result.getExitStatusCode());
            ProcessUtils.kill(this.taskExecutionContext);
            result.setExitStatusCode(-1);
        }
        return result;
    }

    public void cancelApplication() throws Exception {
        if (this.process == null) {
            return;
        }
        this.clear();
        int processId = this.getProcessId(this.process);
        this.logger.info("cancel process: {}", (Object)processId);
        boolean killed = this.softKill(processId);
        if (!killed) {
            this.hardKill(processId);
            this.process.destroy();
            this.process = null;
        }
    }

    private boolean softKill(int processId) {
        if (processId != 0 && this.process.isAlive()) {
            try {
                String cmd = String.format("sudo kill %d", processId);
                this.logger.info("soft kill task:{}, process id:{}, cmd:{}", new Object[]{this.taskExecutionContext.getTaskAppId(), processId, cmd});
                Runtime.getRuntime().exec(cmd);
            }
            catch (IOException e) {
                this.logger.info("kill attempt failed", (Throwable)e);
            }
        }
        return !this.process.isAlive();
    }

    private void hardKill(int processId) {
        if (processId != 0 && this.process.isAlive()) {
            try {
                String cmd = String.format("sudo kill -9 %d", processId);
                this.logger.info("hard kill task:{}, process id:{}, cmd:{}", new Object[]{this.taskExecutionContext.getTaskAppId(), processId, cmd});
                Runtime.getRuntime().exec(cmd);
            }
            catch (IOException e) {
                this.logger.error("kill attempt failed ", (Throwable)e);
            }
        }
    }

    private void printCommand(List<String> commands) {
        try {
            String cmdStr = ProcessUtils.buildCommandStr(commands);
            this.logger.info("task run command:\n{}", (Object)cmdStr);
        }
        catch (IOException e) {
            this.logger.error(e.getMessage(), (Throwable)e);
        }
    }

    private void clear() {
        ArrayList<String> markerList = new ArrayList<String>();
        markerList.add(ClassicConstants.FINALIZE_SESSION_MARKER.toString());
        if (!this.logBuffer.isEmpty()) {
            this.logHandler.accept(this.logBuffer);
            this.logBuffer.clear();
        }
        this.logHandler.accept(markerList);
    }

    private void parseProcessOutput(Process process) {
        String threadLoggerInfoName = String.format("TaskLogInfo-%s", this.taskExecutionContext.getTaskAppId());
        ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor((String)(threadLoggerInfoName + "-getOutputLogService"));
        getOutputLogService.submit(() -> {
            BufferedReader inReader = null;
            try {
                String line;
                inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
                while ((line = inReader.readLine()) != null) {
                    this.logBuffer.add(line);
                }
                this.logOutputIsSuccess = true;
                this.close(inReader);
            }
            catch (Exception e) {
                try {
                    this.logger.error(e.getMessage(), (Throwable)e);
                    this.logOutputIsSuccess = true;
                    this.close(inReader);
                }
                catch (Throwable throwable) {
                    this.logOutputIsSuccess = true;
                    this.close(inReader);
                    throw throwable;
                }
            }
        });
        getOutputLogService.shutdown();
        ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor((String)threadLoggerInfoName);
        parseProcessOutputExecutorService.submit(() -> {
            try {
                long lastFlushTime = System.currentTimeMillis();
                while (this.logBuffer.size() > 0 || !this.logOutputIsSuccess) {
                    if (this.logBuffer.size() > 0) {
                        lastFlushTime = this.flush(lastFlushTime);
                        continue;
                    }
                    Thread.sleep(1000L);
                }
            }
            catch (Exception e) {
                this.logger.error(e.getMessage(), (Throwable)e);
            }
            finally {
                this.clear();
            }
        });
        parseProcessOutputExecutorService.shutdown();
    }

    public boolean isSuccessOfYarnState(List<String> appIds) {
        boolean result = true;
        try {
            block2: for (String appId : appIds) {
                while (Stopper.isRunning()) {
                    ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
                    this.logger.info("appId:{}, final state:{}", (Object)appId, (Object)applicationStatus.name());
                    if (applicationStatus.equals((Object)ExecutionStatus.FAILURE) || applicationStatus.equals((Object)ExecutionStatus.KILL)) {
                        return false;
                    }
                    if (applicationStatus.equals((Object)ExecutionStatus.SUCCESS)) continue block2;
                    Thread.sleep(1000L);
                }
            }
        }
        catch (Exception e) {
            this.logger.error(String.format("yarn applications: %s  status failed ", appIds.toString()), (Throwable)e);
            result = false;
        }
        return result;
    }

    public int getProcessId() {
        return this.getProcessId(this.process);
    }

    private List<String> getAppIds(String logPath) {
        List<String> logs = this.convertFile2List(logPath);
        ArrayList<String> appIds = new ArrayList<String>();
        for (String log : logs) {
            String appId = this.findAppId(log);
            if (!StringUtils.isNotEmpty((CharSequence)appId) || appIds.contains(appId)) continue;
            this.logger.info("find app id: {}", (Object)appId);
            appIds.add(appId);
        }
        return appIds;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<String> convertFile2List(String filename) {
        ArrayList<String> lineList = new ArrayList<String>(100);
        File file = new File(filename);
        if (!file.exists()) {
            return lineList;
        }
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader((InputStream)new FileInputStream(filename), StandardCharsets.UTF_8));
            String line = null;
            while ((line = br.readLine()) != null) {
                lineList.add(line);
            }
        }
        catch (Exception e) {
            this.logger.error(String.format("read file: %s failed : ", filename), (Throwable)e);
        }
        finally {
            if (br != null) {
                try {
                    br.close();
                }
                catch (IOException e) {
                    this.logger.error(e.getMessage(), (Throwable)e);
                }
            }
        }
        return lineList;
    }

    private String findAppId(String line) {
        Matcher matcher = APPLICATION_REGEX.matcher(line);
        if (matcher.find()) {
            return matcher.group();
        }
        return null;
    }

    private long getRemaintime() {
        long usedTime = (System.currentTimeMillis() - this.taskExecutionContext.getStartTime().getTime()) / 1000L;
        long remainTime = (long)this.taskExecutionContext.getTaskTimeout() - usedTime;
        if (remainTime < 0L) {
            throw new RuntimeException("task execution time out");
        }
        return remainTime;
    }

    private int getProcessId(Process process) {
        int processId = 0;
        try {
            Field f = process.getClass().getDeclaredField(Constants.PID);
            f.setAccessible(true);
            if (OSUtils.isWindows()) {
                WinNT.HANDLE handle = (WinNT.HANDLE)f.get(process);
                processId = Kernel32.INSTANCE.GetProcessId(handle);
            } else {
                processId = f.getInt(process);
            }
        }
        catch (Throwable e) {
            this.logger.error(e.getMessage(), e);
        }
        return processId;
    }

    private long flush(long lastFlushTime) {
        long now = System.currentTimeMillis();
        if (this.logBuffer.size() >= 64 || now - lastFlushTime > 1000L) {
            lastFlushTime = now;
            this.logHandler.accept(this.logBuffer);
            this.logBuffer.clear();
        }
        return lastFlushTime;
    }

    private void close(BufferedReader inReader) {
        if (inReader != null) {
            try {
                inReader.close();
            }
            catch (IOException e) {
                this.logger.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    protected List<String> commandOptions() {
        return Collections.emptyList();
    }

    protected abstract String buildCommandFilePath();

    protected abstract String commandInterpreter();

    protected abstract void createCommandFileIfNotExists(String var1, String var2) throws IOException;
}

