package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpillableSubpartition.class */
public class SpillableSubpartition extends ResultSubpartition {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SpillableSubpartition.class);
    private final IOManager ioManager;
    private BufferFileWriter spillWriter;
    private boolean isFinished;
    private volatile boolean isReleased;
    private ResultSubpartitionView readView;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SpillableSubpartition(int i, ResultPartition resultPartition, IOManager iOManager) {
        super(i, resultPartition);
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public synchronized boolean add(BufferConsumer bufferConsumer) throws IOException {
        return add(bufferConsumer, false);
    }

    private boolean add(BufferConsumer bufferConsumer, boolean z) throws IOException {
        Preconditions.checkNotNull(bufferConsumer);
        synchronized (this.buffers) {
            if (this.isFinished || this.isReleased) {
                bufferConsumer.close();
                return false;
            }
            this.buffers.add(bufferConsumer);
            updateStatistics(bufferConsumer);
            increaseBuffersInBacklog(bufferConsumer);
            if (this.spillWriter != null) {
                spillFinishedBufferConsumers(z);
            }
            return true;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public void flush() {
        synchronized (this.buffers) {
            if (this.readView != null) {
                this.readView.notifyDataAvailable();
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public synchronized void finish() throws IOException {
        synchronized (this.buffers) {
            if (add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true)) {
                this.isFinished = true;
            }
            flush();
        }
        if (this.spillWriter != null) {
            this.spillWriter.close();
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public synchronized void release() throws IOException {
        synchronized (this.buffers) {
            if (this.isReleased) {
                return;
            }
            Iterator<BufferConsumer> it = this.buffers.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.buffers.clear();
            ResultSubpartitionView resultSubpartitionView = this.readView;
            if (resultSubpartitionView == null && this.spillWriter != null) {
                this.spillWriter.closeAndDelete();
            }
            this.isReleased = true;
            if (resultSubpartitionView != null) {
                resultSubpartitionView.releaseAllResources();
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public ResultSubpartitionView createReadView(BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        ResultSubpartitionView resultSubpartitionView;
        synchronized (this.buffers) {
            if (!this.isFinished) {
                throw new IllegalStateException("Subpartition has not been finished yet, but blocking subpartitions can only be consumed after they have been finished.");
            }
            if (this.readView != null) {
                throw new IllegalStateException("Subpartition is being or already has been consumed, but we currently allow subpartitions to only be consumed once.");
            }
            if (this.spillWriter != null) {
                this.readView = new SpilledSubpartitionView(this, this.parent.getBufferProvider().getMemorySegmentSize(), this.spillWriter, getTotalNumberOfBuffers(), bufferAvailabilityListener);
            } else {
                this.readView = new SpillableSubpartitionView(this, this.buffers, this.ioManager, this.parent.getBufferProvider().getMemorySegmentSize(), bufferAvailabilityListener);
            }
            resultSubpartitionView = this.readView;
        }
        return resultSubpartitionView;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public int releaseMemory() throws IOException {
        synchronized (this.buffers) {
            ResultSubpartitionView resultSubpartitionView = this.readView;
            if (resultSubpartitionView != null && resultSubpartitionView.getClass() == SpillableSubpartitionView.class) {
                return ((SpillableSubpartitionView) resultSubpartitionView).releaseMemory();
            }
            if (this.spillWriter != null) {
                return 0;
            }
            this.spillWriter = this.ioManager.createBufferFileWriter(this.ioManager.createChannel());
            int size = this.buffers.size();
            long spillFinishedBufferConsumers = spillFinishedBufferConsumers(this.isFinished);
            int size2 = size - this.buffers.size();
            LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.", Long.valueOf(spillFinishedBufferConsumers), Integer.valueOf(size2), Integer.valueOf(this.index), this.parent.getPartitionId());
            return size2;
        }
    }

    @VisibleForTesting
    long spillFinishedBufferConsumers(boolean z) throws IOException {
        long j = 0;
        while (!this.buffers.isEmpty()) {
            BufferConsumer first = this.buffers.getFirst();
            Buffer build = first.build();
            updateStatistics(build);
            int size = build.getSize();
            j += size;
            if (!first.isFinished() && !z) {
                if (size > 0) {
                    this.spillWriter.writeBlock(build);
                    increaseBuffersInBacklog(first);
                } else {
                    build.recycleBuffer();
                }
                return j;
            }
            if (size > 0) {
                this.spillWriter.writeBlock(build);
            } else {
                decreaseBuffersInBacklog(build);
                build.recycleBuffer();
            }
            first.close();
            this.buffers.poll();
        }
        return j;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public boolean isReleased() {
        return this.isReleased;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return Math.max(this.buffers.size(), 0);
    }

    public String toString() {
        Object[] objArr = new Object[6];
        objArr[0] = Long.valueOf(getTotalNumberOfBuffers());
        objArr[1] = Long.valueOf(getTotalNumberOfBytes());
        objArr[2] = Integer.valueOf(getBuffersInBacklog());
        objArr[3] = Boolean.valueOf(this.isFinished);
        objArr[4] = Boolean.valueOf(this.readView != null);
        objArr[5] = Boolean.valueOf(this.spillWriter != null);
        return String.format("SpillableSubpartition [%d number of buffers (%d bytes),%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]", objArr);
    }
}
