package org.apache.spark.shuffle.unsafe;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.shuffle.unsafe.UnsafeShuffleInMemorySorter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockObjectWriter;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;
import org.p000sparkproject.guava.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class */
public final class UnsafeShuffleExternalSorter {
    private static final int PAGE_SIZE = 134217728;

    @VisibleForTesting
    static final int DISK_WRITE_BUFFER_SIZE = 1048576;

    @VisibleForTesting
    static final int MAX_RECORD_SIZE = 134217724;
    private final int initialSize;
    private final int numPartitions;
    private final TaskMemoryManager memoryManager;
    private final ShuffleMemoryManager shuffleMemoryManager;
    private final BlockManager blockManager;
    private final TaskContext taskContext;
    private final ShuffleWriteMetrics writeMetrics;
    private final int fileBufferSizeBytes;
    private UnsafeShuffleInMemorySorter sorter;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
    private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();
    private final LinkedList<SpillInfo> spills = new LinkedList<>();
    private MemoryBlock currentPage = null;
    private long currentPagePosition = -1;
    private long freeSpaceInCurrentPage = 0;

    public UnsafeShuffleExternalSorter(TaskMemoryManager taskMemoryManager, ShuffleMemoryManager shuffleMemoryManager, BlockManager blockManager, TaskContext taskContext, int i, int i2, SparkConf sparkConf, ShuffleWriteMetrics shuffleWriteMetrics) throws IOException {
        this.memoryManager = taskMemoryManager;
        this.shuffleMemoryManager = shuffleMemoryManager;
        this.blockManager = blockManager;
        this.taskContext = taskContext;
        this.initialSize = i;
        this.numPartitions = i2;
        this.fileBufferSizeBytes = ((int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k")) * 1024;
        this.writeMetrics = shuffleWriteMetrics;
        initializeForWriting();
    }

    private void initializeForWriting() throws IOException {
        long j = this.initialSize * 8;
        long tryToAcquire = this.shuffleMemoryManager.tryToAcquire(j);
        if (tryToAcquire != j) {
            this.shuffleMemoryManager.release(tryToAcquire);
            throw new IOException("Could not acquire " + j + " bytes of memory");
        }
        this.sorter = new UnsafeShuffleInMemorySorter(this.initialSize);
    }

    private void writeSortedFile(boolean z) throws IOException {
        ShuffleWriteMetrics shuffleWriteMetrics = z ? this.writeMetrics : new ShuffleWriteMetrics();
        UnsafeShuffleInMemorySorter.UnsafeShuffleSorterIterator sortedIterator = this.sorter.getSortedIterator();
        byte[] bArr = new byte[DISK_WRITE_BUFFER_SIZE];
        Tuple2<TempShuffleBlockId, File> createTempShuffleBlock = this.blockManager.diskBlockManager().createTempShuffleBlock();
        File file = (File) createTempShuffleBlock._2();
        TempShuffleBlockId tempShuffleBlockId = (TempShuffleBlockId) createTempShuffleBlock._1();
        SpillInfo spillInfo = new SpillInfo(this.numPartitions, file, tempShuffleBlockId);
        DummySerializerInstance dummySerializerInstance = DummySerializerInstance.INSTANCE;
        BlockObjectWriter diskWriter = this.blockManager.getDiskWriter(tempShuffleBlockId, file, dummySerializerInstance, this.fileBufferSizeBytes, shuffleWriteMetrics);
        int i = -1;
        while (sortedIterator.hasNext()) {
            sortedIterator.loadNext();
            int partitionId = sortedIterator.packedRecordPointer.getPartitionId();
            if (!$assertionsDisabled && partitionId < i) {
                throw new AssertionError();
            }
            if (partitionId != i) {
                if (i != -1) {
                    diskWriter.commitAndClose();
                    spillInfo.partitionLengths[i] = diskWriter.fileSegment().length();
                }
                i = partitionId;
                diskWriter = this.blockManager.getDiskWriter(tempShuffleBlockId, file, dummySerializerInstance, this.fileBufferSizeBytes, shuffleWriteMetrics);
            }
            long recordPointer = sortedIterator.packedRecordPointer.getRecordPointer();
            Object page = this.memoryManager.getPage(recordPointer);
            long offsetInPage = this.memoryManager.getOffsetInPage(recordPointer);
            int i2 = PlatformDependent.UNSAFE.getInt(page, offsetInPage);
            long j = offsetInPage + 4;
            while (i2 > 0) {
                int min = Math.min(DISK_WRITE_BUFFER_SIZE, i2);
                PlatformDependent.copyMemory(page, j, bArr, PlatformDependent.BYTE_ARRAY_OFFSET, min);
                diskWriter.write(bArr, 0, min);
                j += min;
                i2 -= min;
            }
            diskWriter.recordWritten();
        }
        if (diskWriter != null) {
            diskWriter.commitAndClose();
            if (i != -1) {
                spillInfo.partitionLengths[i] = diskWriter.fileSegment().length();
                this.spills.add(spillInfo);
            }
        }
        if (z) {
            return;
        }
        this.writeMetrics.incShuffleRecordsWritten(shuffleWriteMetrics.shuffleRecordsWritten());
        this.taskContext.taskMetrics().incDiskBytesSpilled(shuffleWriteMetrics.shuffleBytesWritten());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void spill() throws IOException {
        Logger logger = this.logger;
        Object[] objArr = new Object[4];
        objArr[0] = Long.valueOf(Thread.currentThread().getId());
        objArr[1] = Utils.bytesToString(getMemoryUsage());
        objArr[2] = Integer.valueOf(this.spills.size());
        objArr[3] = this.spills.size() > 1 ? " times" : " time";
        logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", objArr);
        writeSortedFile(false);
        long memoryUsage = this.sorter.getMemoryUsage();
        this.sorter = null;
        this.shuffleMemoryManager.release(memoryUsage);
        this.taskContext.taskMetrics().incMemoryBytesSpilled(freeMemory());
        initializeForWriting();
    }

    private long getMemoryUsage() {
        return this.sorter.getMemoryUsage() + (this.allocatedPages.size() * 134217728);
    }

    private long freeMemory() {
        long j = 0;
        Iterator<MemoryBlock> it = this.allocatedPages.iterator();
        while (it.hasNext()) {
            MemoryBlock next = it.next();
            this.memoryManager.freePage(next);
            this.shuffleMemoryManager.release(next.size());
            j += next.size();
        }
        this.allocatedPages.clear();
        this.currentPage = null;
        this.currentPagePosition = -1L;
        this.freeSpaceInCurrentPage = 0L;
        return j;
    }

    public void cleanupAfterError() {
        freeMemory();
        Iterator<SpillInfo> it = this.spills.iterator();
        while (it.hasNext()) {
            SpillInfo next = it.next();
            if (next.file.exists() && !next.file.delete()) {
                this.logger.error("Unable to delete spill file {}", next.file.getPath());
            }
        }
        if (this.sorter != null) {
            this.shuffleMemoryManager.release(this.sorter.getMemoryUsage());
            this.sorter = null;
        }
    }

    private boolean haveSpaceForRecord(int i) {
        if ($assertionsDisabled || i > 0) {
            return this.sorter.hasSpaceForAnotherRecord() && ((long) i) <= this.freeSpaceInCurrentPage;
        }
        throw new AssertionError();
    }

    private void allocateSpaceForRecord(int i) throws IOException {
        if (!this.sorter.hasSpaceForAnotherRecord()) {
            this.logger.debug("Attempting to expand sort pointer array");
            long memoryUsage = this.sorter.getMemoryUsage();
            long j = memoryUsage * 2;
            long tryToAcquire = this.shuffleMemoryManager.tryToAcquire(j);
            if (tryToAcquire < j) {
                this.shuffleMemoryManager.release(tryToAcquire);
                spill();
            } else {
                this.sorter.expandPointerArray();
                this.shuffleMemoryManager.release(memoryUsage);
            }
        }
        if (i > this.freeSpaceInCurrentPage) {
            this.logger.trace("Required space {} is less than free space in current page ({})", Integer.valueOf(i), Long.valueOf(this.freeSpaceInCurrentPage));
            if (i > PAGE_SIZE) {
                throw new IOException("Required space " + i + " is greater than page size (" + PAGE_SIZE + ")");
            }
            long tryToAcquire2 = this.shuffleMemoryManager.tryToAcquire(134217728L);
            if (tryToAcquire2 < 134217728) {
                this.shuffleMemoryManager.release(tryToAcquire2);
                spill();
                long tryToAcquire3 = this.shuffleMemoryManager.tryToAcquire(134217728L);
                if (tryToAcquire3 != 134217728) {
                    this.shuffleMemoryManager.release(tryToAcquire3);
                    throw new IOException("Unable to acquire 134217728 bytes of memory");
                }
            }
            this.currentPage = this.memoryManager.allocatePage(134217728L);
            this.currentPagePosition = this.currentPage.getBaseOffset();
            this.freeSpaceInCurrentPage = 134217728L;
            this.allocatedPages.add(this.currentPage);
        }
    }

    public void insertRecord(Object obj, long j, int i, int i2) throws IOException {
        int i3 = i + 4;
        if (!haveSpaceForRecord(i3)) {
            allocateSpaceForRecord(i3);
        }
        long encodePageNumberAndOffset = this.memoryManager.encodePageNumberAndOffset(this.currentPage, this.currentPagePosition);
        Object baseObject = this.currentPage.getBaseObject();
        PlatformDependent.UNSAFE.putInt(baseObject, this.currentPagePosition, i);
        this.currentPagePosition += 4;
        this.freeSpaceInCurrentPage -= 4;
        PlatformDependent.copyMemory(obj, j, baseObject, this.currentPagePosition, i);
        this.currentPagePosition += i;
        this.freeSpaceInCurrentPage -= i;
        this.sorter.insertRecord(encodePageNumberAndOffset, i2);
    }

    public SpillInfo[] closeAndGetSpills() throws IOException {
        try {
            if (this.sorter != null) {
                writeSortedFile(true);
                freeMemory();
            }
            return (SpillInfo[]) this.spills.toArray(new SpillInfo[this.spills.size()]);
        } catch (IOException e) {
            cleanupAfterError();
            throw e;
        }
    }

    static {
        $assertionsDisabled = !UnsafeShuffleExternalSorter.class.desiredAssertionStatus();
    }
}
