package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.SortBuffer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

@NotThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.class */
public class SortMergeResultPartition extends ResultPartition {
    private final Object lock;

    @GuardedBy("lock")
    private final Set<SortMergeSubpartitionReader> readers;

    @GuardedBy("lock")
    private PartitionedFile resultFile;
    private final int[] numDataBuffers;
    private final MemorySegment writeBuffer;
    private final int networkBufferSize;
    private final PartitionedFileWriter fileWriter;
    private SortBuffer currentSortBuffer;

    /**
     * Creates the partition and eagerly allocates its write-side resources: an unpooled off-heap
     * segment of {@code i4} bytes that stages data for file writes, a per-subpartition counter
     * array for data buffers, and the {@link PartitionedFileWriter} producing the result file.
     *
     * <p>{@code str}, {@code i}, {@code resultPartitionID}, {@code resultPartitionType},
     * {@code i2}, {@code i3}, {@code resultPartitionManager}, {@code bufferCompressor} and
     * {@code supplierWithException} are forwarded unchanged to the {@link ResultPartition}
     * constructor.
     *
     * @param i2 length of the per-subpartition counter array and first argument of the file
     *     writer; presumably the number of subpartitions (TODO confirm against ResultPartition)
     * @param i4 network buffer size; also the size of the staging write buffer
     * @param str2 third argument of the file writer; presumably the base path of the result file
     *     (TODO confirm against PartitionedFileWriter)
     * @param bufferCompressor optional compressor, may be {@code null}
     */
    public SortMergeResultPartition(String str, int i, ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, int i2, int i3, int i4, ResultPartitionManager resultPartitionManager, String str2, @Nullable BufferCompressor bufferCompressor, SupplierWithException<BufferPool, IOException> supplierWithException) {
        super(str, i, resultPartitionID, resultPartitionType, i2, i3, resultPartitionManager, bufferCompressor, supplierWithException);
        this.lock = new Object();
        // Diamond instead of the raw type so the assignment is checked by the compiler.
        this.readers = new HashSet<>();
        this.networkBufferSize = i4;
        this.numDataBuffers = new int[i2];
        this.writeBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(i4);
        PartitionedFileWriter partitionedFileWriter = null;
        try {
            // 4 MiB. NOTE(review): presumably a memory budget inside the writer - confirm
            // against PartitionedFileWriter.
            partitionedFileWriter = new PartitionedFileWriter(i2, 4 * 1024 * 1024, str2);
        } catch (Throwable th) {
            // The constructor declares no checked exceptions, so failures are rethrown through
            // the utility. It is expected to always throw; the null initialisation above only
            // satisfies definite assignment of the final field.
            ExceptionUtils.rethrow(th);
        }
        this.fileWriter = partitionedFileWriter;
    }

    /**
     * Releases the on-disk state of this partition while holding {@code lock}.
     *
     * <p>If no result file has been produced yet, the file writer is released quietly. Otherwise
     * the result file is deleted quietly and forgotten, but only when no reader is currently
     * registered.
     */
    @Override
    protected void releaseInternal() {
        synchronized (this.lock) {
            if (this.resultFile == null) {
                // Nothing was finished, so only the writer holds resources.
                this.fileWriter.releaseQuietly();
            } else if (this.readers.isEmpty()) {
                // Delete the produced file only once nobody is reading it any more.
                this.resultFile.deleteQuietly();
                this.resultFile = null;
            }
        }
    }

    /**
     * Emits one record to a single subpartition as a data buffer.
     *
     * @param byteBuffer serialized record
     * @param i index of the target subpartition
     * @throws IOException if emitting the record fails
     */
    @Override
    public void emitRecord(ByteBuffer byteBuffer, int i) throws IOException {
        final Buffer.DataType recordType = Buffer.DataType.DATA_BUFFER;
        emit(byteBuffer, i, recordType);
    }

    /**
     * Emits one record to every subpartition as a data buffer.
     *
     * @param byteBuffer serialized record
     * @throws IOException if emitting the record fails
     */
    @Override
    public void broadcastRecord(ByteBuffer byteBuffer) throws IOException {
        final Buffer.DataType recordType = Buffer.DataType.DATA_BUFFER;
        broadcast(byteBuffer, recordType);
    }

    /**
     * Serializes an event and emits it to every subpartition.
     *
     * <p>The serialized buffer is recycled exactly once, whether or not the broadcast succeeds.
     *
     * @param abstractEvent event to broadcast
     * @param z flag handed to {@link EventSerializer#toBuffer}; presumably marks a priority
     *     event (TODO confirm against EventSerializer)
     * @throws IOException if emitting the serialized event fails
     */
    @Override
    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        Buffer buffer = EventSerializer.toBuffer(abstractEvent, z);
        try {
            broadcast(buffer.getNioBufferReadable(), buffer.getDataType());
        } finally {
            // A finally block, unlike a recycle in both try and catch, can never recycle twice.
            buffer.recycleBuffer();
        }
    }

    /**
     * Emits the same bytes to each subpartition in ascending index order.
     *
     * @param byteBuffer bytes to emit; rewound before every emit so each subpartition reads from
     *     position zero
     * @param dataType data type attached to the emitted bytes
     * @throws IOException if any emit fails
     */
    private void broadcast(ByteBuffer byteBuffer, Buffer.DataType dataType) throws IOException {
        int subpartition = 0;
        while (subpartition < this.numSubpartitions) {
            byteBuffer.rewind();
            emit(byteBuffer, subpartition, dataType);
            subpartition++;
        }
    }

    /**
     * Appends the given bytes to the current sort buffer, spilling to the file when needed.
     *
     * <p>If the append is rejected while the sort buffer holds data, the buffer is flushed and
     * the emit is retried. If it is rejected while the sort buffer holds nothing, the buffer is
     * finished and released and the bytes are written straight to the file as a large record.
     *
     * @param byteBuffer bytes to emit
     * @param i index of the target subpartition
     * @param dataType data type attached to the bytes
     * @throws IOException if flushing or writing to the file fails
     */
    private void emit(ByteBuffer byteBuffer, int i, Buffer.DataType dataType) throws IOException {
        checkInProduceState();

        // getSortBuffer() always returns the instance stored in currentSortBuffer.
        final SortBuffer target = getSortBuffer();
        if (target.append(byteBuffer, i, dataType)) {
            return;
        }

        if (!target.hasRemaining()) {
            // Even an empty sort buffer rejected the bytes: bypass it entirely.
            target.finish();
            target.release();
            writeLargeRecord(byteBuffer, i, dataType);
            return;
        }

        // Make room by spilling what is buffered, then try again with a fresh sort buffer.
        flushCurrentSortBuffer();
        emit(byteBuffer, i, dataType);
    }

    /** Releases the current sort buffer, if one has been created. */
    private void releaseCurrentSortBuffer() {
        final SortBuffer sortBuffer = this.currentSortBuffer;
        if (sortBuffer == null) {
            return;
        }
        sortBuffer.release();
    }

    /**
     * Returns the sort buffer to append to, creating a new {@link PartitionSortedBuffer} when
     * none exists yet or the existing one is already finished.
     *
     * @return the instance stored in {@code currentSortBuffer} after the call
     */
    private SortBuffer getSortBuffer() {
        final SortBuffer existing = this.currentSortBuffer;
        if (existing == null || existing.isFinished()) {
            this.currentSortBuffer = new PartitionSortedBuffer(this.lock, this.bufferPool, this.numSubpartitions, this.networkBufferSize, null);
        }
        return this.currentSortBuffer;
    }

    /**
     * Drains the current sort buffer into the partitioned file and then releases it.
     *
     * <p>Does nothing when no sort buffer exists or when the current one is already finished. In
     * this class a buffer is only finished by a previous call to this method or by the
     * large-record path of {@code emit}, so a finished buffer never needs to be flushed again.
     * Skipping it avoids calling {@code finish()} and {@code release()} a second time on the
     * same buffer, for example when a flush is requested twice in a row.
     * NOTE(review): assumes {@code SortBuffer.finish()} must not be invoked on an already
     * finished buffer - confirm against the SortBuffer implementation.
     *
     * @throws IOException if writing a buffer to the file fails
     */
    private void flushCurrentSortBuffer() throws IOException {
        final SortBuffer sortBuffer = this.currentSortBuffer;
        if (sortBuffer == null || sortBuffer.isFinished()) {
            return;
        }
        sortBuffer.finish();
        if (sortBuffer.hasRemaining()) {
            // Every non-empty flush goes into its own region of the file.
            this.fileWriter.startNewRegion();
            while (sortBuffer.hasRemaining()) {
                SortBuffer.BufferWithChannel bufferWithChannel = sortBuffer.copyIntoSegment(this.writeBuffer);
                writeCompressedBufferIfPossible(bufferWithChannel.getBuffer(), bufferWithChannel.getChannelIndex());
            }
        }
        sortBuffer.release();
    }

    /**
     * Records statistics for a buffer, compresses it when possible and writes it to the file.
     *
     * <p>Statistics are taken from the buffer as passed in, before any compression. The buffer
     * referenced at the end (the compressed one if compression happened, otherwise the original)
     * is recycled exactly once, whether or not the write succeeds.
     *
     * @param buffer buffer to write
     * @param i index of the subpartition the buffer belongs to
     * @throws IOException if writing to the file fails
     */
    private void writeCompressedBufferIfPossible(Buffer buffer, int i) throws IOException {
        updateStatistics(buffer, i);
        try {
            if (canBeCompressed(buffer)) {
                buffer = this.bufferCompressor.compressToIntermediateBuffer(buffer);
            }
            this.fileWriter.writeBuffer(buffer, i);
        } finally {
            // A finally block, unlike a recycle in both try and catch, can never recycle twice.
            buffer.recycleBuffer();
        }
    }

    /**
     * Updates the output counters for one buffer about to be written.
     *
     * <p>The buffer and byte counters are always incremented. The per-subpartition data-buffer
     * count is incremented only when {@code buffer.isBuffer()} is true.
     *
     * @param buffer buffer being written
     * @param i index of the subpartition the buffer belongs to
     */
    private void updateStatistics(Buffer buffer, int i) {
        this.numBuffersOut.inc();
        this.numBytesOut.inc(buffer.readableBytes());
        if (!buffer.isBuffer()) {
            return;
        }
        this.numDataBuffers[i]++;
    }

    /**
     * Writes a record directly to the file in its own region, bypassing the sort buffer.
     *
     * <p>The record is copied chunk by chunk into the staging write buffer; each chunk is at most
     * the staging buffer's size and is written as a separate buffer.
     *
     * @param byteBuffer record bytes; consumed until nothing remains
     * @param i index of the target subpartition
     * @param dataType data type attached to every chunk
     * @throws IOException if writing to the file fails
     */
    private void writeLargeRecord(ByteBuffer byteBuffer, int i, Buffer.DataType dataType) throws IOException {
        this.fileWriter.startNewRegion();
        while (byteBuffer.hasRemaining()) {
            final int chunkLength = Math.min(byteBuffer.remaining(), this.writeBuffer.size());
            this.writeBuffer.put(0, byteBuffer, chunkLength);
            // The staging segment is reused for every chunk, so the recycler does nothing.
            final NetworkBuffer chunk = new NetworkBuffer(this.writeBuffer, segment -> {
            }, dataType, chunkLength);
            writeCompressedBufferIfPossible(chunk, i);
        }
    }

    /**
     * Unregisters a reader and, if it was the last one and the partition has already been
     * released, runs the internal release while still holding {@code lock}.
     *
     * <p>The decompiler reports that this method was originally package-private.
     *
     * @param sortMergeSubpartitionReader reader to unregister
     */
    public void releaseReader(SortMergeSubpartitionReader sortMergeSubpartitionReader) {
        synchronized (this.lock) {
            this.readers.remove(sortMergeSubpartitionReader);
            final boolean noReadersLeft = this.readers.isEmpty();
            if (noReadersLeft && isReleased()) {
                releaseInternal();
            }
        }
    }

    /**
     * Finishes production: broadcasts the end-of-partition event, flushes the remaining sorted
     * data, finishes the file writer to obtain the result file, and then delegates to the
     * superclass.
     *
     * @throws IOException if broadcasting, flushing or finishing the file fails
     * @throws IllegalStateException if the partition has already been released
     */
    @Override
    public void finish() throws IOException {
        broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        flushCurrentSortBuffer();

        synchronized (this.lock) {
            Preconditions.checkState(!isReleased(), "Result partition is already released.");
            final PartitionedFile producedFile = this.fileWriter.finish();
            this.resultFile = producedFile;
            LOG.info("New partitioned file produced: {}.", producedFile);
        }

        super.finish();
    }

    /**
     * Closes the partition: releases the current sort buffer, closes the superclass and quietly
     * closes the file writer.
     *
     * <p>The steps are nested in {@code finally} blocks so that a failure in an earlier step
     * does not skip the later ones; the failure still propagates to the caller.
     */
    @Override
    public void close() {
        try {
            releaseCurrentSortBuffer();
        } finally {
            try {
                super.close();
            } finally {
                // Must run even if the steps above threw, otherwise the writer is never closed.
                IOUtils.closeQuietly(this.fileWriter);
            }
        }
    }

    /**
     * Creates and registers a reader for one subpartition of the finished result file.
     *
     * @param i index of the subpartition to read
     * @param bufferAvailabilityListener listener handed to the new reader
     * @return the newly registered reader
     * @throws IndexOutOfBoundsException if {@code i} is not a valid subpartition index
     * @throws IllegalStateException if the partition is released or not yet finished
     * @throws IOException if creating the reader fails
     */
    @Override
    public ResultSubpartitionView createSubpartitionView(int i, BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        synchronized (this.lock) {
            Preconditions.checkElementIndex(i, this.numSubpartitions, "Subpartition not found.");
            Preconditions.checkState(!isReleased(), "Partition released.");
            Preconditions.checkState(isFinished(), "Trying to read unfinished blocking partition.");

            final SortMergeSubpartitionReader reader = new SortMergeSubpartitionReader(i, this.numDataBuffers[i], this.networkBufferSize, this, bufferAvailabilityListener, this.resultFile);
            this.readers.add(reader);
            return reader;
        }
    }

    /**
     * Flushes the current sort buffer to the file.
     *
     * <p>An {@link IOException} raised by the flush is logged at error level and not propagated.
     */
    @Override
    public void flushAll() {
        try {
            flushCurrentSortBuffer();
        } catch (IOException flushFailure) {
            LOG.error("Failed to flush the current sort buffer.", flushFailure);
        }
    }

    /**
     * Flushes the current sort buffer to the file. The subpartition index is ignored; the whole
     * buffer is always flushed.
     *
     * <p>An {@link IOException} raised by the flush is logged at error level and not propagated.
     *
     * @param i subpartition index (unused)
     */
    @Override
    public void flush(int i) {
        try {
            flushCurrentSortBuffer();
        } catch (IOException flushFailure) {
            LOG.error("Failed to flush the current sort buffer.", flushFailure);
        }
    }

    /**
     * Always returns the shared {@code AVAILABLE} future, regardless of the partition's state.
     *
     * @return the {@code AVAILABLE} constant
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.AvailabilityProvider
    public CompletableFuture<?> getAvailableFuture() {
        return AVAILABLE;
    }

    /**
     * Reports the number of queued buffers for the whole partition.
     *
     * @return always {@code 0}; this implementation does not track queued buffers
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    /**
     * Reports the number of queued buffers for one subpartition.
     *
     * @param i subpartition index (unused)
     * @return always {@code 0}; this implementation does not track queued buffers
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public int getNumberOfQueuedBuffers(int i) {
        return 0;
    }

    /**
     * Returns the produced result file for tests.
     *
     * <p>The field is declared {@code @GuardedBy("lock")}, so it is read while holding
     * {@code lock}, like every other access in this class.
     *
     * @return the result file, or {@code null} if none has been produced or it has been deleted
     */
    @VisibleForTesting
    PartitionedFile getResultFile() {
        synchronized (this.lock) {
            return this.resultFile;
        }
    }
}
