package spark.broadcast;

import com.ning.compress.lzf.LZFInputStream;
import com.ning.compress.lzf.LZFOutputStream;
import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.net.URL;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxesRunTime;
import scala.runtime.TraitSetter;
import spark.HttpServer;
import spark.Logging;
import spark.SparkEnv$;
import spark.Utils$;
import spark.serializer.DeserializationStream;
import spark.serializer.SerializationStream;
import spark.util.MetadataCleaner;
import spark.util.TimeStampedHashSet;

/* compiled from: HttpBroadcast.scala */
/* loaded from: input_file:spark/broadcast/HttpBroadcast$.class */
public final class HttpBroadcast$ implements Logging, Serializable {
    public static final HttpBroadcast$ MODULE$ = null;
    private boolean initialized;
    private File broadcastDir;
    private boolean compress;
    private int bufferSize;
    private String spark$broadcast$HttpBroadcast$$serverUri;
    private HttpServer server;
    private final TimeStampedHashSet<String> files;
    private final MetadataCleaner cleaner;
    private transient Logger spark$Logging$$log_;

    static {
        new HttpBroadcast$();
    }

    @Override // spark.Logging
    public final Logger spark$Logging$$log_() {
        return this.spark$Logging$$log_;
    }

    @Override // spark.Logging
    @TraitSetter
    public final void spark$Logging$$log__$eq(Logger logger) {
        this.spark$Logging$$log_ = logger;
    }

    @Override // spark.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    @Override // spark.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    @Override // spark.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    @Override // spark.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    @Override // spark.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    @Override // spark.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    @Override // spark.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    @Override // spark.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    @Override // spark.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    @Override // spark.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    @Override // spark.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    @Override // spark.Logging
    public void initLogging() {
        Logging.Cclass.initLogging(this);
    }

    private boolean initialized() {
        return this.initialized;
    }

    private void initialized_$eq(boolean z) {
        this.initialized = z;
    }

    private File broadcastDir() {
        return this.broadcastDir;
    }

    private void broadcastDir_$eq(File file) {
        this.broadcastDir = file;
    }

    private boolean compress() {
        return this.compress;
    }

    private void compress_$eq(boolean z) {
        this.compress = z;
    }

    private int bufferSize() {
        return this.bufferSize;
    }

    private void bufferSize_$eq(int i) {
        this.bufferSize = i;
    }

    public final String spark$broadcast$HttpBroadcast$$serverUri() {
        return this.spark$broadcast$HttpBroadcast$$serverUri;
    }

    private void spark$broadcast$HttpBroadcast$$serverUri_$eq(String str) {
        this.spark$broadcast$HttpBroadcast$$serverUri = str;
    }

    private HttpServer server() {
        return this.server;
    }

    private void server_$eq(HttpServer httpServer) {
        this.server = httpServer;
    }

    private TimeStampedHashSet<String> files() {
        return this.files;
    }

    private MetadataCleaner cleaner() {
        return this.cleaner;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10 */
    public void initialize(boolean z) {
        ?? r0 = this;
        synchronized (r0) {
            if (!initialized()) {
                bufferSize_$eq(Predef$.MODULE$.augmentString(System.getProperty("spark.buffer.size", "65536")).toInt());
                compress_$eq(Predef$.MODULE$.augmentString(System.getProperty("spark.broadcast.compress", "true")).toBoolean());
                if (z) {
                    createServer();
                }
                spark$broadcast$HttpBroadcast$$serverUri_$eq(System.getProperty("spark.httpBroadcast.uri"));
                initialized_$eq(true);
            }
            r0 = this;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    public void stop() {
        ?? r0 = this;
        synchronized (r0) {
            if (server() != null) {
                server().stop();
                server_$eq(null);
            }
            initialized_$eq(false);
            cleaner().cancel();
            r0 = this;
        }
    }

    private void createServer() {
        broadcastDir_$eq(Utils$.MODULE$.createTempDir(Utils$.MODULE$.getLocalDir()));
        server_$eq(new HttpServer(broadcastDir()));
        server().start();
        spark$broadcast$HttpBroadcast$$serverUri_$eq(server().uri());
        System.setProperty("spark.httpBroadcast.uri", this.spark$broadcast$HttpBroadcast$$serverUri);
        Logging.Cclass.logInfo(this, new HttpBroadcast$$anonfun$createServer$1());
    }

    public void write(long j, Object obj) {
        File file = new File(broadcastDir(), new StringBuilder().append("broadcast-").append(BoxesRunTime.boxToLong(j)).toString());
        SerializationStream serializeStream = SparkEnv$.MODULE$.get().serializer().newInstance().serializeStream(compress() ? new LZFOutputStream(new FileOutputStream(file)) : new FastBufferedOutputStream(new FileOutputStream(file), bufferSize()));
        serializeStream.writeObject(obj);
        serializeStream.close();
        files().$plus$eq((TimeStampedHashSet<String>) file.getAbsolutePath());
    }

    public <T> T read(long j) {
        String stringBuilder = new StringBuilder().append(spark$broadcast$HttpBroadcast$$serverUri()).append("/broadcast-").append(BoxesRunTime.boxToLong(j)).toString();
        DeserializationStream deserializeStream = SparkEnv$.MODULE$.get().serializer().newInstance().deserializeStream(compress() ? new LZFInputStream(new URL(stringBuilder).openStream()) : new FastBufferedInputStream(new URL(stringBuilder).openStream(), bufferSize()));
        T t = (T) deserializeStream.readObject();
        deserializeStream.close();
        return t;
    }

    public void cleanup(long j) {
        String str;
        Iterator<Map.Entry<String, Object>> it = files().internalMap().entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Object> next = it.next();
            Tuple2 tuple2 = new Tuple2(next.getKey(), next.getValue());
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Tuple2 tuple22 = new Tuple2(tuple2._1(), tuple2._2());
            str = (String) tuple22._1();
            if (tuple22._2$mcJ$sp() < j) {
                try {
                    it.remove();
                    new File(str.toString()).delete();
                    Logging.Cclass.logInfo(this, new HttpBroadcast$$anonfun$cleanup$1(str));
                } catch (Exception e) {
                    Logging.Cclass.logWarning(this, new HttpBroadcast$$anonfun$cleanup$2(str), e);
                }
            }
            Logging.Cclass.logWarning(this, new HttpBroadcast$$anonfun$cleanup$2(str), e);
        }
    }

    public Object readResolve() {
        return MODULE$;
    }

    private HttpBroadcast$() {
        MODULE$ = this;
        spark$Logging$$log__$eq(null);
        this.initialized = false;
        this.broadcastDir = null;
        this.compress = false;
        this.bufferSize = 65536;
        this.spark$broadcast$HttpBroadcast$$serverUri = null;
        this.server = null;
        this.files = new TimeStampedHashSet<>();
        this.cleaner = new MetadataCleaner("HttpBroadcast", new HttpBroadcast$$anonfun$1());
    }
}
