package org.apache.hama.bsp;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hama.bsp.GroomServer;
import org.apache.hama.bsp.TaskStatus;

/* loaded from: input_file:org/apache/hama/bsp/TaskRunner.class */
public class TaskRunner extends Thread {
    /** Logger shared by this class and its nested {@code BspChildRunner}. */
    public static final Log LOG = LogFactory.getLog(TaskRunner.class);
    /** Value of the {@code path.separator} system property; joins entries of the child classpath. */
    private static final String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
    // Set to true by killBsp(); read by BspChildRunner.call() and logStream() to tell an
    // intentional kill apart from a real failure.
    // NOTE(review): written and read from several threads without synchronization — confirm
    // whether this should be volatile.
    boolean bspKilled = false;
    // Child process handle; assigned in BspChildRunner.call() and destroyed in killBsp().
    private Process bspProcess;
    // Thread that passes the child's error stream to logStream(); reference cleared by killBsp().
    private Thread errorLog;
    // Thread that passes the child's input (stdout) stream to logStream(); reference cleared by killBsp().
    private Thread infoLog;
    // Task this runner executes (constructor argument).
    private final Task task;
    // Job the task belongs to; supplies configuration and the job jar.
    private final BSPJob bspJob;
    // Owning groom server; queried in buildJvmArgs() for the child's command-line arguments.
    private final GroomServer groomServer;
    // Directory that receives the per-task log files; assigned at the start of run().
    private File logDir;

    /* loaded from: input_file:org/apache/hama/bsp/TaskRunner$BspChildRunner.class */
    class BspChildRunner implements Callable<Object> {
        private final List<String> commands;
        private final File workDir;
        private final ScheduledExecutorService sched = Executors.newScheduledThreadPool(1);
        private final AtomicReference<ScheduledFuture<Object>> future = new AtomicReference<>();

        BspChildRunner(List<String> list, File file) {
            this.commands = list;
            this.workDir = file;
        }

        void start() {
            this.future.set(this.sched.schedule(this, 0L, TimeUnit.SECONDS));
            TaskRunner.LOG.debug("Start building BSPPeer process.");
        }

        void stop() {
            TaskRunner.this.killBsp();
            this.sched.schedule(this, 0L, TimeUnit.SECONDS);
            TaskRunner.LOG.info("Stop BSPPeer process.");
        }

        void join() throws InterruptedException, ExecutionException {
            this.future.get().get();
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            final boolean z = TaskRunner.this.bspJob.getConfiguration().getBoolean("hama.child.redirect.log.console", false);
            ProcessBuilder processBuilder = new ProcessBuilder(this.commands);
            processBuilder.directory(this.workDir);
            try {
                try {
                    TaskRunner.this.bspProcess = processBuilder.start();
                    TaskRunner.this.errorLog = new Thread() { // from class: org.apache.hama.bsp.TaskRunner.BspChildRunner.1
                        @Override // java.lang.Thread, java.lang.Runnable
                        public void run() {
                            TaskRunner.this.logStream(TaskRunner.this.bspProcess.getErrorStream(), z ? LogType.CONSOLE : LogType.ERROR);
                        }
                    };
                    TaskRunner.this.errorLog.start();
                    TaskRunner.this.infoLog = new Thread() { // from class: org.apache.hama.bsp.TaskRunner.BspChildRunner.2
                        @Override // java.lang.Thread, java.lang.Runnable
                        public void run() {
                            TaskRunner.this.logStream(TaskRunner.this.bspProcess.getInputStream(), z ? LogType.CONSOLE : LogType.STDOUT);
                        }
                    };
                    TaskRunner.this.infoLog.start();
                    int waitFor = TaskRunner.this.bspProcess.waitFor();
                    if (!TaskRunner.this.bspKilled && waitFor != 0) {
                        throw new IOException("BSP task process exit with nonzero status of " + waitFor + ". command = " + this.commands);
                    }
                    TaskRunner.this.killBsp();
                    return null;
                } catch (IOException e) {
                    TaskRunner.LOG.error("Error when executing BSPPeer process.", e);
                    TaskRunner.this.killBsp();
                    return null;
                } catch (InterruptedException e2) {
                    TaskRunner.LOG.warn("Thread is interrupted when execeuting BSP process.", e2);
                    TaskRunner.this.killBsp();
                    return null;
                }
            } catch (Throwable th) {
                TaskRunner.this.killBsp();
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/bsp/TaskRunner$LogType.class */
    /** Destination for one output stream of the child process; consumed by {@code logStream}. */
    public enum LogType {
        /** The child's standard output when console redirection is off. */
        STDOUT,
        /** The child's error stream when console redirection is off. */
        ERROR,
        /** Either stream when {@code hama.child.redirect.log.console} is enabled. */
        CONSOLE;
    }

    /**
     * Creates a runner for a single BSP task. Nothing is started until the thread is run.
     *
     * @param bSPTask the task to execute
     * @param groomServer the groom server the child process is launched for
     * @param bSPJob the job the task belongs to
     */
    public TaskRunner(BSPTask bSPTask, GroomServer groomServer, BSPJob bSPJob) {
        this.groomServer = groomServer;
        this.bspJob = bSPJob;
        this.task = bSPTask;
    }

    /**
     * @return the task handed to this runner at construction time
     */
    public Task getTask() {
        return task;
    }

    /**
     * Preparation hook; this implementation performs no work.
     *
     * @return always {@code true}
     * @throws IOException declared in the signature; never thrown by this implementation
     */
    public boolean prepare() throws IOException {
        return true;
    }

    /**
     * Resolves the {@code work} directory next to the task's job file and tries to create it.
     *
     * @return the work directory; it is returned whether or not it had to be created
     */
    private File createWorkDirectory() {
        final String jobFileParent = new File(this.task.getJobFile()).getParent();
        final File workDir = new File(jobFileParent, "work");
        final boolean created = workDir.mkdirs();
        if (created) {
            LOG.debug("TaskRunner.workDir : " + workDir);
        }
        return workDir;
    }

    /**
     * Builds the classpath string for the child JVM.
     *
     * <p>The result always starts with this JVM's {@code java.class.path} followed by a path
     * separator. When the job has a jar, the jar is unpacked into {@code file} and every entry
     * of its {@code lib} directory, its {@code classes} directory and {@code file} itself are
     * appended, each preceded by a separator (so two separators follow the system classpath).
     * A failure to unpack the jar is logged and the classpath is still assembled.
     *
     * @param bSPJob job whose jar, if any, is unpacked
     * @param file directory the jar is unpacked into
     * @return the assembled classpath
     */
    private static String assembleClasspath(BSPJob bSPJob, File file) {
        final StringBuilder classpath = new StringBuilder();
        classpath.append(System.getProperty("java.class.path")).append(SYSTEM_PATH_SEPARATOR);

        final String jobJar = bSPJob.getJar();
        if (jobJar == null) {
            return classpath.toString();
        }

        try {
            RunJar.unJar(new File(jobJar), file);
        } catch (IOException e) {
            LOG.error("Unable to uncompressing file to " + file.toString(), e);
        }

        final File[] libraries = new File(file, "lib").listFiles();
        if (libraries != null) {
            for (int i = 0; i < libraries.length; i++) {
                classpath.append(SYSTEM_PATH_SEPARATOR).append(libraries[i]);
            }
        }
        classpath.append(SYSTEM_PATH_SEPARATOR).append(new File(file, "classes"));
        classpath.append(SYSTEM_PATH_SEPARATOR).append(file);
        return classpath.toString();
    }

    /**
     * Builds the command line used to launch the child JVM.
     *
     * <p>The command consists of the {@code java} executable under {@code java.home}, the
     * options from {@code bsp.child.java.opts} (default {@code -Xmx200m}, with
     * {@code @taskid@} replaced by the task id), the classpath, and the main class. When the
     * main class is {@code GroomServer.BSPPeerChild}, the report address, task id, groom host
     * name, start superstep and initial task state are appended as program arguments.
     *
     * @param bSPJob job whose configuration supplies the JVM options
     * @param str classpath for the child JVM
     * @param cls main class of the child JVM
     * @return the command line, executable first
     */
    private List<String> buildJvmArgs(BSPJob bSPJob, String str, Class<?> cls) {
        List<String> command = new ArrayList<>();
        command.add(new File(new File(System.getProperty("java.home"), "bin"), "java").toString());

        String javaOpts = bSPJob.getConfiguration().get("bsp.child.java.opts", "-Xmx200m")
                .replace("@taskid@", this.task.getTaskID().toString());
        // Split on any run of whitespace and drop empty tokens: an empty argument would be
        // handed to the JVM as-is and taken for the main class name.
        for (String option : javaOpts.trim().split("\\s+")) {
            if (!option.isEmpty()) {
                command.add(option);
            }
        }

        command.add("-classpath");
        command.add(str);
        LOG.debug("Executing child Process " + cls.getName());
        command.add(cls.getName());

        if (GroomServer.BSPPeerChild.class.equals(cls)) {
            InetSocketAddress reportAddress = this.groomServer.getTaskTrackerReportAddress();
            command.add(reportAddress.getHostName());
            command.add(Integer.toString(reportAddress.getPort()));
            command.add(this.task.getTaskID().toString());
            command.add(this.groomServer.groomHostName);
            command.add(Long.toString(this.groomServer.getStartSuperstep(this.task.getTaskID())));

            // The child starts in RECOVERING only if the groom already tracks it in that
            // state; an unknown task or any other state starts as RUNNING.
            TaskStatus status = this.groomServer.getTaskStatus(this.task.getTaskID());
            boolean recovering = status != null && TaskStatus.State.RECOVERING.equals(status.getRunState());
            command.add(recovering ? TaskStatus.State.RECOVERING.name() : TaskStatus.State.RUNNING.name());
        }
        return command;
    }

    /**
     * Prepares the work and log directories, launches the child process through a
     * {@code BspChildRunner} and waits for it to finish. The child is killed on every exit
     * path; interruption and execution failures are logged rather than propagated.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        final File workDir = createWorkDirectory();
        this.logDir = createLogDirectory();

        final String classpath = assembleClasspath(this.bspJob, workDir);
        LOG.debug("Spawned child's classpath " + classpath);

        final List<String> command = buildJvmArgs(this.bspJob, classpath, GroomServer.BSPPeerChild.class);
        final BspChildRunner childRunner = new BspChildRunner(command, workDir);
        childRunner.start();
        try {
            childRunner.join();
        } catch (InterruptedException interrupted) {
            LOG.error("BSPPeer child process is interrupted.", interrupted);
        } catch (ExecutionException failed) {
            LOG.error("Failure occurs when retrieving tasks result.", failed);
        } finally {
            // Runs on success, on the logged failures above, and before any other throwable propagates.
            killBsp();
        }
        LOG.debug("Finishes executing BSPPeer child process.");
    }

    /**
     * Resolves {@code <hama.log.dir>/tasklogs/<job id>} and creates it if it does not exist.
     *
     * @return the log directory for the task's job
     */
    private File createLogDirectory() {
        final String jobId = this.task.jobId.toString();
        final String path = System.getProperty("hama.log.dir") + File.separator + "tasklogs" + File.separator + jobId;
        final File directory = new File(path);
        if (directory.exists()) {
            return directory;
        }
        directory.mkdirs();
        return directory;
    }

    /**
     * Marks the task as killed, drops the references to the logger threads and destroys the
     * child process if one has been started. The logger threads themselves are not stopped
     * here; only the references are cleared.
     */
    public void killBsp() {
        this.bspKilled = true;
        this.errorLog = null;
        this.infoLog = null;

        final Process child = this.bspProcess;
        if (child == null) {
            return;
        }
        child.destroy();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drains one output stream of the child process.
     *
     * <p>For {@code LogType.CONSOLE} the bytes are copied to {@code System.out}. For any
     * other type the stream is read line by line and written to
     * {@code <logDir>/<task attempt id><ending>}, where the ending comes from
     * {@code getFileEndingForType}. Read and write errors are logged at warn level unless the
     * task has already been killed. The input stream, and the log file if it was opened, are
     * closed on every path of the file branch.
     *
     * @param inputStream stream of the child process to drain
     * @param logType destination selector
     */
    public void logStream(InputStream inputStream, LogType logType) {
        if (logType == LogType.CONSOLE) {
            try {
                // NOTE(review): closing of the streams on this path is left to
                // IOUtils.copyBytes — confirm against the Hadoop version in use.
                IOUtils.copyBytes(inputStream, System.out, this.bspJob.getConfiguration());
            } catch (IOException ignored) {
                // Best effort: console redirection has nowhere else to report the failure.
            }
            return;
        }

        BufferedWriter writer = null;
        try {
            File logFile = new File(this.logDir, this.task.getTaskAttemptId() + getFileEndingForType(logType));
            writer = new BufferedWriter(new FileWriter(logFile));
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
            }
        } catch (IOException e) {
            // After a kill the stream is expected to break, so only report unexpected errors.
            if (!this.bspKilled) {
                LOG.warn(this.task.getTaskID() + " Error reading child output", e);
            }
        } finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                LOG.warn(this.task.getTaskID() + " Error closing child output", e);
            }
            // The writer is still null when opening the log file failed.
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    LOG.warn(this.task.getTaskID() + " Error closing log file", e);
                }
            }
        }
    }

    /**
     * Returns the log file ending for a stream type.
     *
     * <p>The previous condition was inverted, so the error stream was written to the
     * {@code .log} file and standard output to the {@code .err} file.
     *
     * @param logType type of the stream being logged
     * @return {@code ".err"} for {@code LogType.ERROR}, {@code ".log"} for every other type;
     *         the leading dot is included
     */
    private static String getFileEndingForType(LogType logType) {
        return logType == LogType.ERROR ? ".err" : ".log";
    }
}
