/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.task;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.slf4j.Logger;

public abstract class AbstractCommandExecutor {
    protected static final Pattern APPLICATION_REGEX = Pattern.compile("application_\\d+_\\d+");
    private Process process;
    protected Consumer<List<String>> logHandler;
    protected final String taskDir;
    protected final String taskAppId;
    protected final int taskInstId;
    protected final String tenantCode;
    protected final String envFile;
    protected final Date startTime;
    protected int timeout;
    protected Logger logger;
    protected final List<String> logBuffer;

    public AbstractCommandExecutor(Consumer<List<String>> logHandler, String taskDir, String taskAppId, int taskInstId, String tenantCode, String envFile, Date startTime, int timeout, Logger logger) {
        this.logHandler = logHandler;
        this.taskDir = taskDir;
        this.taskAppId = taskAppId;
        this.taskInstId = taskInstId;
        this.tenantCode = tenantCode;
        this.envFile = envFile;
        this.startTime = startTime;
        this.timeout = timeout;
        this.logger = logger;
        this.logBuffer = Collections.synchronizedList(new ArrayList());
    }

    public int run(String execCommand, ProcessDao processDao) {
        int exitStatusCode;
        try {
            if (StringUtils.isEmpty((CharSequence)execCommand)) {
                int exitStatusCode2 = 0;
                return exitStatusCode2;
            }
            String commandFilePath = this.buildCommandFilePath();
            this.createCommandFileIfNotExists(execCommand, commandFilePath);
            this.buildProcess(commandFilePath);
            this.parseProcessOutput(this.process);
            int pid = this.getProcessId(this.process);
            int taskInstId = Integer.parseInt(this.taskAppId.split("_")[2]);
            processDao.updatePidByTaskInstId(taskInstId, pid, "");
            this.logger.info("process start, process id is: {}", (Object)pid);
            long remainTime = this.getRemaintime();
            boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
            if (status) {
                exitStatusCode = this.process.exitValue();
                this.logger.info("process has exited, work dir:{}, pid:{} ,exitStatusCode:{}", new Object[]{this.taskDir, pid, exitStatusCode});
                exitStatusCode = this.updateState(processDao, exitStatusCode, pid, taskInstId);
            } else {
                TaskInstance taskInstance = processDao.findTaskInstanceById(Integer.valueOf(taskInstId));
                if (taskInstance == null) {
                    this.logger.error("task instance id:{} not exist", (Object)taskInstId);
                } else {
                    ProcessUtils.kill(taskInstance);
                }
                exitStatusCode = -1;
                this.logger.warn("process timeout, work dir:{}, pid:{}", (Object)this.taskDir, (Object)pid);
            }
        }
        catch (InterruptedException e) {
            int exitStatusCode3 = -1;
            this.logger.error(String.format("interrupt exception: {}, task may be cancelled or killed", e.getMessage()), (Throwable)e);
            throw new RuntimeException("interrupt exception. exitCode is :  " + exitStatusCode3);
        }
        catch (Exception e) {
            int exitStatusCode4 = -1;
            this.logger.error(e.getMessage(), (Throwable)e);
            throw new RuntimeException("process error . exitCode is :  " + exitStatusCode4);
        }
        return exitStatusCode;
    }

    private void buildProcess(String commandFile) throws IOException {
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        processBuilder.directory(new File(this.taskDir));
        processBuilder.redirectErrorStream(true);
        processBuilder.command("sudo", "-u", this.tenantCode, this.commandType(), commandFile);
        this.process = processBuilder.start();
        this.printCommand(processBuilder);
    }

    private int updateState(ProcessDao processDao, int exitStatusCode, int pid, int taskInstId) {
        if (exitStatusCode == 0) {
            TaskInstance taskInstance = processDao.findTaskInstanceById(Integer.valueOf(taskInstId));
            this.logger.info("process id is {}", (Object)pid);
            List<String> appIds = this.getAppLinks(taskInstance.getLogPath());
            if (appIds.size() > 0) {
                String appUrl = String.join((CharSequence)",", appIds);
                this.logger.info("yarn log url:{}", (Object)appUrl);
                processDao.updatePidByTaskInstId(taskInstId, pid, appUrl);
            }
            if (!this.isSuccessOfYarnState(appIds)) {
                exitStatusCode = -1;
            }
        }
        return exitStatusCode;
    }

    public void cancelApplication() throws Exception {
        if (this.process == null) {
            return;
        }
        this.clear();
        int processId = this.getProcessId(this.process);
        this.logger.info("cancel process: {}", (Object)processId);
        boolean killed = this.softKill(processId);
        if (!killed) {
            this.hardKill(processId);
            this.process.destroy();
            this.process = null;
        }
    }

    private boolean softKill(int processId) {
        if (processId != 0 && this.process.isAlive()) {
            try {
                String cmd = String.format("sudo kill %d", processId);
                this.logger.info("soft kill task:{}, process id:{}, cmd:{}", new Object[]{this.taskAppId, processId, cmd});
                Runtime.getRuntime().exec(cmd);
            }
            catch (IOException e) {
                this.logger.info("kill attempt failed." + e.getMessage(), (Throwable)e);
            }
        }
        return this.process.isAlive();
    }

    private void hardKill(int processId) {
        if (processId != 0 && this.process.isAlive()) {
            try {
                String cmd = String.format("sudo kill -9 %d", processId);
                this.logger.info("hard kill task:{}, process id:{}, cmd:{}", new Object[]{this.taskAppId, processId, cmd});
                Runtime.getRuntime().exec(cmd);
            }
            catch (IOException e) {
                this.logger.error("kill attempt failed." + e.getMessage(), (Throwable)e);
            }
        }
    }

    private void printCommand(ProcessBuilder processBuilder) {
        try {
            String cmdStr = ProcessUtils.buildCommandStr(processBuilder.command());
            this.logger.info("task run command:\n{}", (Object)cmdStr);
        }
        catch (IOException e) {
            this.logger.error(e.getMessage(), (Throwable)e);
        }
    }

    private void clear() {
        if (!this.logBuffer.isEmpty()) {
            this.logHandler.accept(this.logBuffer);
            this.logBuffer.clear();
        }
    }

    private void parseProcessOutput(final Process process) {
        String threadLoggerInfoName = String.format("TaskLogInfo-%s", this.taskAppId);
        ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor((String)threadLoggerInfoName);
        parseProcessOutputExecutorService.submit(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                BufferedReader inReader = null;
                try {
                    String line;
                    inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
                    long lastFlushTime = System.currentTimeMillis();
                    while ((line = inReader.readLine()) != null) {
                        AbstractCommandExecutor.this.logBuffer.add(line);
                        lastFlushTime = AbstractCommandExecutor.this.flush(lastFlushTime);
                    }
                }
                catch (Exception e) {
                    try {
                        AbstractCommandExecutor.this.logger.error(e.getMessage(), (Throwable)e);
                    }
                    catch (Throwable throwable) {
                        AbstractCommandExecutor.this.clear();
                        AbstractCommandExecutor.this.close(inReader);
                        throw throwable;
                    }
                    AbstractCommandExecutor.this.clear();
                    AbstractCommandExecutor.this.close(inReader);
                }
                AbstractCommandExecutor.this.clear();
                AbstractCommandExecutor.this.close(inReader);
            }
        });
        parseProcessOutputExecutorService.shutdown();
    }

    public int getPid() {
        return this.getProcessId(this.process);
    }

    public boolean isSuccessOfYarnState(List<String> appIds) {
        boolean result = true;
        try {
            block2: for (String appId : appIds) {
                while (true) {
                    ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
                    this.logger.info("appId:{}, final state:{}", (Object)appId, (Object)applicationStatus.name());
                    if (applicationStatus.equals((Object)ExecutionStatus.FAILURE) || applicationStatus.equals((Object)ExecutionStatus.KILL)) {
                        return false;
                    }
                    if (applicationStatus.equals((Object)ExecutionStatus.SUCCESS)) continue block2;
                    Thread.sleep(1000L);
                }
            }
        }
        catch (Exception e) {
            this.logger.error(String.format("yarn applications: %s  status failed : " + e.getMessage(), appIds.toString()), (Throwable)e);
            result = false;
        }
        return result;
    }

    private List<String> getAppLinks(String fileName) {
        List<String> logs = this.convertFile2List(fileName);
        ArrayList<String> appIds = new ArrayList<String>();
        for (String log : logs) {
            String appId = this.findAppId(log);
            if (!StringUtils.isNotEmpty((CharSequence)appId) || appIds.contains(appId)) continue;
            this.logger.info("find app id: {}", (Object)appId);
            appIds.add(appId);
        }
        return appIds;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<String> convertFile2List(String filename) {
        ArrayList<String> lineList = new ArrayList<String>(100);
        File file = new File(filename);
        if (!file.exists()) {
            return lineList;
        }
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader((InputStream)new FileInputStream(filename), StandardCharsets.UTF_8));
            String line = null;
            while ((line = br.readLine()) != null) {
                lineList.add(line);
            }
        }
        catch (Exception e) {
            this.logger.error(String.format("read file: %s failed : ", filename), (Throwable)e);
        }
        finally {
            if (br != null) {
                try {
                    br.close();
                }
                catch (IOException e) {
                    this.logger.error(e.getMessage(), (Throwable)e);
                }
            }
        }
        return lineList;
    }

    private String findAppId(String line) {
        Matcher matcher = APPLICATION_REGEX.matcher(line);
        if (matcher.find()) {
            return matcher.group();
        }
        return null;
    }

    private long getRemaintime() {
        long usedTime = (System.currentTimeMillis() - this.startTime.getTime()) / 1000L;
        long remainTime = (long)this.timeout - usedTime;
        if (remainTime < 0L) {
            throw new RuntimeException("task execution time out");
        }
        return remainTime;
    }

    private int getProcessId(Process process) {
        int processId = 0;
        try {
            Field f = process.getClass().getDeclaredField("pid");
            f.setAccessible(true);
            processId = f.getInt(process);
        }
        catch (Throwable e) {
            this.logger.error(e.getMessage(), e);
        }
        return processId;
    }

    private long flush(long lastFlushTime) {
        long now = System.currentTimeMillis();
        if (this.logBuffer.size() >= 64 || now - lastFlushTime > 1000L) {
            lastFlushTime = now;
            this.logHandler.accept(this.logBuffer);
            this.logBuffer.clear();
        }
        return lastFlushTime;
    }

    private void close(BufferedReader inReader) {
        if (inReader != null) {
            try {
                inReader.close();
            }
            catch (IOException e) {
                this.logger.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    protected abstract String buildCommandFilePath();

    protected abstract String commandType();

    protected abstract boolean checkFindApp(String var1);

    protected abstract void createCommandFileIfNotExists(String var1, String var2) throws IOException;
}

