package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferWithSubpartition;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.TriConsumer;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferAccumulator.class */
/**
 * A {@link BufferAccumulator} that collects incoming records in a {@link TieredStorageSortBuffer}
 * and hands the resulting buffers to the flusher registered through {@link #setup(TriConsumer)}.
 *
 * <p>Memory segments are obtained from the {@link TieredStorageMemoryManager} and kept in a pool of
 * free segments that is shared with the current sort buffer. Records that the sort buffer rejects
 * while it is empty are split across individually requested buffers and flushed directly.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its mutable state, so it
 * presumably expects to be driven from a single thread; confirm against the callers.
 */
public class SortBufferAccumulator implements BufferAccumulator {
    /** Number of subpartitions, forwarded to every sort buffer created by this accumulator. */
    private final int numSubpartitions;

    /** Target size of the free-segment pool before a new sort buffer is created. */
    private final int numBuffers;

    /** Size of one buffer in bytes; also the chunk size used when splitting large records. */
    private final int bufferSizeBytes;

    /** Pool of free memory segments; the same list instance is handed to each sort buffer. */
    private final LinkedList<MemorySegment> freeSegments = new LinkedList<>();

    /** Source of all buffers requested by this accumulator. */
    private final TieredStorageMemoryManager memoryManager;

    /** Forwarded unchanged to every sort buffer created by this accumulator. */
    private final boolean isPartialRecordAllowed;

    /** The sort buffer currently receiving records, or {@code null} if none is active. */
    @Nullable
    private TieredStorageSortBuffer currentDataBuffer;

    /** Recycler captured from the first requested buffer; used to return segments. */
    @Nullable
    private BufferRecycler bufferRecycler;

    /** Callback receiving (subpartition, buffer, number of remaining consecutive buffers). */
    @Nullable
    private TriConsumer<TieredStorageSubpartitionId, Buffer, Integer> accumulatedBufferFlusher;

    /** Broadcast flag of the records held by {@link #currentDataBuffer}. */
    private boolean isBroadcastDataBuffer;

    /**
     * Creates an accumulator. No buffers are requested until the first record arrives.
     *
     * @param numSubpartitions number of subpartitions passed to each sort buffer
     * @param numBuffers number of segments the free pool is filled up to for a new sort buffer
     * @param bufferSizeBytes size in bytes of a single buffer
     * @param memoryManager memory manager that buffers are requested from
     * @param isPartialRecordAllowed flag passed through to each sort buffer
     */
    public SortBufferAccumulator(
            int numSubpartitions,
            int numBuffers,
            int bufferSizeBytes,
            TieredStorageMemoryManager memoryManager,
            boolean isPartialRecordAllowed) {
        this.numSubpartitions = numSubpartitions;
        this.bufferSizeBytes = bufferSizeBytes;
        this.numBuffers = numBuffers;
        this.memoryManager = memoryManager;
        this.isPartialRecordAllowed = isPartialRecordAllowed;
    }

    /**
     * Registers the callback that receives every flushed buffer.
     *
     * @param bufferFlusher consumer of (subpartition id, buffer, remaining consecutive buffers)
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator
    public void setup(TriConsumer<TieredStorageSubpartitionId, Buffer, Integer> bufferFlusher) {
        this.accumulatedBufferFlusher = bufferFlusher;
    }

    /**
     * Appends a record to the current sort buffer, flushing or bypassing the sort buffer when the
     * append is rejected.
     *
     * @param record the record bytes to accumulate
     * @param subpartitionId the target subpartition
     * @param dataType the data type of the record
     * @param isBroadcast whether the record is a broadcast record
     * @throws IOException declared by the interface; propagated from the sort buffer append
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator
    public void receive(
            ByteBuffer record,
            TieredStorageSubpartitionId subpartitionId,
            Buffer.DataType dataType,
            boolean isBroadcast)
            throws IOException {
        int targetSubpartition = subpartitionId.getSubpartitionId();
        switchCurrentDataBufferIfNeeded(isBroadcast);

        TieredStorageSortBuffer dataBuffer = Preconditions.checkNotNull(this.currentDataBuffer);
        // A false return is treated as "nothing more to do for this record".
        if (!dataBuffer.append(record, targetSubpartition, dataType)) {
            return;
        }

        // The append was rejected although the sort buffer holds no data: bypass the sort buffer
        // and write the record straight into individually requested buffers.
        if (!dataBuffer.hasRemaining()) {
            dataBuffer.release();
            writeLargeRecord(record, targetSubpartition, dataType);
            return;
        }

        // The sort buffer holds data: flush it, then retry. The retry allocates a fresh sort
        // buffer because the flushed one has been released.
        flushDataBuffer();
        Preconditions.checkState(record.hasRemaining(), "Empty record.");
        receive(record, subpartitionId, dataType, isBroadcast);
    }

    /** Flushes any pending data and returns all pooled segments to the recycler. */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator, java.lang.AutoCloseable
    public void close() {
        flushCurrentDataBuffer();
        releaseFreeBuffers();
        // flushCurrentDataBuffer() clears the field, so this is purely defensive.
        if (this.currentDataBuffer != null) {
            this.currentDataBuffer.release();
        }
    }

    /**
     * Replaces the current sort buffer when the broadcast mode changes or when the buffer is
     * missing, released, or finished. Pending data of the old buffer is flushed first.
     */
    private void switchCurrentDataBufferIfNeeded(boolean isBroadcast) {
        boolean reusable =
                isBroadcast == this.isBroadcastDataBuffer
                        && this.currentDataBuffer != null
                        && !this.currentDataBuffer.isReleased()
                        && !this.currentDataBuffer.isFinished();
        if (reusable) {
            return;
        }
        this.isBroadcastDataBuffer = isBroadcast;
        flushCurrentDataBuffer();
        this.currentDataBuffer = createNewDataBuffer();
    }

    /** Fills the free-segment pool and builds a new sort buffer on top of it. */
    private TieredStorageSortBuffer createNewDataBuffer() {
        requestBuffers();
        // Half of the pooled segments is passed as the fifth constructor argument; its exact
        // meaning is defined by TieredStorageSortBuffer.
        return new TieredStorageSortBuffer(
                this.freeSegments,
                this::recycleBuffer,
                this.numSubpartitions,
                this.bufferSizeBytes,
                this.freeSegments.size() / 2,
                this.isPartialRecordAllowed);
    }

    /** Requests buffers until the free pool holds {@link #numBuffers} segments. */
    private void requestBuffers() {
        while (this.freeSegments.size() < this.numBuffers) {
            Buffer requested = Preconditions.checkNotNull(requestBuffer());
            this.freeSegments.add(requested.getMemorySegment());
            // Remember the recycler of the first buffer so raw segments can be returned later.
            if (this.bufferRecycler == null) {
                this.bufferRecycler = requested.getRecycler();
            }
        }
    }

    /**
     * Finishes the current sort buffer, drains it through the flusher, then releases the pooled
     * segments and the sort buffer. Does nothing if there is no live buffer holding data.
     */
    private void flushDataBuffer() {
        if (this.currentDataBuffer == null
                || this.currentDataBuffer.isReleased()
                || !this.currentDataBuffer.hasRemaining()) {
            return;
        }
        this.currentDataBuffer.finish();
        while (true) {
            BufferWithSubpartition nextBuffer =
                    this.currentDataBuffer.getNextBuffer(getFreeSegment());
            if (nextBuffer == null) {
                releaseFreeBuffers();
                this.currentDataBuffer.release();
                return;
            }
            flushBuffer(
                    nextBuffer,
                    numBuffersNeededFor(this.currentDataBuffer.getRecordRemainingBytes()));
        }
    }

    /** Flushes the current sort buffer, if any, and clears the reference to it. */
    private void flushCurrentDataBuffer() {
        if (this.currentDataBuffer != null) {
            flushDataBuffer();
            this.currentDataBuffer = null;
        }
    }

    /**
     * Splits a record into chunks of at most {@link #bufferSizeBytes} bytes and flushes each
     * chunk as its own buffer. The final chunk is tagged {@code DATA_BUFFER_WITH_CLEAR_END}.
     *
     * @throws IllegalStateException if {@code dataType} is {@code EVENT_BUFFER}
     */
    private void writeLargeRecord(ByteBuffer record, int subpartitionId, Buffer.DataType dataType) {
        Preconditions.checkState(dataType != Buffer.DataType.EVENT_BUFFER);
        while (record.hasRemaining()) {
            int toCopy = Math.min(record.remaining(), this.bufferSizeBytes);
            MemorySegment writeSegment = requestBuffer().getMemorySegment();
            writeSegment.put(0, record, toCopy);
            // Computed after the copy: the number of buffers still needed for the rest.
            int numRemainingConsecutiveBuffers = numBuffersNeededFor(record.remaining());
            if (numRemainingConsecutiveBuffers == 0) {
                dataType = Buffer.DataType.DATA_BUFFER_WITH_CLEAR_END;
            }
            NetworkBuffer chunk =
                    new NetworkBuffer(
                            writeSegment,
                            Preconditions.checkNotNull(this.bufferRecycler),
                            dataType,
                            toCopy);
            flushBuffer(
                    new BufferWithSubpartition(chunk, subpartitionId),
                    numRemainingConsecutiveBuffers);
        }
        releaseFreeBuffers();
    }

    /**
     * Returns how many buffers of {@link #bufferSizeBytes} bytes are needed to hold the given
     * number of bytes, rounding up. The division is done in floating point on purpose: an
     * integer division would truncate before {@link Math#ceil(double)} could round up.
     */
    private int numBuffersNeededFor(double remainingBytes) {
        return (int) Math.ceil(remainingBytes / this.bufferSizeBytes);
    }

    /** Takes a segment from the free pool, requesting a new buffer if the pool is empty. */
    private MemorySegment getFreeSegment() {
        MemorySegment segment = this.freeSegments.poll();
        if (segment == null) {
            segment = requestBuffer().getMemorySegment();
        }
        return segment;
    }

    /** Passes one buffer and its remaining-consecutive-buffers count to the flusher. */
    private void flushBuffer(
            BufferWithSubpartition bufferWithSubpartition, int numRemainingConsecutiveBuffers) {
        Preconditions.checkNotNull(this.accumulatedBufferFlusher)
                .accept(
                        new TieredStorageSubpartitionId(
                                bufferWithSubpartition.getSubpartitionIndex()),
                        bufferWithSubpartition.getBuffer(),
                        numRemainingConsecutiveBuffers);
    }

    /** Requests one buffer from the memory manager, with this accumulator as the owner. */
    private Buffer requestBuffer() {
        BufferBuilder builder = this.memoryManager.requestBufferBlocking(this);
        BufferConsumer consumer = builder.createBufferConsumerFromBeginning();
        Buffer buffer = consumer.build();
        // Builder and consumer are only needed to obtain the Buffer; close both right away.
        builder.close();
        consumer.close();
        return buffer;
    }

    /** Recycles every pooled segment and empties the pool. */
    private void releaseFreeBuffers() {
        this.freeSegments.forEach(this::recycleBuffer);
        this.freeSegments.clear();
    }

    /** Returns one segment through the captured recycler. */
    private void recycleBuffer(MemorySegment memorySegment) {
        Preconditions.checkNotNull(this.bufferRecycler).recycle(memorySegment);
    }
}
