/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SpilledSubpartitionView
implements ResultSubpartitionView,
NotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class);
    private final ResultSubpartition parent;
    private final BufferFileWriter spillWriter;
    private final BufferFileReader fileReader;
    private final SpillReadBufferPool bufferPool;
    private final BufferAvailabilityListener availabilityListener;
    private final long numberOfSpilledBuffers;
    private AtomicBoolean isReleased = new AtomicBoolean();
    private volatile boolean isSpillInProgress = true;

    SpilledSubpartitionView(ResultSubpartition parent, int memorySegmentSize, BufferFileWriter spillWriter, long numberOfSpilledBuffers, BufferAvailabilityListener availabilityListener) throws IOException {
        this.parent = (ResultSubpartition)Preconditions.checkNotNull((Object)parent);
        this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
        this.spillWriter = (BufferFileWriter)Preconditions.checkNotNull((Object)spillWriter);
        this.fileReader = new SynchronousBufferFileReader(spillWriter.getChannelID(), false);
        Preconditions.checkArgument((numberOfSpilledBuffers >= 0L ? 1 : 0) != 0);
        this.numberOfSpilledBuffers = numberOfSpilledBuffers;
        this.availabilityListener = (BufferAvailabilityListener)Preconditions.checkNotNull((Object)availabilityListener);
        if (!spillWriter.registerAllRequestsProcessedListener(this)) {
            this.isSpillInProgress = false;
            availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
            LOG.debug("No spilling in progress. Notified about {} available buffers.", (Object)numberOfSpilledBuffers);
        } else {
            LOG.debug("Spilling in progress. Waiting with notification about {} available buffers.", (Object)numberOfSpilledBuffers);
        }
    }

    @Override
    public void onNotification() {
        this.isSpillInProgress = false;
        this.availabilityListener.notifyBuffersAvailable(this.numberOfSpilledBuffers);
        LOG.debug("Finished spilling. Notified about {} available buffers.", (Object)this.numberOfSpilledBuffers);
    }

    @Override
    public Buffer getNextBuffer() throws IOException, InterruptedException {
        if (this.fileReader.hasReachedEndOfFile() || this.isSpillInProgress) {
            return null;
        }
        Buffer buffer = this.bufferPool.requestBufferBlocking();
        this.fileReader.readInto(buffer);
        return buffer;
    }

    @Override
    public void notifyBuffersAvailable(long buffers) throws IOException {
    }

    @Override
    public void notifySubpartitionConsumed() throws IOException {
        this.parent.onConsumedSubpartition();
    }

    @Override
    public void releaseAllResources() throws IOException {
        if (this.isReleased.compareAndSet(false, true)) {
            this.spillWriter.closeAndDelete();
            this.fileReader.close();
            this.bufferPool.destroy();
        }
    }

    @Override
    public boolean isReleased() {
        return this.parent.isReleased() || this.isReleased.get();
    }

    @Override
    public Throwable getFailureCause() {
        return this.parent.getFailureCause();
    }

    public String toString() {
        return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of ResultPartition %s", this.parent.index, this.numberOfSpilledBuffers, this.parent.parent.getPartitionId());
    }

    private static class SpillReadBufferPool
    implements BufferRecycler {
        private final Queue<Buffer> buffers;
        private boolean isDestroyed;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
            Queue<Buffer> queue = this.buffers = new ArrayDeque<Buffer>(numberOfBuffers);
            synchronized (queue) {
                for (int i = 0; i < numberOfBuffers; ++i) {
                    this.buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment((int)memorySegmentSize), this));
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void recycle(MemorySegment memorySegment) {
            Queue<Buffer> queue = this.buffers;
            synchronized (queue) {
                if (this.isDestroyed) {
                    memorySegment.free();
                } else {
                    this.buffers.add(new Buffer(memorySegment, this));
                    this.buffers.notifyAll();
                }
            }
        }

        private Buffer requestBufferBlocking() throws InterruptedException {
            Queue<Buffer> queue = this.buffers;
            synchronized (queue) {
                while (true) {
                    if (this.isDestroyed) {
                        return null;
                    }
                    Buffer buffer = this.buffers.poll();
                    if (buffer != null) {
                        return buffer;
                    }
                    this.buffers.wait();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void destroy() {
            Queue<Buffer> queue = this.buffers;
            synchronized (queue) {
                this.isDestroyed = true;
                this.buffers.notifyAll();
            }
        }
    }
}

