package org.apache.hama.pipes;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.TaskLog;
import org.apache.hama.pipes.protocol.BinaryProtocol;
import org.apache.hama.pipes.protocol.DownwardProtocol;
import org.apache.hama.pipes.protocol.StreamingProtocol;

/* loaded from: input_file:org/apache/hama/pipes/PipesApplication.class */
/**
 * Launches an external "pipes" executable as a child process and wires it to a
 * {@link DownwardProtocol} so the Java side can talk to it.
 *
 * <p>Two transports are supported, selected by the {@code hama.streaming.enabled}
 * configuration flag:
 * <ul>
 *   <li>disabled (default): a {@link ServerSocket} is opened on an ephemeral port, the port is
 *       handed to the child through the {@code hama.pipes.command.port} environment variable,
 *       and a {@link BinaryProtocol} runs over the accepted connection;</li>
 *   <li>enabled: a {@link StreamingProtocol} runs over the child's stdin/stdout
 *       (only in {@link #start(BSPPeer)}).</li>
 * </ul>
 *
 * <p>This class performs no synchronization of its own.
 */
public class PipesApplication<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable> {
    private static final Log LOG = LogFactory.getLog(PipesApplication.class.getName());
    static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

    /** Date pattern handed to {@code TaskLog.getLocalTaskLogFile} by {@link #start(Configuration)}. */
    private static final String LOCAL_LOG_PATTERN = "yyyyMMdd'_partitioner_'HHmmss";

    private ServerSocket serverSocket;
    private Process process;
    private Socket clientSocket;
    private DownwardProtocol<K1, V1, K2, V2> downlink;
    private boolean streamingEnabled = false;

    /**
     * Builds the environment variables for the child process and, when streaming is disabled,
     * opens the server socket the child is expected to connect back to.
     *
     * @param configuration job configuration; read for {@code hama.streaming.enabled} and
     *        {@code hama.pipes.logging}
     * @return the extra environment entries for the child process
     * @throws IOException if the server socket cannot be opened
     */
    private Map<String, String> setupEnvironment(Configuration configuration) throws IOException {
        Map<String, String> env = new HashMap<String, String>();
        this.streamingEnabled = configuration.getBoolean("hama.streaming.enabled", false);
        if (!this.streamingEnabled) {
            // Port 0 lets the OS choose a free port; the child learns it via the environment.
            this.serverSocket = new ServerSocket(0);
            env.put("hama.pipes.command.port", Integer.toString(this.serverSocket.getLocalPort()));
        }
        env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
        boolean pipesLogging = configuration.getBoolean("hama.pipes.logging", false);
        env.put("hama.pipes.logging", pipesLogging ? "1" : "0");
        LOG.debug("DEBUG hama.pipes.logging: " + pipesLogging);
        return env;
    }

    /**
     * Assembles the command line of the child process: optional interpreter, the executable
     * (first entry of the local distributed cache, made executable if necessary) and the
     * space-separated arguments from {@code hama.pipes.executable.args}.
     *
     * <p>When {@code hama.pipes.resolve.executable.args} is true, every argument whose text equals
     * the file name of a local cache file is replaced by that file's full path; arguments that
     * match no cache file are passed through unchanged.
     *
     * @param configuration job configuration
     * @return the command line, executable first (after the interpreter, if any)
     * @throws IOException if the executable cannot be located or prepared; the underlying failure
     *         is attached as the cause
     * @throws InterruptedException declared for callers; an interruption while preparing the
     *         executable is currently reported as {@link IOException} with the interrupt flag restored
     */
    private List<String> setupCommand(Configuration configuration) throws IOException, InterruptedException {
        List<String> command = new ArrayList<String>();
        String interpreter = configuration.get("hama.pipes.executable.interpretor");
        if (interpreter != null) {
            command.add(interpreter);
        }
        String executable = null;
        try {
            // Query the cache once instead of on every use.
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(configuration);
            if (cacheFiles == null) {
                LOG.debug("DEBUG: DistributedCache.getLocalCacheFiles(conf) returns null.");
                throw new IOException("Executable is missing!");
            }
            LOG.debug("DEBUG LocalCacheFilesCount: " + cacheFiles.length);
            for (Path cacheFile : cacheFiles) {
                LOG.debug("DEBUG LocalCacheFiles: " + cacheFile);
            }
            if (cacheFiles.length == 0) {
                throw new IOException("Executable is missing!");
            }
            executable = cacheFiles[0].toString();
            LOG.debug("DEBUG: executable: " + executable);
            if (!new File(executable).canExecute()) {
                FileUtil.chmod(executable, "u+x");
            }
            command.add(executable);

            String rawArgs = configuration.get("hama.pipes.executable.args");
            boolean resolveArgs = configuration.getBoolean("hama.pipes.resolve.executable.args", false);
            if (rawArgs != null && !rawArgs.isEmpty()) {
                for (String arg : rawArgs.split(" ")) {
                    command.add(resolveArgs ? resolveArgument(arg, cacheFiles) : arg);
                }
            }
            return command;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the caller even though we report an IOException.
                Thread.currentThread().interrupt();
            }
            LOG.error("Executable: " + executable + " fs.default.name: " + configuration.get("fs.default.name"), e);
            throw new IOException("Executable is missing!", e);
        }
    }

    /**
     * Replaces {@code arg} by the full path of the cache file with that name, if there is one.
     *
     * @param arg a single command line argument
     * @param cacheFiles local distributed-cache files to search
     * @return the matching cache file's path, or {@code arg} itself when nothing matches
     */
    private static String resolveArgument(String arg, Path[] cacheFiles) {
        for (Path cacheFile : cacheFiles) {
            if (cacheFile.getName().equals(arg)) {
                LOG.info("Resolved argument \"" + arg + "\" with fully qualified path \"" + cacheFile.toString() + "\"!");
                return cacheFile.toString();
            }
        }
        // A bounded search: an argument that is not a cache file is forwarded as-is.
        LOG.debug("Argument \"" + arg + "\" matches no local cache file; passing it through unchanged.");
        return arg;
    }

    /**
     * Makes sure the directory that will contain {@code file} exists.
     *
     * @param file the log file about to be written by the child process
     */
    private void checkParentFile(File file) {
        File parent = file.getParentFile();
        if (parent == null || parent.exists()) {
            return;
        }
        if (parent.mkdirs()) {
            LOG.debug("File: " + parent.getAbsolutePath() + " created!");
        } else if (!parent.isDirectory()) {
            // mkdirs() also returns false when someone else created the directory meanwhile.
            LOG.warn("Could not create directory: " + parent.getAbsolutePath());
        }
    }

    /**
     * Starts the child process with local (non task-attempt) log files and, unless streaming is
     * enabled, waits for it to connect and starts a {@link BinaryProtocol} on the connection.
     * With streaming enabled no downlink is created by this overload.
     *
     * @param configuration job configuration
     * @throws IOException if the command cannot be built, the process cannot be started, or the
     *         client does not connect (reported as {@link SocketException})
     * @throws InterruptedException as declared by the command setup
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // BinaryProtocol's type parameters are not visible from this file
    public void start(Configuration configuration) throws IOException, InterruptedException {
        Map<String, String> env = setupEnvironment(configuration);
        List<String> command = setupCommand(configuration);
        File stdoutLog = TaskLog.getLocalTaskLogFile(TaskLog.LogName.STDOUT, LOCAL_LOG_PATTERN);
        File stderrLog = TaskLog.getLocalTaskLogFile(TaskLog.LogName.STDERR, LOCAL_LOG_PATTERN);
        launchClient(env, command, stdoutLog, stderrLog, TaskLog.getTaskLogLength(configuration));
        try {
            if (!this.streamingEnabled) {
                acceptClient();
                this.downlink = new BinaryProtocol(configuration, this.clientSocket.getOutputStream(), this.clientSocket.getInputStream());
                this.downlink.start();
            }
        } catch (SocketTimeoutException e) {
            // accept() signals an expired SO_TIMEOUT with this type, which is NOT a SocketException.
            throw clientNotConnected(stderrLog, e);
        } catch (SocketException e) {
            throw clientNotConnected(stderrLog, e);
        }
    }

    /**
     * Starts the child process for a BSP task, logging to the task attempt's log files, and
     * starts the downlink: a {@link StreamingProtocol} over the child's stdin/stdout when
     * streaming is enabled, otherwise a {@link BinaryProtocol} over the accepted socket.
     *
     * @param bSPPeer the peer whose configuration and task id are used
     * @throws IOException if the command cannot be built, the process cannot be started, or the
     *         client does not connect (reported as {@link SocketException})
     * @throws InterruptedException as declared by the command setup
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // protocol type parameters are not visible from this file
    public void start(BSPPeer<K1, V1, K2, V2, BytesWritable> bSPPeer) throws IOException, InterruptedException {
        Configuration conf = bSPPeer.getConfiguration();
        Map<String, String> env = setupEnvironment(conf);
        List<String> command = setupCommand(conf);
        TaskAttemptID taskId = bSPPeer.getTaskId();
        File stdoutLog = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.STDOUT);
        File stderrLog = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.STDERR);
        launchClient(env, command, stdoutLog, stderrLog, TaskLog.getTaskLogLength(conf));
        try {
            if (this.streamingEnabled) {
                this.downlink = new StreamingProtocol(bSPPeer, this.process.getOutputStream(), this.process.getInputStream());
            } else {
                acceptClient();
                this.downlink = new BinaryProtocol(bSPPeer, this.clientSocket.getOutputStream(), this.clientSocket.getInputStream());
            }
            this.downlink.start();
        } catch (SocketTimeoutException e) {
            // accept() signals an expired SO_TIMEOUT with this type, which is NOT a SocketException.
            throw clientNotConnected(stderrLog, e);
        } catch (SocketException e) {
            throw clientNotConnected(stderrLog, e);
        }
    }

    /**
     * Wraps the command so its output is captured into the given log files, prepares the log
     * directories and starts the child process, storing it in {@link #process}.
     */
    private void launchClient(Map<String, String> env, List<String> command, File stdoutLog, File stderrLog, long logLength) throws IOException {
        List<String> wrapped = this.streamingEnabled
                ? TaskLog.captureOutAndErrorTee(null, command, stdoutLog, stderrLog, logLength)
                : TaskLog.captureOutAndError(null, command, stdoutLog, stderrLog, logLength);
        checkParentFile(stdoutLog);
        LOG.debug("STDOUT: " + stdoutLog.getAbsolutePath());
        checkParentFile(stderrLog);
        LOG.debug("STDERR: " + stderrLog.getAbsolutePath());
        LOG.debug("DEBUG: cmd: " + wrapped);
        this.process = runClient(wrapped, env);
    }

    /**
     * Blocks until the child connects to {@link #serverSocket} or the accept timeout expires,
     * storing the connection in {@link #clientSocket}.
     */
    private void acceptClient() throws IOException {
        LOG.debug("DEBUG: waiting for Client at " + this.serverSocket.getLocalSocketAddress());
        // NOTE(review): the accept timeout reuses the ZooKeeper pause constant; this may be a
        // decompiler constant match rather than the author's intent - confirm against the original source.
        this.serverSocket.setSoTimeout(Constants.DEFAULT_ZOOKEEPER_PAUSE);
        this.clientSocket = this.serverSocket.accept();
        LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
    }

    /**
     * Dumps the child's stderr log to the error log (best effort) and builds the exception that
     * reports the failed connection, with the original failure attached as cause.
     */
    private SocketException clientNotConnected(File stderrLog, IOException cause) {
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(stderrLog)));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    LOG.error("PipesApp Error: " + line);
                }
            } finally {
                reader.close();
            }
        } catch (IOException e) {
            // Diagnostics only: an unreadable log must not hide the connection failure itself.
            LOG.warn("Could not read client stderr log " + stderrLog.getAbsolutePath(), e);
        }
        SocketException failure = new SocketException("Timout: Client pipes application was not connecting!");
        failure.initCause(cause);
        return failure;
    }

    /**
     * Returns the protocol used to talk to the child process.
     *
     * @return the downlink, or {@code null} if none has been created yet
     */
    public DownwardProtocol<K1, V1, K2, V2> getDownlink() {
        return this.downlink;
    }

    /**
     * Flushes the downlink and waits for the child application to finish.
     *
     * @return the result of {@code DownwardProtocol.waitForFinish()}
     * @throws InterruptedException if the wait is interrupted
     * @throws IOException if flushing or waiting fails
     */
    public boolean waitForFinish() throws InterruptedException, IOException {
        this.downlink.flush();
        return this.downlink.waitForFinish();
    }

    /**
     * Asks the child to abort, waits for it to finish (destroying the process if that fails) and
     * then always throws.
     *
     * @param th the failure that triggered the abort; becomes the cause of the thrown exception
     * @throws IOException always, with message {@code "pipe child exception"} and cause {@code th}
     */
    void abort(Throwable th) throws IOException {
        LOG.info("Aborting because of " + StringUtils.stringifyException(th));
        if (this.downlink != null) {
            try {
                this.downlink.abort();
                this.downlink.flush();
            } catch (IOException ignored) {
                // Best effort: the child may already be gone; we still try to reap it below.
                LOG.debug("Sending abort to pipes child failed", ignored);
            }
            try {
                this.downlink.waitForFinish();
            } catch (Throwable waitFailure) {
                if (waitFailure instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                destroyProcess();
            }
        } else {
            // No protocol to ask politely (e.g. the client never connected): stop the child directly.
            destroyProcess();
        }
        throw new IOException("pipe child exception", th);
    }

    /** Kills the child process if one was started. */
    private void destroyProcess() {
        if (this.process != null) {
            this.process.destroy();
        }
    }

    /**
     * Releases the server socket, the downlink and the client connection. Each resource is
     * closed even if closing an earlier one fails.
     *
     * @throws IOException if closing any of the resources fails
     */
    public void cleanup() throws IOException {
        try {
            if (this.serverSocket != null) {
                this.serverSocket.close();
            }
        } finally {
            try {
                if (this.downlink != null) {
                    this.downlink.close();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Closing an already-closed socket is a no-op, so this is safe whether or not
                // the downlink closed the underlying streams itself.
                if (this.clientSocket != null) {
                    this.clientSocket.close();
                }
            }
        }
    }

    /**
     * Starts a process for the given command line.
     *
     * @param list the command and its arguments
     * @param map extra environment variables added on top of the inherited environment; may be {@code null}
     * @return the started process
     * @throws IOException if the process cannot be started
     */
    static Process runClient(List<String> list, Map<String, String> map) throws IOException {
        ProcessBuilder builder = new ProcessBuilder(list);
        if (map != null) {
            builder.environment().putAll(map);
        }
        return builder.start();
    }
}
