package org.apache.spark.api.python;

import java.io.DataInputStream;
import java.io.File;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.List;
import org.apache.spark.Logging;
import org.apache.spark.SparkException;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Predef$;
import scala.collection.JavaConversions$;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.TraitSetter;

/* compiled from: PythonWorkerFactory.scala */
@ScalaSignature(bytes = "\u0006\u0001I4Q!\u0001\u0002\u0001\r1\u00111\u0003U=uQ>twk\u001c:lKJ4\u0015m\u0019;pefT!a\u0001\u0003\u0002\rALH\u000f[8o\u0015\t)a!A\u0002ba&T!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\n\u0004\u00015\u0019\u0002C\u0001\b\u0012\u001b\u0005y!\"\u0001\t\u0002\u000bM\u001c\u0017\r\\1\n\u0005Iy!AB!osJ+g\r\u0005\u0002\u0015+5\ta!\u0003\u0002\u0017\r\t9Aj\\4hS:<\u0007\u0002\u0003\r\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000e\u0002\u0015ALH\u000f[8o\u000bb,7m\u0001\u0001\u0011\u0005mqbB\u0001\b\u001d\u0013\tir\"\u0001\u0004Qe\u0016$WMZ\u0005\u0003?\u0001\u0012aa\u0015;sS:<'BA\u000f\u0010\u0011!\u0011\u0003A!A!\u0002\u0013\u0019\u0013aB3omZ\u000b'o\u001d\t\u00057\u0011R\"$\u0003\u0002&A\t\u0019Q*\u00199\t\u000b\u001d\u0002A\u0011\u0001\u0015\u0002\rqJg.\u001b;?)\rI3\u0006\f\t\u0003U\u0001i\u0011A\u0001\u0005\u00061\u0019\u0002\rA\u0007\u0005\u0006E\u0019\u0002\ra\t\u0005\b]\u0001\u0011\r\u0011\"\u00010\u0003%)8/\u001a#bK6|g.F\u00011!\tq\u0011'\u0003\u00023\u001f\t9!i\\8mK\u0006t\u0007B\u0002\u001b\u0001A\u0003%\u0001'\u0001\u0006vg\u0016$\u0015-Z7p]\u0002BqA\u000e\u0001A\u0002\u0013\u0005q'\u0001\u0004eC\u0016lwN\\\u000b\u0002qA\u0011\u0011HP\u0007\u0002u)\u00111\bP\u0001\u0005Y\u0006twMC\u0001>\u0003\u0011Q\u0017M^1\n\u0005}R$a\u0002)s_\u000e,7o\u001d\u0005\b\u0003\u0002\u0001\r\u0011\"\u0001C\u0003)!\u0017-Z7p]~#S-\u001d\u000b\u0003\u0007\u001a\u0003\"A\u0004#\n\u0005\u0015{!\u0001B+oSRDqa\u0012!\u0002\u0002\u0003\u0007\u0001(A\u0002yIEBa!\u0013\u0001!B\u0013A\u0014a\u00023bK6|g\u000e\t\u0005\b\u0017\u0002\u0011\r\u0011\"\u0001M\u0003)!\u0017-Z7p]\"{7\u000f^\u000b\u0002\u001bB\u0011a*U\u0007\u0002\u001f*\u0011\u0001\u000bP\u0001\u0004]\u0016$\u0018B\u0001*P\u0005-Ie.\u001a;BI\u0012\u0014Xm]:\t\rQ\u0003\u0001\u0015!\u0003N\u0003-!\u0017-Z7p]\"{7\u000f\u001e\u0011\t\u000fY\u0003\u0001\u0019!C\u0001/\u0006QA-Y3n_:\u0004vN\u001d;\u0016\u0003a\u0003\"AD-\n\u0005i{!aA%oi\"9A\f\u0001a\u0001\n\u0003i\u0016A\u00043bK6|g\u000eU8si~#S-\u001d\u000b\u0003\u0007zCqaR.\u0002\u0002\u0003\u0007\u0001\f\u0003\u0004a\u0001\u0001\u0006K\u0001W\u0001\fI\u0006,Wn\u001c8Q_J$\b\u0005C\u0003c\u0001\u0011\u00051-\u0001\u0004de\u0016\fG/\u001a\u000b\u0002IB\u0011a*Z\u0005\u0003M>\u0013aaU8dW\u0016$\b\"\u00025\u0001\t\u0013\u0019\u0017aE2sK\u0006$X\r\u00165s_V<\u0007\u000eR1f[>t\u0007\"\u00026\u0001\t\u0013\u0019\u0017AE2sK\u0006$XmU5na2,wk\u001c:lKJDQ\u0001\u001c\u0001\u0005\u00025\fAa\u001d;paR\t1\tC\u0003p\u0001\u0011%Q.A\u0006ti\u0006\u0014H\u000fR1f[>t\u0007\"B9\u0001\t\u0013i\u0017AC:u_B$\u0015-Z7p]\u0002")
/* loaded from: input_file:org/apache/spark/api/python/PythonWorkerFactory.class */
public class PythonWorkerFactory implements Logging {
    public final String org$apache$spark$api$python$PythonWorkerFactory$$pythonExec;
    private final Map<String, String> envVars;
    private final boolean useDaemon;
    private Process daemon;
    private final InetAddress daemonHost;
    private int daemonPort;
    private transient Logger org$apache$spark$Logging$$log_;

    @Override // org.apache.spark.Logging
    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    @Override // org.apache.spark.Logging
    @TraitSetter
    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    @Override // org.apache.spark.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    @Override // org.apache.spark.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public boolean isTraceEnabled() {
        return Logging.Cclass.isTraceEnabled(this);
    }

    public boolean useDaemon() {
        return this.useDaemon;
    }

    public Process daemon() {
        return this.daemon;
    }

    public void daemon_$eq(Process process) {
        this.daemon = process;
    }

    public InetAddress daemonHost() {
        return this.daemonHost;
    }

    public int daemonPort() {
        return this.daemonPort;
    }

    public void daemonPort_$eq(int i) {
        this.daemonPort = i;
    }

    public Socket create() {
        return useDaemon() ? createThroughDaemon() : createSimpleWorker();
    }

    private synchronized Socket createThroughDaemon() {
        startDaemon();
        try {
            return new Socket(daemonHost(), daemonPort());
        } catch (SocketException e) {
            logWarning(new PythonWorkerFactory$$anonfun$createThroughDaemon$1(this));
            stopDaemon();
            startDaemon();
            return new Socket(daemonHost(), daemonPort());
        }
    }

    private Socket createSimpleWorker() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress((byte[]) Array$.MODULE$.apply(Predef$.MODULE$.wrapByteArray(new byte[]{Byte.MAX_VALUE, 0, 0, 1}), ClassTag$.MODULE$.Byte())));
            String str = new ProcessBuilder(new String[0]).environment().get("SPARK_HOME");
            ProcessBuilder processBuilder = new ProcessBuilder((List<String>) JavaConversions$.MODULE$.seqAsJavaList(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec, new StringBuilder().append(str).append("/python/pyspark/worker.py").toString()}))));
            java.util.Map<String, String> environment = processBuilder.environment();
            environment.putAll(JavaConversions$.MODULE$.mapAsJavaMap(this.envVars));
            environment.put("PYTHONPATH", new StringBuilder().append(str).append("/python/").append(File.pathSeparator).append(environment.get("PYTHONPATH")).toString());
            Process start = processBuilder.start();
            new PythonWorkerFactory$$anon$1(this, start).start();
            new PythonWorkerFactory$$anon$2(this, start).start();
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(start.getOutputStream());
            outputStreamWriter.write(new StringBuilder().append(serverSocket.getLocalPort()).append("\n").toString());
            outputStreamWriter.flush();
            serverSocket.setSoTimeout(10000);
            try {
                Socket accept = serverSocket.accept();
                if (serverSocket != null) {
                    serverSocket.close();
                }
                return accept;
            } catch (Exception e) {
                throw new SparkException("Python worker did not connect back in time", e);
            }
        } catch (Throwable th) {
            if (serverSocket != null) {
                serverSocket.close();
            }
            throw th;
        }
    }

    public void stop() {
        stopDaemon();
    }

    private synchronized void startDaemon() {
        if (daemon() == null) {
            try {
                String str = new ProcessBuilder(new String[0]).environment().get("SPARK_HOME");
                ProcessBuilder processBuilder = new ProcessBuilder((List<String>) JavaConversions$.MODULE$.seqAsJavaList(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec, new StringBuilder().append(str).append("/python/pyspark/daemon.py").toString()}))));
                java.util.Map<String, String> environment = processBuilder.environment();
                environment.putAll(JavaConversions$.MODULE$.mapAsJavaMap(this.envVars));
                environment.put("PYTHONPATH", new StringBuilder().append(str).append("/python/").append(File.pathSeparator).append(environment.get("PYTHONPATH")).toString());
                daemon_$eq(processBuilder.start());
                new PythonWorkerFactory$$anon$3(this).start();
                DataInputStream dataInputStream = new DataInputStream(daemon().getInputStream());
                daemonPort_$eq(dataInputStream.readInt());
                new PythonWorkerFactory$$anon$4(this, dataInputStream).start();
            } catch (Throwable th) {
                stopDaemon();
                throw th;
            }
        }
    }

    private synchronized void stopDaemon() {
        if (daemon() != null) {
            daemon().destroy();
        }
        daemon_$eq(null);
        daemonPort_$eq(0);
    }

    public PythonWorkerFactory(String str, Map<String, String> map) {
        this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec = str;
        this.envVars = map;
        org$apache$spark$Logging$$log__$eq(null);
        this.useDaemon = !System.getProperty("os.name").startsWith("Windows");
        this.daemon = null;
        this.daemonHost = InetAddress.getByAddress((byte[]) Array$.MODULE$.apply(Predef$.MODULE$.wrapByteArray(new byte[]{Byte.MAX_VALUE, 0, 0, 1}), ClassTag$.MODULE$.Byte()));
        this.daemonPort = 0;
    }
}
