package org.apache.spark.rdd;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkException;
import org.apache.spark.TaskContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.internal.Logging;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.util.SerializableConfiguration;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import org.spark_project.jetty.http.HttpStatus;
import org.spark_project.jetty.util.IO;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.collection.Iterator;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: ReliableCheckpointRDD.scala */
/* loaded from: input_file:org/apache/spark/rdd/ReliableCheckpointRDD$.class */
/**
 * Singleton companion ("module") class of {@code ReliableCheckpointRDD}.
 *
 * <p>Provides the helpers that write an RDD's partitions (and, when present, its
 * partitioner) into a checkpoint directory on a Hadoop-compatible file system, and the
 * helpers that read them back. The single instance is published through {@link #MODULE$}.
 */
public final class ReliableCheckpointRDD$ implements Logging, Serializable {
    /** Configuration key holding the I/O buffer size used for checkpoint streams. */
    private static final String BUFFER_SIZE_KEY = "spark.buffer.size";

    /**
     * Fallback buffer size (64 KiB) used when {@link #BUFFER_SIZE_KEY} is not set.
     * Same numeric value as the Jetty {@code IO.bufferSize} constant the decompiled code
     * referenced, without coupling checkpoint I/O to an unrelated HTTP library.
     */
    private static final int DEFAULT_BUFFER_SIZE = 65536;

    /** The singleton instance; assigned by the private constructor. */
    public static ReliableCheckpointRDD$ MODULE$;

    /** Backing field for the logger accessor pair required by {@link Logging}. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        // Instantiating the class publishes the instance into MODULE$ (see the constructor).
        new ReliableCheckpointRDD$();
    }

    // ---------------------------------------------------------------------------------
    // Logging forwarders.
    // Each one delegates explicitly to the Logging interface's own implementation via
    // Logging.super; an unqualified call here would resolve to this override itself and
    // recurse until the stack overflows.
    // ---------------------------------------------------------------------------------

    @Override // org.apache.spark.internal.Logging
    public String logName() {
        return Logging.super.logName();
    }

    @Override // org.apache.spark.internal.Logging
    public Logger log() {
        return Logging.super.log();
    }

    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0) {
        Logging.super.logInfo(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0) {
        Logging.super.logDebug(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0) {
        Logging.super.logTrace(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0) {
        Logging.super.logWarning(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0) {
        Logging.super.logError(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.super.logInfo(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.super.logDebug(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.super.logTrace(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.super.logWarning(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.super.logError(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean isTraceEnabled() {
        return Logging.super.isTraceEnabled();
    }

    @Override // org.apache.spark.internal.Logging
    public void initializeLogIfNecessary(boolean z) {
        Logging.super.initializeLogIfNecessary(z);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.super.initializeLogIfNecessary(z, z2);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.super.initializeLogIfNecessary$default$2();
    }

    /** Returns the logger currently stored in this instance (may be {@code null}). */
    @Override // org.apache.spark.internal.Logging
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Stores {@code logger} as this instance's logger. */
    @Override // org.apache.spark.internal.Logging
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /** Default for the third constructor argument of the companion class: no partitioner. */
    public <T> Option<Partitioner> $lessinit$greater$default$3() {
        return None$.MODULE$;
    }

    /**
     * Builds the checkpoint file name for a partition.
     *
     * @param i partition index
     * @return {@code "part-"} followed by the index zero-padded to at least five digits
     */
    public String org$apache$spark$rdd$ReliableCheckpointRDD$$checkpointFileName(int i) {
        return String.format("part-%05d", i);
    }

    /** Name of the file, inside the checkpoint directory, that stores the partitioner. */
    private String checkpointPartitionerFileName() {
        return "_partitioner";
    }

    /**
     * Writes every partition of {@code rdd} into the checkpoint directory {@code str} and
     * returns a {@code ReliableCheckpointRDD} backed by the written files.
     *
     * <p>Steps: create the directory, broadcast the Hadoop configuration, run a job that
     * writes one file per partition, write the partitioner if the RDD has one, and finally
     * verify that the new RDD has as many partitions as the original.
     *
     * <p>NOTE(review): the {@code i} (block size) argument is not forwarded to the
     * per-partition writer; the writer's default block size is always used. This is kept
     * as-is because a non-negative block size makes the writer skip compression while
     * {@link #readCheckpointFile} decides on decompression from configuration alone.
     *
     * @param rdd      the RDD to checkpoint
     * @param str      checkpoint directory path
     * @param i        requested block size (currently unused, see note above)
     * @param classTag class tag of the RDD's element type
     * @return the checkpoint-backed RDD
     * @throws SparkException if the directory cannot be created or the partition counts differ
     */
    public <T> ReliableCheckpointRDD<T> writeRDDToCheckpointDirectory(RDD<T> rdd, String str, int i, ClassTag<T> classTag) {
        final long startNanos = System.nanoTime();
        final SparkContext sc = rdd.sparkContext();
        final Path checkpointDir = new Path(str);

        final FileSystem fs = checkpointDir.getFileSystem(sc.hadoopConfiguration());
        if (!fs.mkdirs(checkpointDir)) {
            throw new SparkException("Failed to create checkpoint path " + checkpointDir);
        }

        // Ship the Hadoop configuration to the tasks so each can open the file system.
        final Broadcast<SerializableConfiguration> broadcastedConf = sc.broadcast(
                new SerializableConfiguration(sc.hadoopConfiguration()),
                ClassTag$.MODULE$.apply(SerializableConfiguration.class));
        final String checkpointDirName = checkpointDir.toString();
        final int defaultBlockSize = writePartitionToCheckpointFile$default$3();
        sc.runJob(rdd, (taskContext, iterator) -> {
            $anonfun$writeRDDToCheckpointDirectory$1(checkpointDirName, broadcastedConf, defaultBlockSize, classTag, taskContext, iterator);
            return BoxedUnit.UNIT;
        }, ClassTag$.MODULE$.Unit());

        if (rdd.mo314partitioner().nonEmpty()) {
            writePartitionerToCheckpointDir(sc, (Partitioner) rdd.mo314partitioner().get(), checkpointDir);
        }

        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        logInfo(() -> {
            return "Checkpointing took " + elapsedMillis + " ms.";
        });

        final ReliableCheckpointRDD<T> checkpointRdd = new ReliableCheckpointRDD<>(sc, checkpointDir.toString(), rdd.mo314partitioner(), classTag);
        if (checkpointRdd.partitions().length != rdd.partitions().length) {
            throw new SparkException(
                    "Checkpoint RDD has a different number of partitions from original RDD. Original "
                            + "RDD [ID: " + rdd.id() + ", num of partitions: " + rdd.partitions().length + "]; "
                            + "Checkpoint RDD [ID: " + checkpointRdd.id() + ", num of partitions: "
                            + checkpointRdd.partitions().length + "].");
        }
        return checkpointRdd;
    }

    /** Default value of the block-size argument of {@code writeRDDToCheckpointDirectory}. */
    public <T> int writeRDDToCheckpointDirectory$default$3() {
        return -1;
    }

    /**
     * Writes one partition's elements to its checkpoint file.
     *
     * <p>Data is first written to a hidden, attempt-specific temporary file and then
     * renamed to the final name. If the rename fails because the final file already
     * exists, the existing file is kept and the temporary file is deleted; if it fails
     * and the final file does not exist, the temporary file is deleted and an
     * {@link IOException} is thrown.
     *
     * @param str         checkpoint directory path
     * @param broadcast   broadcast Hadoop configuration used to resolve the file system
     * @param i           block size; when negative the three-argument {@code create} is
     *                    used and the stream is compressed if the CHECKPOINT_COMPRESS
     *                    setting is true, otherwise the file is created with the
     *                    default replication and this block size, uncompressed
     * @param taskContext supplies the partition id and attempt number
     * @param iterator    elements of the partition
     * @param classTag    class tag of the element type
     */
    public <T> void writePartitionToCheckpointFile(String str, Broadcast<SerializableConfiguration> broadcast, int i, TaskContext taskContext, Iterator<T> iterator, ClassTag<T> classTag) {
        final SparkEnv env = SparkEnv$.MODULE$.get();
        final Path outputDir = new Path(str);
        final FileSystem fs = outputDir.getFileSystem(broadcast.value().value());

        final String finalOutputName = org$apache$spark$rdd$ReliableCheckpointRDD$$checkpointFileName(taskContext.partitionId());
        final Path finalOutputPath = new Path(outputDir, finalOutputName);
        final Path tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + taskContext.attemptNumber());

        final int bufferSize = env.conf().getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);

        final OutputStream fileOutputStream;
        if (i < 0) {
            final OutputStream fileStream = fs.create(tempOutputPath, false, bufferSize);
            final boolean compress = BoxesRunTime.unboxToBoolean(env.conf().get(org.apache.spark.internal.config.package$.MODULE$.CHECKPOINT_COMPRESS()));
            fileOutputStream = compress
                    ? CompressionCodec$.MODULE$.createCodec(env.conf()).compressedOutputStream(fileStream)
                    : fileStream;
        } else {
            fileOutputStream = fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication(fs.getWorkingDirectory()), i);
        }

        final SerializationStream serializeStream = env.serializer().newInstance().serializeStream(fileOutputStream);
        Utils$.MODULE$.tryWithSafeFinally(() -> {
            return serializeStream.writeAll(iterator, classTag);
        }, () -> {
            serializeStream.close();
        });

        if (fs.rename(tempOutputPath, finalOutputPath)) {
            return;
        }

        // Rename failed: decide based on whether some attempt already produced the final file.
        if (!fs.exists(finalOutputPath)) {
            logInfo(() -> {
                return "Deleting tempOutputPath " + tempOutputPath;
            });
            fs.delete(tempOutputPath, false);
            throw new IOException("Checkpoint failed: failed to save output of task: "
                    + taskContext.attemptNumber() + " and final output path does not exist: " + finalOutputPath);
        }

        logInfo(() -> {
            return "Final output path " + finalOutputPath + " already exists; not overwriting it";
        });
        if (!fs.delete(tempOutputPath, false)) {
            logWarning(() -> {
                return "Error deleting " + tempOutputPath;
            });
        }
    }

    /** Default value of the block-size argument of {@code writePartitionToCheckpointFile}. */
    public <T> int writePartitionToCheckpointFile$default$3() {
        return -1;
    }

    /**
     * Serializes {@code partitioner} into the partitioner file of the checkpoint directory.
     *
     * <p>Best effort: a non-fatal failure is logged as a warning (including the cause)
     * and otherwise ignored; fatal throwables are rethrown.
     *
     * @param sparkContext supplies the Hadoop configuration and buffer-size setting
     * @param partitioner  the partitioner to persist
     * @param path         checkpoint directory
     */
    private void writePartitionerToCheckpointDir(SparkContext sparkContext, Partitioner partitioner, Path path) {
        try {
            final Path partitionerFilePath = new Path(path, checkpointPartitionerFileName());
            final int bufferSize = sparkContext.conf().getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
            final FileSystem fs = partitionerFilePath.getFileSystem(sparkContext.hadoopConfiguration());
            final SerializationStream serializeStream = SparkEnv$.MODULE$.get().serializer().newInstance()
                    .serializeStream(fs.create(partitionerFilePath, false, bufferSize));
            Utils$.MODULE$.tryWithSafeFinally(() -> {
                return serializeStream.writeObject(partitioner, ClassTag$.MODULE$.apply(Partitioner.class));
            }, () -> {
                serializeStream.close();
            });
            logDebug(() -> {
                return "Written partitioner to " + partitionerFilePath;
            });
        } catch (Throwable th) {
            if (NonFatal$.MODULE$.unapply(th).isEmpty()) {
                throw th;
            }
            // Keep the cause in the log so a failed partitioner write can be diagnosed.
            logWarning(() -> {
                return "Error writing partitioner " + partitioner + " to " + path;
            }, th);
        }
    }

    /**
     * Reads the partitioner stored in the checkpoint directory {@code str}, if any.
     *
     * @param sparkContext supplies the Hadoop configuration and buffer-size setting
     * @param str          checkpoint directory path
     * @return {@code Some(partitioner)} on success; {@code None} when the partitioner
     *         file does not exist (logged at debug level) or when reading fails with a
     *         non-fatal error (logged as a warning). Fatal throwables are rethrown.
     */
    public Option<Partitioner> org$apache$spark$rdd$ReliableCheckpointRDD$$readCheckpointedPartitionerFile(SparkContext sparkContext, String str) {
        try {
            final int bufferSize = sparkContext.conf().getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
            final Path partitionerFilePath = new Path(str, checkpointPartitionerFileName());
            final FSDataInputStream fileInputStream = partitionerFilePath.getFileSystem(sparkContext.hadoopConfiguration()).open(partitionerFilePath, bufferSize);
            final SerializerInstance serializer = SparkEnv$.MODULE$.get().serializer().newInstance();
            // The outer block guarantees the raw file stream is closed, the inner one the
            // deserialization stream wrapped around it.
            final Partitioner partitioner = (Partitioner) Utils$.MODULE$.tryWithSafeFinally(() -> {
                DeserializationStream deserializeStream = serializer.deserializeStream(fileInputStream);
                return (Partitioner) Utils$.MODULE$.tryWithSafeFinally(() -> {
                    return (Partitioner) deserializeStream.readObject(ClassTag$.MODULE$.apply(Partitioner.class));
                }, () -> {
                    deserializeStream.close();
                });
            }, () -> {
                fileInputStream.close();
            });
            logDebug(() -> {
                return "Read partitioner from " + partitionerFilePath;
            });
            return new Some<>(partitioner);
        } catch (Throwable th) {
            if (th instanceof FileNotFoundException) {
                logDebug(() -> {
                    return "No partitioner file";
                }, th);
                return None$.MODULE$;
            }
            final Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
            if (nonFatal.isEmpty()) {
                throw th;
            }
            logWarning(() -> {
                return "Error reading partitioner from " + str + ", " + "partitioner will not be recovered which may lead to performance loss";
            }, nonFatal.get());
            return None$.MODULE$;
        }
    }

    /**
     * Opens a checkpoint file and returns an iterator over its deserialized elements.
     *
     * <p>The input is wrapped in a decompressing stream when the CHECKPOINT_COMPRESS
     * setting is true. The deserialization stream is closed by a completion listener
     * registered on {@code taskContext}.
     *
     * @param path        checkpoint file to read
     * @param broadcast   broadcast Hadoop configuration used to resolve the file system
     * @param taskContext task context on which the close listener is registered
     * @return iterator over the file's elements
     */
    public <T> Iterator<T> readCheckpointFile(Path path, Broadcast<SerializableConfiguration> broadcast, TaskContext taskContext) {
        final SparkEnv env = SparkEnv$.MODULE$.get();
        final FileSystem fs = path.getFileSystem(broadcast.value().value());
        final int bufferSize = env.conf().getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
        final InputStream fileStream = fs.open(path, bufferSize);

        final boolean compressed = BoxesRunTime.unboxToBoolean(env.conf().get(org.apache.spark.internal.config.package$.MODULE$.CHECKPOINT_COMPRESS()));
        final InputStream fileInputStream = compressed
                ? CompressionCodec$.MODULE$.createCodec(env.conf()).compressedInputStream(fileStream)
                : fileStream;

        final DeserializationStream deserializeStream = env.serializer().newInstance().deserializeStream(fileInputStream);
        taskContext.addTaskCompletionListener(taskContext2 -> {
            deserializeStream.close();
            return BoxedUnit.UNIT;
        });
        return (Iterator<T>) deserializeStream.asIterator();
    }

    /** Ensures deserialization yields the singleton instead of a fresh instance. */
    private Object readResolve() {
        return MODULE$;
    }

    /** Static bridge used by the job closure: writes one partition via the singleton. */
    public static final /* synthetic */ void $anonfun$writeRDDToCheckpointDirectory$1(String str, Broadcast broadcast, int i, ClassTag classTag, TaskContext taskContext, Iterator iterator) {
        MODULE$.writePartitionToCheckpointFile(str, broadcast, i, taskContext, iterator, classTag);
    }

    /** Publishes this instance as {@link #MODULE$} and starts with no logger set. */
    private ReliableCheckpointRDD$() {
        MODULE$ = this;
        org$apache$spark$internal$Logging$$log__$eq(null);
    }
}
