package org.apache.flink.runtime.iterative.io;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;

/**
 * Paged output view whose segments start with a {@link #HEADER_LENGTH}-byte header; the int at
 * offset 0 of each finished segment records how far that segment was written.
 *
 * <p>Finished segments are kept in memory while enough empty segments are available and are
 * handed to a {@link BlockChannelWriter} otherwise. {@link #switchBuffers()} turns everything
 * written so far into a {@link ReadEnd} and starts a fresh write side on the same pool of
 * segments.
 */
public class SerializedUpdateBuffer extends AbstractPagedOutputView {
    /** Bytes reserved at the start of every segment; offset 0 holds the segment's limit as an int. */
    private static final int HEADER_LENGTH = 4;

    /** The spilling threshold is derived as {@code (1 - SPILL_THRESHOLD) * totalNumBuffers}. */
    private static final float SPILL_THRESHOLD = 0.95f;

    /** Segments currently available for writing; also the return queue of the spill writer. */
    private final LinkedBlockingQueue<MemorySegment> emptyBuffers;

    /** Finished segments of the current write phase that are still held in memory. */
    private ArrayDeque<MemorySegment> fullBuffers;

    /** Writer of the current write phase; created lazily on the first spill. */
    private BlockChannelWriter currentWriter;

    private final IOManager ioManager;
    private final FileIOChannel.Enumerator channelEnumerator;

    /** Spilling happens once the number of empty buffers is at or below this value. */
    private final int numSegmentsSpillingThreshold;

    /** Number of segments handed to {@link #currentWriter} in the current write phase. */
    private int numBuffersSpilled;

    private final int minBuffersForWriteEnd;
    private final int minBuffersForSpilledReadEnd;

    /** Read ends handed out by {@link #switchBuffers()} that have not been disposed yet. */
    private final List<ReadEnd> readEnds;

    private final int totalNumBuffers;

    /**
     * Input view over the data of one completed write phase: first the spilled segments (read
     * back through a {@link BlockChannelReader}), then the segments that stayed in memory.
     */
    public static final class ReadEnd extends AbstractPagedInputView {
        /** Queue that consumed segments are returned to. */
        private final LinkedBlockingQueue<MemorySegment> emptyBufferTarget;

        /** In-memory segments that follow the spilled ones. */
        private final Deque<MemorySegment> fullBufferSource;

        /** Reader for the spilled segments; {@code null} when nothing was spilled. */
        private final BlockChannelReader spilledBufferSource;

        /** Spilled segments that have not yet been returned by {@link #nextSegment}. */
        private int spilledBuffersRemaining;

        /** Read requests that still have to be issued to {@link #spilledBufferSource}. */
        private int requestsRemaining;

        /**
         * @param firstSegment first segment to read from; its limit is taken from its header
         * @param emptyBufferTarget queue that consumed segments are returned to
         * @param fullBufferSource in-memory segments to read after the spilled ones
         * @param spilledBufferSource reader for spilled segments, or {@code null}
         * @param readSegments segments to issue initial read requests with; only accessed when
         *     {@code numSpilledBuffers > 0}, so it may be {@code null} otherwise
         * @param numSpilledBuffers number of spilled segments still to be read back
         * @throws IOException if issuing a read request fails
         */
        private ReadEnd(MemorySegment firstSegment, LinkedBlockingQueue<MemorySegment> emptyBufferTarget, Deque<MemorySegment> fullBufferSource, BlockChannelReader spilledBufferSource, List<MemorySegment> readSegments, int numSpilledBuffers) throws IOException {
            super(firstSegment, firstSegment.getInt(0), SerializedUpdateBuffer.HEADER_LENGTH);
            this.emptyBufferTarget = emptyBufferTarget;
            this.fullBufferSource = fullBufferSource;
            this.spilledBufferSource = spilledBufferSource;
            this.requestsRemaining = numSpilledBuffers;
            this.spilledBuffersRemaining = numSpilledBuffers;
            // Issue as many read requests up front as there are segments and blocks available.
            while (this.requestsRemaining > 0 && readSegments.size() > 0) {
                this.spilledBufferSource.readBlock(readSegments.remove(readSegments.size() - 1));
                this.requestsRemaining--;
            }
        }

        /**
         * Recycles the consumed segment and returns the next one to read.
         *
         * @throws EOFException when neither spilled nor in-memory segments remain
         */
        @Override // org.apache.flink.runtime.memorymanager.AbstractPagedInputView
        protected MemorySegment nextSegment(MemorySegment memorySegment) throws IOException {
            // Reuse the consumed segment for a pending read request, otherwise give it back.
            if (this.requestsRemaining > 0) {
                this.requestsRemaining--;
                this.spilledBufferSource.readBlock(memorySegment);
            } else {
                this.emptyBufferTarget.add(memorySegment);
            }

            if (this.spilledBuffersRemaining > 0) {
                this.spilledBuffersRemaining--;
                try {
                    return this.spilledBufferSource.getReturnQueue().take();
                } catch (InterruptedException e) {
                    throw new RuntimeException("Read End was interrupted while waiting for spilled buffer.", e);
                }
            }

            if (this.fullBufferSource.size() > 0) {
                return this.fullBufferSource.removeFirst();
            }

            // Everything has been consumed: release the view and the spill channel.
            clear();
            if (this.spilledBufferSource != null) {
                this.spilledBufferSource.closeAndDelete();
            }
            throw new EOFException();
        }

        /** Returns the limit stored in the segment's header. */
        @Override // org.apache.flink.runtime.memorymanager.AbstractPagedInputView
        protected int getLimitForSegment(MemorySegment memorySegment) {
            return memorySegment.getInt(0);
        }

        /**
         * Releases this read end if all of its data has been consumed.
         *
         * @return {@code true} if the read end was done and has been released, {@code false} if
         *     unread data remains
         */
        public boolean disposeIfDone() {
            if (!this.fullBufferSource.isEmpty() || this.spilledBuffersRemaining != 0) {
                return false;
            }
            MemorySegment current = getCurrentSegment();
            if (current != null) {
                if (getCurrentPositionInSegment() < getCurrentSegmentLimit()) {
                    return false;
                }
                this.emptyBufferTarget.add(current);
                clear();
            }
            if (this.spilledBufferSource != null) {
                try {
                    this.spilledBufferSource.closeAndDelete();
                } catch (Throwable ignored) {
                    // Best-effort cleanup: a failure to delete the spill channel is not reported.
                }
            }
            return true;
        }

        /**
         * Releases this read end regardless of unread data and adds its segments to {@code list}.
         *
         * @param list collection that receives the reclaimed segments
         * @throws InterruptedException if interrupted while waiting for segments to come back
         */
        public void forceDispose(List<MemorySegment> list) throws InterruptedException {
            MemorySegment current = getCurrentSegment();
            clear();
            if (current != null) {
                list.add(current);
            }
            list.addAll(this.fullBufferSource);

            // One segment is awaited for every read request that was issued but not yet consumed.
            int outstanding = this.spilledBuffersRemaining - this.requestsRemaining;
            for (int k = outstanding; k > 0; k--) {
                list.add(this.emptyBufferTarget.take());
            }

            if (this.spilledBufferSource != null) {
                try {
                    this.spilledBufferSource.closeAndDelete();
                } catch (Throwable ignored) {
                    // Best-effort cleanup: a failure to delete the spill channel is not reported.
                }
            }
        }
    }

    /** Creates an instance without memory or I/O manager; all buffer-related fields are unset. */
    public SerializedUpdateBuffer() {
        super(-1, HEADER_LENGTH);
        this.emptyBuffers = null;
        this.fullBuffers = null;
        this.ioManager = null;
        this.channelEnumerator = null;
        this.numSegmentsSpillingThreshold = -1;
        this.minBuffersForWriteEnd = -1;
        this.minBuffersForSpilledReadEnd = -1;
        this.totalNumBuffers = -1;
        this.readEnds = Collections.emptyList();
    }

    /**
     * Creates a buffer over the given segments. The last segment of {@code list} is removed and
     * becomes the initial write segment; the remaining ones form the pool of empty buffers.
     *
     * @param list the memory segments to use; must contain at least 3 segments
     * @param i segment size passed to the underlying output view
     * @param iOManager I/O manager used to create spill writers and readers
     * @throws IllegalArgumentException if fewer than 3 segments are supplied
     */
    public SerializedUpdateBuffer(List<MemorySegment> list, int i, IOManager iOManager) {
        super(takeInitialSegment(list), i, HEADER_LENGTH);
        this.totalNumBuffers = list.size() + 1;
        this.emptyBuffers = new LinkedBlockingQueue<>(this.totalNumBuffers);
        this.fullBuffers = new ArrayDeque<>(64);
        this.emptyBuffers.addAll(list);

        int threshold = (int) ((1 - SPILL_THRESHOLD) * this.totalNumBuffers);
        this.numSegmentsSpillingThreshold = threshold > 0 ? threshold : 0;
        this.minBuffersForWriteEnd = Math.max(2, Math.min(16, this.totalNumBuffers / 2));
        this.minBuffersForSpilledReadEnd = Math.max(1, Math.min(16, this.totalNumBuffers / 4));
        if (this.minBuffersForSpilledReadEnd + this.minBuffersForWriteEnd > this.totalNumBuffers) {
            throw new IllegalArgumentException("BUG: Unfulfillable memory assignment.");
        }

        this.ioManager = iOManager;
        this.channelEnumerator = iOManager.createChannelEnumerator();
        this.readEnds = new ArrayList<>();
    }

    /**
     * Validates the number of segments before anything is removed, then removes and returns the
     * last segment. Validating first keeps the caller's list untouched when it is rejected.
     */
    private static MemorySegment takeInitialSegment(List<MemorySegment> segments) {
        if (segments.size() < 3) {
            throw new IllegalArgumentException("SerializedUpdateBuffer needs at least 3 memory segments.");
        }
        return segments.remove(segments.size() - 1);
    }

    /**
     * Stores {@code i} in the header of the finished segment, keeps the segment in memory or
     * spills it (together with all segments held so far), and returns the next empty segment.
     *
     * @throws IOException if creating the writer or issuing a write request fails
     */
    @Override // org.apache.flink.runtime.memorymanager.AbstractPagedOutputView
    protected MemorySegment nextSegment(MemorySegment memorySegment, int i) throws IOException {
        memorySegment.putInt(0, i);
        if (this.emptyBuffers.size() > this.numSegmentsSpillingThreshold) {
            this.fullBuffers.addLast(memorySegment);
        } else {
            if (this.currentWriter == null) {
                this.currentWriter = this.ioManager.createBlockChannelWriter(this.channelEnumerator.next(), this.emptyBuffers);
            }
            // Spill in write order: everything held in memory first, then the segment just finished.
            this.numBuffersSpilled += this.fullBuffers.size();
            while (this.fullBuffers.size() > 0) {
                this.currentWriter.writeBlock(this.fullBuffers.removeFirst());
            }
            this.currentWriter.writeBlock(memorySegment);
            this.numBuffersSpilled++;
        }
        try {
            return this.emptyBuffers.take();
        } catch (InterruptedException e) {
            throw new RuntimeException("Spilling Fifo Queue was interrupted while waiting for next buffer.", e);
        }
    }

    /** Advances the output view to the next segment. */
    public void flush() throws IOException {
        advance();
    }

    /**
     * Ends the current write phase and returns a {@link ReadEnd} over everything written in it.
     * Afterwards this buffer writes into a fresh segment taken from the empty-buffer pool.
     *
     * @return the read end for the data of the finished write phase
     * @throws IOException if spilling or reading back the first spilled block fails
     */
    public ReadEnd switchBuffers() throws IOException {
        // Drop read ends that are fully consumed; iterate backwards so removals do not shift
        // the indices still to be visited.
        for (int idx = this.readEnds.size() - 1; idx >= 0; idx--) {
            if (this.readEnds.get(idx).disposeIfDone()) {
                this.readEnds.remove(idx);
            }
        }

        // Seal the segment currently being written.
        MemorySegment currentSegment = getCurrentSegment();
        currentSegment.putInt(0, getCurrentPositionInSegment());
        this.fullBuffers.addLast(currentSegment);

        ReadEnd readEnd;
        if (this.numBuffersSpilled != 0 || this.emptyBuffers.size() < this.minBuffersForWriteEnd) {
            // Spill enough in-memory segments so that both the next write phase and the spilled
            // read end can obtain their minimum number of buffers.
            int toSpill = Math.min((this.minBuffersForSpilledReadEnd + this.minBuffersForWriteEnd) - this.emptyBuffers.size(), this.fullBuffers.size());
            if (toSpill > 0) {
                if (this.currentWriter == null) {
                    this.currentWriter = this.ioManager.createBlockChannelWriter(this.channelEnumerator.next(), this.emptyBuffers);
                }
                for (int k = 0; k < toSpill; k++) {
                    this.currentWriter.writeBlock(this.fullBuffers.removeFirst());
                }
                this.numBuffersSpilled += toSpill;
            }

            this.currentWriter.close();
            BlockChannelReader reader = this.ioManager.createBlockChannelReader(this.currentWriter.getChannelID());
            List<MemorySegment> readSegments = new ArrayList<>();
            try {
                while (readSegments.size() < this.minBuffersForSpilledReadEnd) {
                    readSegments.add(this.emptyBuffers.take());
                }
                // Read the first block back synchronously; the read end therefore has one
                // spilled buffer less to fetch.
                reader.readBlock(readSegments.remove(readSegments.size() - 1));
                MemorySegment firstSegment = reader.getReturnQueue().take();
                readEnd = new ReadEnd(firstSegment, this.emptyBuffers, this.fullBuffers, reader, readSegments, this.numBuffersSpilled - 1);
            } catch (InterruptedException e) {
                throw new RuntimeException("SerializedUpdateBuffer was interrupted while reclaiming memory by spilling.", e);
            }
        } else {
            readEnd = new ReadEnd(this.fullBuffers.removeFirst(), this.emptyBuffers, this.fullBuffers, null, null, 0);
        }

        // The read end keeps the old deque; the write side starts over with fresh state.
        this.fullBuffers = new ArrayDeque<>(64);
        this.currentWriter = null;
        this.numBuffersSpilled = 0;
        try {
            seekOutput(this.emptyBuffers.take(), HEADER_LENGTH);
            this.readEnds.add(readEnd);
            return readEnd;
        } catch (InterruptedException e) {
            throw new RuntimeException("SerializedUpdateBuffer was interrupted while reclaiming memory by spilling.", e);
        }
    }

    /**
     * Shuts the buffer down and collects its memory segments: the current segment, the in-memory
     * segments, those reclaimed from outstanding read ends, and empty segments until the list
     * holds {@code totalNumBuffers} entries.
     *
     * @return the collected memory segments
     */
    public List<MemorySegment> close() {
        if (this.currentWriter != null) {
            try {
                this.currentWriter.closeAndDelete();
            } catch (Throwable ignored) {
                // Best-effort cleanup: a failure to delete the spill channel is not reported.
            }
        }

        List<MemorySegment> freeMemory = new ArrayList<>(64);
        freeMemory.add(getCurrentSegment());
        clear();
        freeMemory.addAll(this.fullBuffers);
        this.fullBuffers = null;

        try {
            for (int idx = this.readEnds.size() - 1; idx >= 0; idx--) {
                this.readEnds.remove(idx).forceDispose(freeMemory);
            }
            // Remaining segments arrive through the empty-buffer queue; wait for each of them.
            while (freeMemory.size() < this.totalNumBuffers) {
                freeMemory.add(this.emptyBuffers.take());
            }
            return freeMemory;
        } catch (InterruptedException e) {
            throw new RuntimeException("Retrieving memory back from asynchronous I/O was interrupted.", e);
        }
    }
}
