package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.io.Closeables;
import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;

/* loaded from: input_file:org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.class */
final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<K, V> {
    private final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
    private final int fileBufferSize;
    private final boolean transferToEnabled;
    private final int numPartitions;
    private final BlockManager blockManager;
    private final Partitioner partitioner;
    private final ShuffleWriteMetrics writeMetrics;
    private final Serializer serializer;
    private DiskBlockObjectWriter[] partitionWriters;
    static final /* synthetic */ boolean $assertionsDisabled;

    public BypassMergeSortShuffleWriter(SparkConf sparkConf, BlockManager blockManager, Partitioner partitioner, ShuffleWriteMetrics shuffleWriteMetrics, Serializer serializer) {
        this.fileBufferSize = ((int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k")) * 1024;
        this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
        this.numPartitions = partitioner.numPartitions();
        this.blockManager = blockManager;
        this.partitioner = partitioner;
        this.writeMetrics = shuffleWriteMetrics;
        this.serializer = serializer;
    }

    @Override // org.apache.spark.shuffle.sort.SortShuffleFileWriter
    public void insertAll(Iterator<Product2<K, V>> iterator) throws IOException {
        if (!$assertionsDisabled && this.partitionWriters != null) {
            throw new AssertionError();
        }
        if (iterator.hasNext()) {
            SerializerInstance newInstance = this.serializer.newInstance();
            long nanoTime = System.nanoTime();
            this.partitionWriters = new DiskBlockObjectWriter[this.numPartitions];
            for (int i = 0; i < this.numPartitions; i++) {
                Tuple2<TempShuffleBlockId, File> createTempShuffleBlock = this.blockManager.diskBlockManager().createTempShuffleBlock();
                this.partitionWriters[i] = this.blockManager.getDiskWriter((BlockId) createTempShuffleBlock._1(), (File) createTempShuffleBlock._2(), newInstance, this.fileBufferSize, this.writeMetrics).open();
            }
            this.writeMetrics.incShuffleWriteTime(System.nanoTime() - nanoTime);
            while (iterator.hasNext()) {
                Product2 product2 = (Product2) iterator.next();
                Object _1 = product2._1();
                this.partitionWriters[this.partitioner.getPartition(_1)].write(_1, product2._2());
            }
            for (DiskBlockObjectWriter diskBlockObjectWriter : this.partitionWriters) {
                diskBlockObjectWriter.commitAndClose();
            }
        }
    }

    @Override // org.apache.spark.shuffle.sort.SortShuffleFileWriter
    public long[] writePartitionedFile(BlockId blockId, TaskContext taskContext, File file) throws IOException {
        long[] jArr = new long[this.numPartitions];
        if (this.partitionWriters == null) {
            return jArr;
        }
        FileOutputStream fileOutputStream = new FileOutputStream(file, true);
        long nanoTime = System.nanoTime();
        for (int i = 0; i < this.numPartitions; i++) {
            try {
                FileInputStream fileInputStream = new FileInputStream(this.partitionWriters[i].fileSegment().file());
                boolean z = true;
                try {
                    jArr[i] = Utils.copyStream(fileInputStream, fileOutputStream, false, this.transferToEnabled);
                    z = false;
                    Closeables.close(fileInputStream, false);
                    if (!this.blockManager.diskBlockManager().getFile(this.partitionWriters[i].blockId()).delete()) {
                        this.logger.error("Unable to delete file for partition {}", Integer.valueOf(i));
                    }
                } finally {
                }
            } catch (Throwable th) {
                Closeables.close(fileOutputStream, true);
                this.writeMetrics.incShuffleWriteTime(System.nanoTime() - nanoTime);
                throw th;
            }
        }
        Closeables.close(fileOutputStream, false);
        this.writeMetrics.incShuffleWriteTime(System.nanoTime() - nanoTime);
        this.partitionWriters = null;
        return jArr;
    }

    @Override // org.apache.spark.shuffle.sort.SortShuffleFileWriter
    public void stop() throws IOException {
        if (this.partitionWriters != null) {
            try {
                DiskBlockManager diskBlockManager = this.blockManager.diskBlockManager();
                for (DiskBlockObjectWriter diskBlockObjectWriter : this.partitionWriters) {
                    diskBlockObjectWriter.revertPartialWritesAndClose();
                    if (!diskBlockManager.getFile(diskBlockObjectWriter.blockId()).delete()) {
                        this.logger.error("Error while deleting file for block {}", diskBlockObjectWriter.blockId());
                    }
                }
            } finally {
                this.partitionWriters = null;
            }
        }
    }

    static {
        $assertionsDisabled = !BypassMergeSortShuffleWriter.class.desiredAssertionStatus();
    }
}
