/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartition;
import org.apache.flink.runtime.io.network.partition.SpilledSubpartitionView;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read view over the buffers of a {@link SpillableSubpartition}.
 *
 * <p>Buffers are served from the in-memory deque shared with the parent subpartition. Once
 * {@link #releaseMemory()} has written the queued buffers to disk, reads that find no in-memory
 * buffer are delegated to a {@link SpilledSubpartitionView}.
 *
 * <p>All accesses to the deque and to {@code nextBuffer} are guarded by the deque's monitor.
 */
class SpillableSubpartitionView
implements ResultSubpartitionView {
    private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartitionView.class);

    /** The subpartition this view reads from. */
    private final SpillableSubpartition parent;

    /** In-memory buffers handed in by the creator; also used as the lock object. */
    private final ArrayDeque<Buffer> buffers;

    /** Used to create the spill file writer in {@link #releaseMemory()}. */
    private final IOManager ioManager;

    /** Passed through to the spilled view when it is created. */
    private final int memorySegmentSize;

    /** Notified whenever a further in-memory buffer becomes available to read. */
    private final BufferAvailabilityListener listener;

    /** Set once by {@link #releaseAllResources()}. */
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    /** Size of the deque at construction time; only reported by {@link #toString()}. */
    private final long numBuffers;

    /** The buffer returned by the next {@link #getNextBuffer()} call; guarded by {@code buffers}. */
    private Buffer nextBuffer;

    /** Set when the queued buffers have been spilled; {@code null} until then. */
    private volatile SpilledSubpartitionView spilledView;

    /**
     * Creates the view, takes the first buffer off the deque and, if there was one, notifies the
     * listener that one buffer is available.
     *
     * @param parent the subpartition being read; must not be {@code null}
     * @param buffers the in-memory buffer queue; must not be {@code null}
     * @param ioManager I/O manager used for spilling; must not be {@code null}
     * @param memorySegmentSize segment size forwarded to the spilled view
     * @param listener availability listener; must not be {@code null}
     */
    SpillableSubpartitionView(SpillableSubpartition parent, ArrayDeque<Buffer> buffers, IOManager ioManager, int memorySegmentSize, BufferAvailabilityListener listener) {
        this.parent = Preconditions.checkNotNull(parent);
        this.buffers = Preconditions.checkNotNull(buffers);
        this.ioManager = Preconditions.checkNotNull(ioManager);
        this.memorySegmentSize = memorySegmentSize;
        this.listener = Preconditions.checkNotNull(listener);

        synchronized (buffers) {
            this.numBuffers = buffers.size();
            this.nextBuffer = buffers.poll();
        }

        if (this.nextBuffer != null) {
            listener.notifyBuffersAvailable(1L);
        }
    }

    /**
     * Writes every buffer currently queued in the deque to a newly created spill file, recycles
     * them, and installs a {@link SpilledSubpartitionView} over that file.
     *
     * <p>The already-fetched {@code nextBuffer} is not spilled; it is still returned from memory
     * by {@link #getNextBuffer()}.
     *
     * @return the number of buffers spilled, or 0 if a spilled view already exists or there is no
     *     pending {@code nextBuffer} (which is also the case after release)
     * @throws IOException if creating the writer or writing a buffer fails
     */
    int releaseMemory() throws IOException {
        synchronized (buffers) {
            if (spilledView != null || nextBuffer == null) {
                return 0;
            }

            // NOTE(review): if writeBlock throws below, the writer is not closed by this method.
            BufferFileWriter spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());

            long spilledBytes = 0L;
            int numSpilled = buffers.size();
            for (int i = 0; i < numSpilled; ++i) {
                Buffer buffer = buffers.remove();
                spilledBytes += (long) buffer.getSize();
                try {
                    spillWriter.writeBlock(buffer);
                } finally {
                    // Recycle even when the write fails so the buffer is not lost.
                    buffer.recycle();
                }
            }

            spilledView = new SpilledSubpartitionView(parent, memorySegmentSize, spillWriter, numSpilled, listener);
            LOG.debug("Spilling {} bytes for sub partition {} of {}.", spilledBytes, parent.index, parent.parent.getPartitionId());
            return numSpilled;
        }
    }

    /**
     * Returns the next buffer of this view.
     *
     * <p>If an in-memory buffer is pending it is returned and the following one is fetched from
     * the deque; the listener is notified when such a following buffer exists. Otherwise the call
     * is delegated to the spilled view.
     *
     * @return the next buffer, or {@code null} if this view has been released
     * @throws IllegalStateException if there is neither a pending in-memory buffer nor a spilled
     *     view
     */
    @Override
    public Buffer getNextBuffer() throws IOException, InterruptedException {
        synchronized (buffers) {
            if (isReleased.get()) {
                return null;
            }
            if (nextBuffer != null) {
                Buffer current = nextBuffer;
                nextBuffer = buffers.poll();
                if (nextBuffer != null) {
                    listener.notifyBuffersAvailable(1L);
                }
                return current;
            }
        }

        // Nothing pending in memory: fall through to the spilled data, if any.
        SpilledSubpartitionView spilled = spilledView;
        if (spilled != null) {
            return spilled.getNextBuffer();
        }
        throw new IllegalStateException("No in-memory buffers available, but also nothing spilled.");
    }

    /** Intentionally a no-op in this view. */
    @Override
    public void notifyBuffersAvailable(long buffers) throws IOException {
    }

    /**
     * Marks this view as released (first call only), recycles the pending in-memory buffer and
     * releases the spilled view if one exists. Subsequent calls do nothing.
     *
     * <p>Buffers still queued in the shared deque are not touched by this method.
     */
    @Override
    public void releaseAllResources() throws IOException {
        if (!isReleased.compareAndSet(false, true)) {
            return;
        }

        // getNextBuffer() returns null once released, so the already-fetched buffer would never
        // be handed out again; recycle it here or it is leaked. Clearing the field also makes a
        // later releaseMemory() call return 0.
        synchronized (buffers) {
            if (nextBuffer != null) {
                nextBuffer.recycle();
                nextBuffer = null;
            }
        }

        SpilledSubpartitionView spilled = spilledView;
        if (spilled != null) {
            spilled.releaseAllResources();
        }
    }

    /** Forwards the notification to the spilled view if present, otherwise to the parent. */
    @Override
    public void notifySubpartitionConsumed() throws IOException {
        SpilledSubpartitionView spilled = spilledView;
        if (spilled != null) {
            spilled.notifySubpartitionConsumed();
        } else {
            parent.onConsumedSubpartition();
        }
    }

    /**
     * @return the spilled view's released state if a spilled view exists; otherwise {@code true}
     *     if either the parent or this view has been released
     */
    @Override
    public boolean isReleased() {
        SpilledSubpartitionView spilled = spilledView;
        if (spilled != null) {
            return spilled.isReleased();
        }
        return parent.isReleased() || isReleased.get();
    }

    /** @return the failure cause of the spilled view if present, otherwise that of the parent */
    @Override
    public Throwable getFailureCause() {
        SpilledSubpartitionView spilled = spilledView;
        if (spilled != null) {
            return spilled.getFailureCause();
        }
        return parent.getFailureCause();
    }

    @Override
    public String toString() {
        boolean hasSpilled = spilledView != null;
        return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? %b) of ResultPartition %s", parent.index, numBuffers, hasSpilled, parent.parent.getPartitionId());
    }
}

