package org.apache.spark.api.python;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.config.ConfigEntry;
import org.apache.spark.internal.config.Python$;
import org.apache.spark.internal.config.package$;
import org.slf4j.Logger;
import scala.$less$colon$less$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.IterableOnceOps;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Seq;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: StreamingPythonRunner.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005]sA\u0002\u0010 \u0011\u0003\u0019\u0013F\u0002\u0004,?!\u00051\u0005\f\u0005\u0006g\u0005!\t!\u000e\u0005\u0006m\u0005!\ta\u000e\u0004\u0006W}\u00011%\u000f\u0005\t\u0001\u0012\u0011\t\u0011)A\u0005\u0003\"AA\t\u0002B\u0001B\u0003%Q\t\u0003\u0005Q\t\t\u0005\t\u0015!\u0003F\u0011!\tFA!A!\u0002\u0013)\u0005\"B\u001a\u0005\t\u0003\u0011\u0006bB,\u0005\u0005\u0004%I\u0001\u0017\u0005\u0007;\u0012\u0001\u000b\u0011B-\t\u000fy#!\u0019!C\t?\"11\r\u0002Q\u0001\n\u0001Dq\u0001\u001a\u0003C\u0002\u0013EQ\r\u0003\u0004j\t\u0001\u0006IA\u001a\u0005\bU\u0012\u0011\r\u0011\"\u0003l\u0011\u0019!H\u0001)A\u0005Y\"9Q\u000f\u0002b\u0001\n\u00131\bBB<\u0005A\u0003%Q\tC\u0004y\t\u0001\u0007I\u0011B=\t\u0013\u0005\u001dA\u00011A\u0005\n\u0005%\u0001bBA\u000b\t\u0001\u0006KA\u001f\u0005\n\u0003/!\u0001\u0019!C\u0005\u00033A\u0011\"a\t\u0005\u0001\u0004%I!!\n\t\u0011\u0005%B\u0001)Q\u0005\u00037A\u0001\"a\u000b\u0005\u0005\u0004%\tB\u001e\u0005\b\u0003[!\u0001\u0015!\u0003F\u0011\u001d\ty\u0003\u0002C\u0001\u0003cAq!a\u0013\u0005\t\u0003\ti%A\u000bTiJ,\u0017-\\5oOBKH\u000f[8o%Vtg.\u001a:\u000b\u0005\u0001\n\u0013A\u00029zi\"|gN\u0003\u0002#G\u0005\u0019\u0011\r]5\u000b\u0005\u0011*\u0013!B:qCJ\\'B\u0001\u0014(\u0003\u0019\t\u0007/Y2iK*\t\u0001&A\u0002pe\u001e\u0004\"AK\u0001\u000e\u0003}\u0011Qc\u0015;sK\u0006l\u0017N\\4QsRDwN\u001c*v]:,'o\u0005\u0002\u0002[A\u0011a&M\u0007\u0002_)\t\u0001'A\u0003tG\u0006d\u0017-\u0003\u00023_\t1\u0011I\\=SK\u001a\fa\u0001P5oSRt4\u0001\u0001\u000b\u0002S\u0005)\u0011\r\u001d9msRI\u0001(a\u0014\u0002R\u0005M\u0013Q\u000b\t\u0003U\u0011\u00192\u0001B\u0017;!\tYd(D\u0001=\u0015\ti4%\u0001\u0005j]R,'O\\1m\u0013\tyDHA\u0004M_\u001e<\u0017N\\4\u0002\t\u0019,hn\u0019\t\u0003U\tK!aQ\u0010\u0003\u001dAKH\u000f[8o\rVt7\r^5p]\u0006Q1m\u001c8oK\u000e$XK\u001d7\u0011\u0005\u0019keBA$L!\tAu&D\u0001J\u0015\tQE'\u0001\u0004=e>|GOP\u0005\u0003\u0019>\na\u0001\u0015:fI\u00164\u0017B\u0001(P\u0005\u0019\u0019FO]5oO*\u0011AjL\u0001\ng\u0016\u001c8/[8o\u0013\u0012\fAb^8sW\u0016\u0014Xj\u001c3vY\u0016$R\u0001O*U+ZCQ\u0001Q\u0005A\u0002\u0005CQ\u0001R\u0005A\u0002\u0015CQ\u0001U\u0005A\u0002\u0015CQ!U\u0005A\u0002\u0015\u000bAaY8oMV\t\u0011\f\u0005\u0002[76\t1%\u0003\u0002]G\tI1\u000b]1sW\u000e{gNZ\u0001\u0006G>tg\rI\u0001\u000bEV4g-\u001a:TSj,W#\u00011\u0011\u00059\n\u0017B\u000120\u0005\rIe\u000e^\u0001\fEV4g-\u001a:TSj,\u0007%A\tbkRD7k\\2lKR$\u0016.\\3pkR,\u0012A\u001a\t\u0003]\u001dL!\u0001[\u0018\u0003\t1{gnZ\u0001\u0013CV$\bnU8dW\u0016$H+[7f_V$\b%A\u0004f]Z4\u0016M]:\u0016\u00031\u0004B!\u001c:F\u000b6\taN\u0003\u0002pa\u0006!Q\u000f^5m\u0015\u0005\t\u0018\u0001\u00026bm\u0006L!a\u001d8\u0003\u00075\u000b\u0007/\u0001\u0005f]Z4\u0016M]:!\u0003)\u0001\u0018\u0010\u001e5p]\u0016CXmY\u000b\u0002\u000b\u0006Y\u0001/\u001f;i_:,\u00050Z2!\u00031\u0001\u0018\u0010\u001e5p]^{'o[3s+\u0005Q\bc\u0001\u0018|{&\u0011Ap\f\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0007y\f\u0019!D\u0001��\u0015\r\t\t\u0001]\u0001\u0004]\u0016$\u0018bAA\u0003\u007f\n11k\\2lKR\f\u0001\u0003]=uQ>twk\u001c:lKJ|F%Z9\u0015\t\u0005-\u0011\u0011\u0003\t\u0004]\u00055\u0011bAA\b_\t!QK\\5u\u0011!\t\u0019\"FA\u0001\u0002\u0004Q\u0018a\u0001=%c\u0005i\u0001/\u001f;i_:<vN]6fe\u0002\n1\u0003]=uQ>twk\u001c:lKJ4\u0015m\u0019;pef,\"!a\u0007\u0011\t9Z\u0018Q\u0004\t\u0004U\u0005}\u0011bAA\u0011?\t\u0019\u0002+\u001f;i_:<vN]6fe\u001a\u000b7\r^8ss\u00069\u0002/\u001f;i_:<vN]6fe\u001a\u000b7\r^8ss~#S-\u001d\u000b\u0005\u0003\u0017\t9\u0003C\u0005\u0002\u0014a\t\t\u00111\u0001\u0002\u001c\u0005!\u0002/\u001f;i_:<vN]6fe\u001a\u000b7\r^8ss\u0002\n\u0011\u0002]=uQ>tg+\u001a:\u0002\u0015ALH\u000f[8o-\u0016\u0014\b%\u0001\u0003j]&$HCAA\u001a!\u001dq\u0013QGA\u001d\u0003\u000bJ1!a\u000e0\u0005\u0019!V\u000f\u001d7feA!\u00111HA!\u001b\t\tiDC\u0002\u0002@A\f!![8\n\t\u0005\r\u0013Q\b\u0002\u0011\t\u0006$\u0018mT;uaV$8\u000b\u001e:fC6\u0004B!a\u000f\u0002H%!\u0011\u0011JA\u001f\u0005=!\u0015\r^1J]B,Ho\u0015;sK\u0006l\u0017\u0001B:u_B$\"!a\u0003\t\u000b\u0001\u001b\u0001\u0019A!\t\u000b\u0011\u001b\u0001\u0019A#\t\u000bA\u001b\u0001\u0019A#\t\u000bE\u001b\u0001\u0019A#")
/* loaded from: input_file:org/apache/spark/api/python/StreamingPythonRunner.class */
public class StreamingPythonRunner implements Logging {
    private final PythonFunction func;
    private final String connectUrl;
    private final String sessionId;
    private final String workerModule;
    private final SparkConf conf;
    private final int bufferSize;
    private final long authSocketTimeout;
    private final Map<String, String> envVars;
    private final String pythonExec;
    private Option<Socket> pythonWorker;
    private Option<PythonWorkerFactory> pythonWorkerFactory;
    private final String pythonVer;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public static StreamingPythonRunner apply(PythonFunction pythonFunction, String str, String str2, String str3) {
        return StreamingPythonRunner$.MODULE$.apply(pythonFunction, str, str2, str3);
    }

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private SparkConf conf() {
        return this.conf;
    }

    public int bufferSize() {
        return this.bufferSize;
    }

    public long authSocketTimeout() {
        return this.authSocketTimeout;
    }

    private Map<String, String> envVars() {
        return this.envVars;
    }

    private String pythonExec() {
        return this.pythonExec;
    }

    private Option<Socket> pythonWorker() {
        return this.pythonWorker;
    }

    private void pythonWorker_$eq(Option<Socket> option) {
        this.pythonWorker = option;
    }

    private Option<PythonWorkerFactory> pythonWorkerFactory() {
        return this.pythonWorkerFactory;
    }

    private void pythonWorkerFactory_$eq(Option<PythonWorkerFactory> option) {
        this.pythonWorkerFactory = option;
    }

    public String pythonVer() {
        return this.pythonVer;
    }

    public Tuple2<DataOutputStream, DataInputStream> init() {
        Socket socket;
        logInfo(() -> {
            return new StringBuilder(52).append("Initializing Python runner (session: ").append(this.sessionId).append(", pythonExec: ").append(this.pythonExec()).append(")").toString();
        });
        envVars().put("SPARK_LOCAL_DIRS", Predef$.MODULE$.wrapRefArray((Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(SparkEnv$.MODULE$.get().blockManager().diskBlockManager().localDirs()), file -> {
            return file.getPath();
        }, ClassTag$.MODULE$.apply(String.class))).mkString(","));
        envVars().put("SPARK_AUTH_SOCKET_TIMEOUT", Long.toString(authSocketTimeout()));
        envVars().put("SPARK_BUFFER_SIZE", Integer.toString(bufferSize()));
        envVars().put("SPARK_CONNECT_LOCAL_URL", this.connectUrl);
        boolean unboxToBoolean = BoxesRunTime.unboxToBoolean(conf().get(Python$.MODULE$.PYTHON_USE_DAEMON()));
        conf().set((ConfigEntry<ConfigEntry<Object>>) Python$.MODULE$.PYTHON_USE_DAEMON(), (ConfigEntry<Object>) BoxesRunTime.boxToBoolean(false));
        try {
            PythonWorkerFactory pythonWorkerFactory = new PythonWorkerFactory(pythonExec(), ((IterableOnceOps) JavaConverters$.MODULE$.mapAsScalaMapConverter(envVars()).asScala()).toMap($less$colon$less$.MODULE$.refl()));
            Tuple2<Socket, Option<Object>> createStreamingWorker = pythonWorkerFactory.createStreamingWorker(this.workerModule);
            if (createStreamingWorker != null && (socket = (Socket) createStreamingWorker._1()) != null) {
                pythonWorker_$eq(new Some(socket));
                pythonWorkerFactory_$eq(new Some(pythonWorkerFactory));
                conf().set((ConfigEntry<ConfigEntry<Object>>) Python$.MODULE$.PYTHON_USE_DAEMON(), (ConfigEntry<Object>) BoxesRunTime.boxToBoolean(unboxToBoolean));
                DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(((Socket) pythonWorker().get()).getOutputStream(), bufferSize()));
                PythonWorkerUtils$.MODULE$.writePythonVersion(pythonVer(), dataOutputStream);
                PythonRDD$.MODULE$.writeUTF(this.sessionId, dataOutputStream);
                Seq<Object> command = this.func.command();
                dataOutputStream.writeInt(command.length());
                dataOutputStream.write((byte[]) command.toArray(ClassTag$.MODULE$.Byte()));
                dataOutputStream.flush();
                DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(((Socket) pythonWorker().get()).getInputStream(), bufferSize()));
                int readInt = dataInputStream.readInt();
                logInfo(() -> {
                    return new StringBuilder(44).append("Runner initialization succeeded (returned ").append(readInt).append(").").toString();
                });
                return new Tuple2<>(dataOutputStream, dataInputStream);
            }
            throw new MatchError(createStreamingWorker);
        } catch (Throwable th) {
            conf().set((ConfigEntry<ConfigEntry<Object>>) Python$.MODULE$.PYTHON_USE_DAEMON(), (ConfigEntry<Object>) BoxesRunTime.boxToBoolean(unboxToBoolean));
            throw th;
        }
    }

    public void stop() {
        pythonWorker().foreach(socket -> {
            $anonfun$stop$1(this, socket);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ void $anonfun$stop$2(Socket socket, PythonWorkerFactory pythonWorkerFactory) {
        pythonWorkerFactory.stopWorker(socket);
        pythonWorkerFactory.stop();
    }

    public static final /* synthetic */ void $anonfun$stop$1(StreamingPythonRunner streamingPythonRunner, Socket socket) {
        streamingPythonRunner.pythonWorkerFactory().foreach(pythonWorkerFactory -> {
            $anonfun$stop$2(socket, pythonWorkerFactory);
            return BoxedUnit.UNIT;
        });
    }

    public StreamingPythonRunner(PythonFunction pythonFunction, String str, String str2, String str3) {
        this.func = pythonFunction;
        this.connectUrl = str;
        this.sessionId = str2;
        this.workerModule = str3;
        Logging.$init$(this);
        this.conf = SparkEnv$.MODULE$.get().conf();
        this.bufferSize = BoxesRunTime.unboxToInt(conf().get(package$.MODULE$.BUFFER_SIZE()));
        this.authSocketTimeout = BoxesRunTime.unboxToLong(conf().get(Python$.MODULE$.PYTHON_AUTH_SOCKET_TIMEOUT()));
        this.envVars = pythonFunction.envVars();
        this.pythonExec = pythonFunction.pythonExec();
        this.pythonWorker = None$.MODULE$;
        this.pythonWorkerFactory = None$.MODULE$;
        this.pythonVer = pythonFunction.pythonVer();
    }
}
