package org.apache.hama.pipes;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.TaskLog;

/* loaded from: input_file:org/apache/hama/pipes/Application.class */
public class Application<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable> {
    private ServerSocket serverSocket;
    private Process process;
    private Socket clientSocket;
    private DownwardProtocol<K1, V1> downlink;
    private static final Log LOG = LogFactory.getLog(Application.class.getName());
    static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

    /* JADX INFO: Access modifiers changed from: package-private */
    public Application(BSPPeer<K1, V1, K2, V2, BytesWritable> bSPPeer) throws IOException, InterruptedException {
        HashMap hashMap = new HashMap();
        boolean z = bSPPeer.getConfiguration().getBoolean("hama.streaming.enabled", false);
        if (!z) {
            this.serverSocket = new ServerSocket(0);
            hashMap.put("hama.pipes.command.port", Integer.toString(this.serverSocket.getLocalPort()));
        }
        hashMap.put("TMPDIR", System.getProperty("java.io.tmpdir"));
        hashMap.put("hama.pipes.logging", bSPPeer.getConfiguration().getBoolean("hama.pipes.logging", false) ? "1" : "0");
        LOG.debug("DEBUG hama.pipes.logging: " + bSPPeer.getConfiguration().getBoolean("hama.pipes.logging", false));
        ArrayList arrayList = new ArrayList();
        String str = bSPPeer.getConfiguration().get("hama.pipes.executable.interpretor");
        if (str != null) {
            arrayList.add(str);
        }
        String str2 = null;
        try {
            LOG.debug("DEBUG LocalCacheFilesCount: " + DistributedCache.getLocalCacheFiles(bSPPeer.getConfiguration()).length);
            for (Path path : DistributedCache.getLocalCacheFiles(bSPPeer.getConfiguration())) {
                LOG.debug("DEBUG LocalCacheFiles: " + path);
            }
            str2 = DistributedCache.getLocalCacheFiles(bSPPeer.getConfiguration())[0].toString();
            LOG.info("executable: " + str2);
            if (!new File(str2).canExecute()) {
                FileUtil.chmod(str2, "u+x");
            }
            arrayList.add(str2);
            String str3 = bSPPeer.getConfiguration().get("hama.pipes.executable.args");
            boolean z2 = bSPPeer.getConfiguration().getBoolean("hama.pipes.resolve.executable.args", false);
            if (str3 != null && !str3.isEmpty()) {
                for (String str4 : str3.split(" ")) {
                    if (z2) {
                        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(bSPPeer.getConfiguration());
                        int length = localCacheFiles.length;
                        int i = 0;
                        while (true) {
                            if (i < length) {
                                Path path2 = localCacheFiles[i];
                                if (path2.getName().equals(str4)) {
                                    LOG.info("Resolved argument \"" + str4 + "\" with fully qualified path \"" + path2.toString() + "\"!");
                                    arrayList.add(path2.toString());
                                    break;
                                }
                                i++;
                            }
                        }
                    } else {
                        arrayList.add(str4);
                    }
                }
            }
            TaskAttemptID taskId = bSPPeer.getTaskId();
            File taskLogFile = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.STDOUT);
            File taskLogFile2 = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.STDERR);
            long taskLogLength = TaskLog.getTaskLogLength(bSPPeer.getConfiguration());
            List<String> captureOutAndError = !z ? TaskLog.captureOutAndError(null, arrayList, taskLogFile, taskLogFile2, taskLogLength) : TaskLog.captureOutAndErrorTee(null, arrayList, taskLogFile, taskLogFile2, taskLogLength);
            if (!taskLogFile.getParentFile().exists()) {
                taskLogFile.getParentFile().mkdirs();
                LOG.info("STDOUT: " + taskLogFile.getParentFile().getAbsolutePath() + " created!");
            }
            LOG.info("STDOUT: " + taskLogFile.getAbsolutePath());
            if (!taskLogFile2.getParentFile().exists()) {
                taskLogFile2.getParentFile().mkdirs();
                LOG.info("STDERR: " + taskLogFile2.getParentFile().getAbsolutePath() + " created!");
            }
            LOG.info("STDERR: " + taskLogFile2.getAbsolutePath());
            LOG.info("DEBUG: cmd: " + captureOutAndError);
            this.process = runClient(captureOutAndError, hashMap);
            try {
                if (z) {
                    this.downlink = new StreamingProtocol(bSPPeer, this.process.getOutputStream(), this.process.getInputStream());
                } else {
                    LOG.info("DEBUG: waiting for Client at " + this.serverSocket.getLocalSocketAddress());
                    this.serverSocket.setSoTimeout(Constants.DEFAULT_ZOOKEEPER_PAUSE);
                    this.clientSocket = this.serverSocket.accept();
                    this.downlink = new BinaryProtocol(bSPPeer, this.clientSocket.getOutputStream(), this.clientSocket.getInputStream());
                }
                this.downlink.start();
            } catch (SocketException e) {
                throw new SocketException("Timout: Client pipes application was not connecting!");
            }
        } catch (Exception e2) {
            LOG.error("Executable: " + str2 + " fs.default.name: " + bSPPeer.getConfiguration().get("fs.default.name"));
            throw new IOException("Executable is missing!");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DownwardProtocol<K1, V1> getDownlink() {
        return this.downlink;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean waitForFinish() throws InterruptedException, IOException {
        this.downlink.flush();
        return this.downlink.waitForFinish();
    }

    void abort(Throwable th) throws IOException {
        LOG.info("Aborting because of " + StringUtils.stringifyException(th));
        try {
            this.downlink.abort();
            this.downlink.flush();
        } catch (IOException e) {
        }
        try {
            this.downlink.waitForFinish();
        } catch (Throwable th2) {
            this.process.destroy();
        }
        IOException iOException = new IOException("pipe child exception");
        iOException.initCause(th);
        throw iOException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cleanup() throws IOException {
        if (this.serverSocket != null) {
            this.serverSocket.close();
        }
        try {
            if (this.downlink != null) {
                this.downlink.close();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static Process runClient(List<String> list, Map<String, String> map) throws IOException {
        ProcessBuilder processBuilder = new ProcessBuilder(list);
        if (map != null) {
            processBuilder.environment().putAll(map);
        }
        return processBuilder.start();
    }
}
