package org.apache.spark.api.python;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkPythonException;
import org.apache.spark.SparkPythonException$;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKeys$PYTHON_EXEC$;
import org.apache.spark.internal.LogKeys$PYTHON_WORKER_MODULE$;
import org.apache.spark.internal.LogKeys$PYTHON_WORKER_RESPONSE$;
import org.apache.spark.internal.LogKeys$SESSION_ID$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.MDC;
import org.apache.spark.internal.config.Python$;
import org.apache.spark.internal.config.package$;
import org.slf4j.Logger;
import scala.$less$colon$less$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: StreamingPythonRunner.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005%uA\u0002\u0013&\u0011\u0003IsF\u0002\u00042K!\u0005\u0011F\r\u0005\u0006s\u0005!\ta\u000f\u0005\u0006y\u0005!\t!\u0010\u0004\u0006c\u0015\u0002\u0011f\u0010\u0005\t\r\u0012\u0011\t\u0011)A\u0005\u000f\"A!\n\u0002B\u0001B\u0003%1\n\u0003\u0005W\t\t\u0005\t\u0015!\u0003L\u0011!9FA!A!\u0002\u0013Y\u0005\"B\u001d\u0005\t\u0003A\u0006bB/\u0005\u0005\u0004%IA\u0018\u0005\u0007G\u0012\u0001\u000b\u0011B0\t\u000f\u0011$!\u0019!C\tK\"1\u0011\u000e\u0002Q\u0001\n\u0019DqA\u001b\u0003C\u0002\u0013E1\u000e\u0003\u0004p\t\u0001\u0006I\u0001\u001c\u0005\ba\u0012\u0011\r\u0011\"\u0003r\u0011\u0019QH\u0001)A\u0005e\"91\u0010\u0002b\u0001\n\u0013a\bBB?\u0005A\u0003%1\nC\u0004\u007f\t\u0001\u0007I\u0011B@\t\u0013\u00055A\u00011A\u0005\n\u0005=\u0001\u0002CA\u000e\t\u0001\u0006K!!\u0001\t\u0013\u0005uA\u00011A\u0005\n\u0005}\u0001\"CA\u0015\t\u0001\u0007I\u0011BA\u0016\u0011!\ty\u0003\u0002Q!\n\u0005\u0005\u0002\u0002CA\u0019\t\t\u0007I\u0011\u0003?\t\u000f\u0005MB\u0001)A\u0005\u0017\"9\u0011Q\u0007\u0003\u0005\u0002\u0005]\u0002bBA)\t\u0011\u0005\u00111\u000b\u0004\u0007\u00033\"\u0001!a\u0017\t\u0013\u0005\rdD!A!\u0002\u00131\u0007\"CA3=\t\u0005\t\u0015!\u0003L\u0011\u0019Id\u0004\"\u0001\u0002h!9\u0011\u0011\u000f\u0003\u0005\u0002\u0005M\u0004bBA;\t\u0011\u0005\u0011qO\u0001\u0016'R\u0014X-Y7j]\u001e\u0004\u0016\u0010\u001e5p]J+hN\\3s\u0015\t1s%\u0001\u0004qsRDwN\u001c\u0006\u0003Q%\n1!\u00199j\u0015\tQ3&A\u0003ta\u0006\u00148N\u0003\u0002-[\u00051\u0011\r]1dQ\u0016T\u0011AL\u0001\u0004_J<\u0007C\u0001\u0019\u0002\u001b\u0005)#!F*ue\u0016\fW.\u001b8h!f$\bn\u001c8Sk:tWM]\n\u0003\u0003M\u0002\"\u0001N\u001c\u000e\u0003UR\u0011AN\u0001\u0006g\u000e\fG.Y\u0005\u0003qU\u0012a!\u00118z%\u00164\u0017A\u0002\u001fj]&$hh\u0001\u0001\u0015\u0003=\nQ!\u00199qYf$\u0012BPAA\u0003\u0007\u000b))a\"\u0011\u0005A\"1c\u0001\u00034\u0001B\u0011\u0011\tR\u0007\u0002\u0005*\u00111)K\u0001\tS:$XM\u001d8bY&\u0011QI\u0011\u0002\b\u0019><w-\u001b8h\u0003\u00111WO\\2\u0011\u0005AB\u0015BA%&\u00059\u0001\u0016\u0010\u001e5p]\u001a+hn\u0019;j_:\f!bY8o]\u0016\u001cG/\u0016:m!\ta5K\u0004\u0002N#B\u0011a*N\u0007\u0002\u001f*\u0011\u0001KO\u0001\u0007yI|w\u000e\u001e \n\u0005I+\u0014A\u0002)sK\u0012,g-\u0003\u0002U+\n11\u000b\u001e:j]\u001eT!AU\u001b\u0002\u0013M,7o]5p]&#\u0017\u0001D<pe.,'/T8ek2,G#\u0002 Z5nc\u0006\"\u0002$\n\u0001\u00049\u0005\"\u0002&\n\u0001\u0004Y\u0005\"\u0002,\n\u0001\u0004Y\u0005\"B,\n\u0001\u0004Y\u0015\u0001B2p]\u001a,\u0012a\u0018\t\u0003A\u0006l\u0011!K\u0005\u0003E&\u0012\u0011b\u00159be.\u001cuN\u001c4\u0002\u000b\r|gN\u001a\u0011\u0002\u0015\t,hMZ3s'&TX-F\u0001g!\t!t-\u0003\u0002ik\t\u0019\u0011J\u001c;\u0002\u0017\t,hMZ3s'&TX\rI\u0001\u0012CV$\bnU8dW\u0016$H+[7f_V$X#\u00017\u0011\u0005Qj\u0017B\u000186\u0005\u0011auN\\4\u0002%\u0005,H\u000f[*pG.,G\u000fV5nK>,H\u000fI\u0001\bK:4h+\u0019:t+\u0005\u0011\b\u0003B:y\u0017.k\u0011\u0001\u001e\u0006\u0003kZ\fA!\u001e;jY*\tq/\u0001\u0003kCZ\f\u0017BA=u\u0005\ri\u0015\r]\u0001\tK:4h+\u0019:tA\u0005Q\u0001/\u001f;i_:,\u00050Z2\u0016\u0003-\u000b1\u0002]=uQ>tW\t_3dA\u0005a\u0001/\u001f;i_:<vN]6feV\u0011\u0011\u0011\u0001\t\u0006i\u0005\r\u0011qA\u0005\u0004\u0003\u000b)$AB(qi&|g\u000eE\u00021\u0003\u0013I1!a\u0003&\u00051\u0001\u0016\u0010\u001e5p]^{'o[3s\u0003A\u0001\u0018\u0010\u001e5p]^{'o[3s?\u0012*\u0017\u000f\u0006\u0003\u0002\u0012\u0005]\u0001c\u0001\u001b\u0002\u0014%\u0019\u0011QC\u001b\u0003\tUs\u0017\u000e\u001e\u0005\n\u00033)\u0012\u0011!a\u0001\u0003\u0003\t1\u0001\u001f\u00132\u00035\u0001\u0018\u0010\u001e5p]^{'o[3sA\u0005\u0019\u0002/\u001f;i_:<vN]6fe\u001a\u000b7\r^8ssV\u0011\u0011\u0011\u0005\t\u0006i\u0005\r\u00111\u0005\t\u0004a\u0005\u0015\u0012bAA\u0014K\t\u0019\u0002+\u001f;i_:<vN]6fe\u001a\u000b7\r^8ss\u00069\u0002/\u001f;i_:<vN]6fe\u001a\u000b7\r^8ss~#S-\u001d\u000b\u0005\u0003#\ti\u0003C\u0005\u0002\u001aa\t\t\u00111\u0001\u0002\"\u0005!\u0002/\u001f;i_:<vN]6fe\u001a\u000b7\r^8ss\u0002\n\u0011\u0002]=uQ>tg+\u001a:\u0002\u0015ALH\u000f[8o-\u0016\u0014\b%\u0001\u0003j]&$HCAA\u001d!\u001d!\u00141HA \u0003\u0017J1!!\u00106\u0005\u0019!V\u000f\u001d7feA!\u0011\u0011IA$\u001b\t\t\u0019EC\u0002\u0002FY\f!![8\n\t\u0005%\u00131\t\u0002\u0011\t\u0006$\u0018mT;uaV$8\u000b\u001e:fC6\u0004B!!\u0011\u0002N%!\u0011qJA\"\u0005=!\u0015\r^1J]B,Ho\u0015;sK\u0006l\u0017AK:ue\u0016\fW.\u001b8h!f$\bn\u001c8Sk:tWM]%oSRL\u0017\r\\5{CRLwN\u001c$bS2,(/\u001a\u000b\u0007\u0003+\ni'a\u001c\u0011\u0007\u0005]c$D\u0001\u0005\u00051\u001aFO]3b[&tw\rU=uQ>t'+\u001e8oKJLe.\u001b;jC2L'0\u0019;j_:,\u0005pY3qi&|gnE\u0002\u001f\u0003;\u00022\u0001YA0\u0013\r\t\t'\u000b\u0002\u0015'B\f'o\u001b)zi\"|g.\u0012=dKB$\u0018n\u001c8\u0002\u001bI,7O\u0012:p[BKH\u000f[8o\u0003))'O]'fgN\fw-\u001a\u000b\u0007\u0003+\nI'a\u001b\t\r\u0005\r\u0014\u00051\u0001g\u0011\u0019\t)'\ta\u0001\u0017\"1\u00111M\u000fA\u0002\u0019Da!!\u001a\u001e\u0001\u0004Y\u0015\u0001B:u_B$\"!!\u0005\u0002\u001f%\u001cxk\u001c:lKJ\u001cFo\u001c9qK\u0012$\"!!\u001f\u0011\u000bQ\n\u0019!a\u001f\u0011\u0007Q\ni(C\u0002\u0002��U\u0012qAQ8pY\u0016\fg\u000eC\u0003G\u0007\u0001\u0007q\tC\u0003K\u0007\u0001\u00071\nC\u0003W\u0007\u0001\u00071\nC\u0003X\u0007\u0001\u00071\n")
/* loaded from: input_file:org/apache/spark/api/python/StreamingPythonRunner.class */
public class StreamingPythonRunner implements Logging {
    private final PythonFunction func;
    private final String connectUrl;
    private final String sessionId;
    private final String workerModule;
    private final SparkConf conf;
    private final int bufferSize;
    private final long authSocketTimeout;
    private final Map<String, String> envVars;
    private final String pythonExec;
    private Option<PythonWorker> pythonWorker;
    private Option<PythonWorkerFactory> pythonWorkerFactory;
    private final String pythonVer;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /* compiled from: StreamingPythonRunner.scala */
    /* loaded from: input_file:org/apache/spark/api/python/StreamingPythonRunner$StreamingPythonRunnerInitializationException.class */
    public class StreamingPythonRunnerInitializationException extends SparkPythonException {
        public final /* synthetic */ StreamingPythonRunner $outer;

        public /* synthetic */ StreamingPythonRunner org$apache$spark$api$python$StreamingPythonRunner$StreamingPythonRunnerInitializationException$$$outer() {
            return this.$outer;
        }

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public StreamingPythonRunnerInitializationException(StreamingPythonRunner streamingPythonRunner, int i, String str) {
            super("STREAMING_PYTHON_RUNNER_INITIALIZATION_FAILURE", (scala.collection.immutable.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("resFromPython"), Integer.toString(i)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("msg"), str)})), SparkPythonException$.MODULE$.$lessinit$greater$default$3(), SparkPythonException$.MODULE$.$lessinit$greater$default$4(), SparkPythonException$.MODULE$.$lessinit$greater$default$5());
            if (streamingPythonRunner == null) {
                throw null;
            }
            this.$outer = streamingPythonRunner;
        }
    }

    public static StreamingPythonRunner apply(PythonFunction pythonFunction, String str, String str2, String str3) {
        return StreamingPythonRunner$.MODULE$.apply(pythonFunction, str, str2, str3);
    }

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public Logging.LogStringContext LogStringContext(StringContext stringContext) {
        return Logging.LogStringContext$(this, stringContext);
    }

    public void withLogContext(HashMap<String, String> hashMap, Function0<BoxedUnit> function0) {
        Logging.withLogContext$(this, hashMap, function0);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logInfo(LogEntry logEntry) {
        Logging.logInfo$(this, logEntry);
    }

    public void logInfo(LogEntry logEntry, Throwable th) {
        Logging.logInfo$(this, logEntry, th);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logDebug(LogEntry logEntry) {
        Logging.logDebug$(this, logEntry);
    }

    public void logDebug(LogEntry logEntry, Throwable th) {
        Logging.logDebug$(this, logEntry, th);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logTrace(LogEntry logEntry) {
        Logging.logTrace$(this, logEntry);
    }

    public void logTrace(LogEntry logEntry, Throwable th) {
        Logging.logTrace$(this, logEntry, th);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logWarning(LogEntry logEntry) {
        Logging.logWarning$(this, logEntry);
    }

    public void logWarning(LogEntry logEntry, Throwable th) {
        Logging.logWarning$(this, logEntry, th);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logError(LogEntry logEntry) {
        Logging.logError$(this, logEntry);
    }

    public void logError(LogEntry logEntry, Throwable th) {
        Logging.logError$(this, logEntry, th);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private SparkConf conf() {
        return this.conf;
    }

    public int bufferSize() {
        return this.bufferSize;
    }

    public long authSocketTimeout() {
        return this.authSocketTimeout;
    }

    private Map<String, String> envVars() {
        return this.envVars;
    }

    private String pythonExec() {
        return this.pythonExec;
    }

    private Option<PythonWorker> pythonWorker() {
        return this.pythonWorker;
    }

    private void pythonWorker_$eq(Option<PythonWorker> option) {
        this.pythonWorker = option;
    }

    private Option<PythonWorkerFactory> pythonWorkerFactory() {
        return this.pythonWorkerFactory;
    }

    private void pythonWorkerFactory_$eq(Option<PythonWorkerFactory> option) {
        this.pythonWorkerFactory = option;
    }

    public String pythonVer() {
        return this.pythonVer;
    }

    public Tuple2<DataOutputStream, DataInputStream> init() {
        PythonWorker pythonWorker;
        logInfo(LogEntry$.MODULE$.from(() -> {
            return this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"Initializing Python runner (session: ", ","}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$SESSION_ID$.MODULE$, this.sessionId)})).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{" pythonExec: ", ")"}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$PYTHON_EXEC$.MODULE$, this.pythonExec())})));
        }));
        envVars().put("SPARK_LOCAL_DIRS", Predef$.MODULE$.wrapRefArray((Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(SparkEnv$.MODULE$.get().blockManager().diskBlockManager().localDirs()), file -> {
            return file.getPath();
        }, ClassTag$.MODULE$.apply(String.class))).mkString(","));
        envVars().put("SPARK_AUTH_SOCKET_TIMEOUT", Long.toString(authSocketTimeout()));
        envVars().put("SPARK_BUFFER_SIZE", Integer.toString(bufferSize()));
        envVars().put("SPARK_CONNECT_LOCAL_URL", this.connectUrl);
        PythonWorkerFactory pythonWorkerFactory = new PythonWorkerFactory(pythonExec(), this.workerModule, CollectionConverters$.MODULE$.MapHasAsScala(envVars()).asScala().toMap($less$colon$less$.MODULE$.refl()), false);
        Tuple2<PythonWorker, Option<Object>> createSimpleWorker = pythonWorkerFactory.createSimpleWorker(true);
        if (createSimpleWorker == null || (pythonWorker = (PythonWorker) createSimpleWorker._1()) == null) {
            throw new MatchError(createSimpleWorker);
        }
        pythonWorker_$eq(new Some(pythonWorker));
        pythonWorkerFactory_$eq(new Some(pythonWorkerFactory));
        DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(((PythonWorker) pythonWorker().get()).channel().socket().getOutputStream(), bufferSize()));
        PythonWorkerUtils$.MODULE$.writePythonVersion(pythonVer(), dataOutputStream);
        PythonRDD$.MODULE$.writeUTF(this.sessionId, dataOutputStream);
        PythonWorkerUtils$.MODULE$.writePythonFunction(this.func, dataOutputStream);
        dataOutputStream.flush();
        DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(((PythonWorker) pythonWorker().get()).channel().socket().getInputStream(), bufferSize()));
        int readInt = dataInputStream.readInt();
        if (readInt != 0) {
            throw streamingPythonRunnerInitializationFailure(readInt, PythonWorkerUtils$.MODULE$.readUTF(dataInputStream));
        }
        logInfo(LogEntry$.MODULE$.from(() -> {
            return this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"Runner initialization succeeded (returned"}))).log(Nil$.MODULE$).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{" ", ")."}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$PYTHON_WORKER_RESPONSE$.MODULE$, BoxesRunTime.boxToInteger(readInt))})));
        }));
        return new Tuple2<>(dataOutputStream, dataInputStream);
    }

    public StreamingPythonRunnerInitializationException streamingPythonRunnerInitializationFailure(int i, String str) {
        return new StreamingPythonRunnerInitializationException(this, i, str);
    }

    public void stop() {
        logInfo(LogEntry$.MODULE$.from(() -> {
            return this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"Stopping streaming runner for sessionId: ", ","}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$SESSION_ID$.MODULE$, this.sessionId)})).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{" module: ", "."}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$PYTHON_WORKER_MODULE$.MODULE$, this.workerModule)})));
        }));
        try {
            pythonWorkerFactory().foreach(pythonWorkerFactory -> {
                $anonfun$stop$2(this, pythonWorkerFactory);
                return BoxedUnit.UNIT;
            });
        } catch (Exception e) {
            logError(() -> {
                return "Exception when trying to kill worker";
            }, e);
        }
    }

    public Option<Object> isWorkerStopped() {
        return pythonWorkerFactory().flatMap(pythonWorkerFactory -> {
            return this.pythonWorker().map(pythonWorker -> {
                return BoxesRunTime.boxToBoolean(pythonWorkerFactory.isWorkerStopped(pythonWorker));
            });
        });
    }

    public static final /* synthetic */ void $anonfun$stop$3(PythonWorkerFactory pythonWorkerFactory, PythonWorker pythonWorker) {
        pythonWorkerFactory.stopWorker(pythonWorker);
        pythonWorkerFactory.stop();
    }

    public static final /* synthetic */ void $anonfun$stop$2(StreamingPythonRunner streamingPythonRunner, PythonWorkerFactory pythonWorkerFactory) {
        streamingPythonRunner.pythonWorker().foreach(pythonWorker -> {
            $anonfun$stop$3(pythonWorkerFactory, pythonWorker);
            return BoxedUnit.UNIT;
        });
    }

    public StreamingPythonRunner(PythonFunction pythonFunction, String str, String str2, String str3) {
        this.func = pythonFunction;
        this.connectUrl = str;
        this.sessionId = str2;
        this.workerModule = str3;
        Logging.$init$(this);
        this.conf = SparkEnv$.MODULE$.get().conf();
        this.bufferSize = BoxesRunTime.unboxToInt(conf().get(package$.MODULE$.BUFFER_SIZE()));
        this.authSocketTimeout = BoxesRunTime.unboxToLong(conf().get(Python$.MODULE$.PYTHON_AUTH_SOCKET_TIMEOUT()));
        this.envVars = pythonFunction.envVars();
        this.pythonExec = pythonFunction.pythonExec();
        this.pythonWorker = None$.MODULE$;
        this.pythonWorkerFactory = None$.MODULE$;
        this.pythonVer = pythonFunction.pythonVer();
    }
}
