/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileReader;
import org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SortMergeResultPartitionReadScheduler
implements Runnable,
BufferRecycler {
    private static final Logger LOG = LoggerFactory.getLogger(SortMergeResultPartitionReadScheduler.class);
    private static final Duration BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5L);
    private final Object lock;
    private final CompletableFuture<?> releaseFuture = new CompletableFuture();
    private final BatchShuffleReadBufferPool bufferPool;
    private final Executor ioExecutor;
    private final int maxRequestedBuffers;
    @GuardedBy(value="lock")
    private final Set<SortMergeSubpartitionReader> failedReaders = new HashSet<SortMergeSubpartitionReader>();
    @GuardedBy(value="lock")
    private final Set<SortMergeSubpartitionReader> allReaders = new HashSet<SortMergeSubpartitionReader>();
    @GuardedBy(value="lock")
    private FileChannel dataFileChannel;
    @GuardedBy(value="lock")
    private FileChannel indexFileChannel;
    @GuardedBy(value="lock")
    private boolean isRunning;
    @GuardedBy(value="lock")
    private volatile int numRequestedBuffers;
    @GuardedBy(value="lock")
    private volatile boolean isReleased;

    SortMergeResultPartitionReadScheduler(BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object lock) {
        this.lock = Preconditions.checkNotNull((Object)lock);
        this.bufferPool = (BatchShuffleReadBufferPool)Preconditions.checkNotNull((Object)bufferPool);
        this.ioExecutor = (Executor)Preconditions.checkNotNull((Object)ioExecutor);
        this.maxRequestedBuffers = Math.max(1, 4 * bufferPool.getNumBuffersPerRequest());
        bufferPool.initialize();
    }

    @Override
    public synchronized void run() {
        Queue<SortMergeSubpartitionReader> availableReaders = this.getAvailableReaders();
        Queue<MemorySegment> buffers = this.allocateBuffers(availableReaders);
        int numBuffersAllocated = buffers.size();
        Set<SortMergeSubpartitionReader> finishedReaders = this.readData(availableReaders, buffers);
        int numBuffersRead = numBuffersAllocated - buffers.size();
        this.releaseBuffers(buffers);
        this.removeFinishedAndFailedReaders(numBuffersRead, finishedReaders);
    }

    private Queue<MemorySegment> allocateBuffers(Queue<SortMergeSubpartitionReader> availableReaders) {
        if (availableReaders.isEmpty()) {
            return new ArrayDeque<MemorySegment>();
        }
        try {
            Deadline deadline = Deadline.fromNow((Duration)BUFFER_REQUEST_TIMEOUT);
            while (deadline.hasTimeLeft()) {
                List<MemorySegment> buffers = this.bufferPool.requestBuffers();
                if (!buffers.isEmpty()) {
                    return new ArrayDeque<MemorySegment>(buffers);
                }
                Preconditions.checkState((!this.isReleased ? 1 : 0) != 0, (Object)"Result partition has been already released.");
            }
            if (this.numRequestedBuffers <= 0) {
                throw new TimeoutException(String.format("Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase '%s'.", TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
            }
        }
        catch (Throwable throwable) {
            this.failSubpartitionReaders(availableReaders, throwable);
            LOG.error("Failed to request buffers for data reading.", throwable);
        }
        return new ArrayDeque<MemorySegment>();
    }

    private void releaseBuffers(Queue<MemorySegment> buffers) {
        if (!buffers.isEmpty()) {
            try {
                this.bufferPool.recycle(buffers);
                buffers.clear();
            }
            catch (Throwable throwable) {
                FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), throwable);
            }
        }
    }

    private Set<SortMergeSubpartitionReader> readData(Queue<SortMergeSubpartitionReader> availableReaders, Queue<MemorySegment> buffers) {
        HashSet<SortMergeSubpartitionReader> finishedReaders = new HashSet<SortMergeSubpartitionReader>();
        while (!availableReaders.isEmpty() && !buffers.isEmpty()) {
            SortMergeSubpartitionReader subpartitionReader = availableReaders.poll();
            try {
                if (subpartitionReader.readBuffers(buffers, this)) continue;
                finishedReaders.add(subpartitionReader);
            }
            catch (Throwable throwable) {
                this.failSubpartitionReaders(Collections.singletonList(subpartitionReader), throwable);
                LOG.debug("Failed to read shuffle data.", throwable);
            }
        }
        return finishedReaders;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void failSubpartitionReaders(Collection<SortMergeSubpartitionReader> readers, Throwable failureCause) {
        Iterator<SortMergeSubpartitionReader> iterator = this.lock;
        synchronized (iterator) {
            this.failedReaders.addAll(readers);
        }
        for (SortMergeSubpartitionReader reader : readers) {
            try {
                reader.fail(failureCause);
            }
            catch (Throwable throwable) {
                FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), throwable);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeFinishedAndFailedReaders(int numBuffersRead, Set<SortMergeSubpartitionReader> finishedReaders) {
        Object object = this.lock;
        synchronized (object) {
            for (SortMergeSubpartitionReader reader : finishedReaders) {
                this.allReaders.remove(reader);
            }
            finishedReaders.clear();
            for (SortMergeSubpartitionReader reader : this.failedReaders) {
                this.allReaders.remove(reader);
            }
            this.failedReaders.clear();
            if (this.allReaders.isEmpty()) {
                this.closeFileChannels();
            }
            this.numRequestedBuffers += numBuffersRead;
            this.isRunning = false;
            this.mayTriggerReading();
            this.mayNotifyReleased();
        }
    }

    private void mayNotifyReleased() {
        assert (Thread.holdsLock(this.lock));
        if (this.isReleased && this.allReaders.isEmpty()) {
            this.releaseFuture.complete(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Queue<SortMergeSubpartitionReader> getAvailableReaders() {
        Object object = this.lock;
        synchronized (object) {
            if (this.isReleased) {
                return new ArrayDeque<SortMergeSubpartitionReader>();
            }
            return new PriorityQueue<SortMergeSubpartitionReader>(this.allReaders);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    SortMergeSubpartitionReader crateSubpartitionReader(BufferAvailabilityListener availabilityListener, int targetSubpartition, PartitionedFile resultFile) throws IOException {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.isReleased ? 1 : 0) != 0, (Object)"Partition is already released.");
            PartitionedFileReader fileReader = this.createFileReader(resultFile, targetSubpartition);
            SortMergeSubpartitionReader subpartitionReader = new SortMergeSubpartitionReader(availabilityListener, fileReader);
            this.allReaders.add(subpartitionReader);
            subpartitionReader.getReleaseFuture().thenRun(() -> this.releaseSubpartitionReader(subpartitionReader));
            this.mayTriggerReading();
            return subpartitionReader;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void releaseSubpartitionReader(SortMergeSubpartitionReader subpartitionReader) {
        Object object = this.lock;
        synchronized (object) {
            if (this.allReaders.contains(subpartitionReader)) {
                this.failedReaders.add(subpartitionReader);
            }
        }
    }

    private PartitionedFileReader createFileReader(PartitionedFile resultFile, int targetSubpartition) throws IOException {
        assert (Thread.holdsLock(this.lock));
        try {
            if (this.allReaders.isEmpty()) {
                this.openFileChannels(resultFile);
            }
            return new PartitionedFileReader(resultFile, targetSubpartition, this.dataFileChannel, this.indexFileChannel);
        }
        catch (Throwable throwable) {
            if (this.allReaders.isEmpty()) {
                this.closeFileChannels();
            }
            throw throwable;
        }
    }

    private void openFileChannels(PartitionedFile resultFile) throws IOException {
        assert (Thread.holdsLock(this.lock));
        this.closeFileChannels();
        this.dataFileChannel = SortMergeResultPartitionReadScheduler.openFileChannel(resultFile.getDataFilePath());
        this.indexFileChannel = SortMergeResultPartitionReadScheduler.openFileChannel(resultFile.getIndexFilePath());
    }

    private void closeFileChannels() {
        assert (Thread.holdsLock(this.lock));
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{this.dataFileChannel, this.indexFileChannel});
        this.dataFileChannel = null;
        this.indexFileChannel = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void recycle(MemorySegment segment) {
        Object object = this.lock;
        synchronized (object) {
            this.bufferPool.recycle(segment);
            --this.numRequestedBuffers;
            this.mayTriggerReading();
        }
    }

    private void mayTriggerReading() {
        assert (Thread.holdsLock(this.lock));
        if (!this.isRunning && !this.allReaders.isEmpty() && this.numRequestedBuffers + this.bufferPool.getNumBuffersPerRequest() <= this.maxRequestedBuffers) {
            this.isRunning = true;
            this.ioExecutor.execute(this);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    CompletableFuture<?> release() {
        ArrayList<SortMergeSubpartitionReader> pendingReaders;
        Object object = this.lock;
        synchronized (object) {
            if (this.isReleased) {
                return this.releaseFuture;
            }
            this.isReleased = true;
            this.failedReaders.addAll(this.allReaders);
            pendingReaders = new ArrayList<SortMergeSubpartitionReader>(this.allReaders);
            this.mayNotifyReleased();
        }
        this.failSubpartitionReaders(pendingReaders, new IllegalStateException("Result partition has been already released."));
        return this.releaseFuture;
    }

    private static FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    int getNumPendingReaders() {
        Object object = this.lock;
        synchronized (object) {
            return this.allReaders.size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    FileChannel getDataFileChannel() {
        Object object = this.lock;
        synchronized (object) {
            return this.dataFileChannel;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    FileChannel getIndexFileChannel() {
        Object object = this.lock;
        synchronized (object) {
            return this.indexFileChannel;
        }
    }

    @VisibleForTesting
    CompletableFuture<?> getReleaseFuture() {
        return this.releaseFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    boolean isRunning() {
        Object object = this.lock;
        synchronized (object) {
            return this.isRunning;
        }
    }
}

