/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.Channel;
import org.apache.flink.runtime.io.disk.iomanager.ReadRequest;
import org.apache.flink.runtime.io.disk.iomanager.RequestQueue;
import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IOManager
implements Thread.UncaughtExceptionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
    private final String[] paths;
    private final Random random;
    private final WriterThread[] writers;
    private final ReaderThread[] readers;
    private volatile int nextPath;
    private volatile boolean isClosed = false;

    public IOManager() {
        this(System.getProperty("java.io.tmpdir"));
    }

    public IOManager(String tempDir) {
        this(new String[]{tempDir});
    }

    public IOManager(String[] paths) {
        Thread t;
        int i;
        this.paths = paths;
        this.random = new Random();
        this.nextPath = 0;
        this.writers = new WriterThread[paths.length];
        for (i = 0; i < this.writers.length; ++i) {
            t = new WriterThread();
            this.writers[i] = t;
            t.setName("IOManager writer thread #" + (i + 1));
            t.setDaemon(true);
            t.setUncaughtExceptionHandler(this);
            t.start();
        }
        this.readers = new ReaderThread[paths.length];
        for (i = 0; i < this.readers.length; ++i) {
            t = new ReaderThread();
            this.readers[i] = t;
            t.setName("IOManager reader thread #" + (i + 1));
            t.setDaemon(true);
            t.setUncaughtExceptionHandler(this);
            t.start();
        }
    }

    public final synchronized void shutdown() {
        if (!this.isClosed) {
            int i;
            this.isClosed = true;
            for (i = 0; i < this.readers.length; ++i) {
                try {
                    this.writers[i].shutdown();
                    continue;
                }
                catch (Throwable t) {
                    LOG.error("Error while shutting down IO Manager writer thread.", t);
                }
            }
            for (i = 0; i < this.readers.length; ++i) {
                try {
                    this.readers[i].shutdown();
                    continue;
                }
                catch (Throwable t) {
                    LOG.error("Error while shutting down IO Manager reader thread.", t);
                }
            }
            try {
                for (i = 0; i < this.readers.length; ++i) {
                    this.writers[i].join();
                }
                for (i = 0; i < this.readers.length; ++i) {
                    this.readers[i].join();
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    public final boolean isProperlyShutDown() {
        boolean readersShutDown = true;
        for (int i = 0; i < this.readers.length; ++i) {
            readersShutDown &= this.readers[i].getState() == Thread.State.TERMINATED;
        }
        boolean writersShutDown = true;
        for (int i = 0; i < this.writers.length; ++i) {
            readersShutDown &= this.writers[i].getState() == Thread.State.TERMINATED;
        }
        return this.isClosed && writersShutDown && readersShutDown;
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
        this.shutdown();
    }

    public Channel.ID createChannel() {
        int num = this.getNextPathNum();
        return new Channel.ID(this.paths[num], num, this.random);
    }

    public Channel.Enumerator createChannelEnumerator() {
        return new Channel.Enumerator(this.paths, this.random);
    }

    public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
        if (this.isClosed) {
            throw new IllegalStateException("I/O-Manger is closed.");
        }
        return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue, 1);
    }

    public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue, int numRequestsToCombine) throws IOException {
        if (this.isClosed) {
            throw new IllegalStateException("I/O-Manger is closed.");
        }
        return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue, numRequestsToCombine);
    }

    public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID) throws IOException {
        if (this.isClosed) {
            throw new IllegalStateException("I/O-Manger is closed.");
        }
        return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), 1);
    }

    public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID, int numRequestsToCombine) throws IOException {
        if (this.isClosed) {
            throw new IllegalStateException("I/O-Manger is closed.");
        }
        return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), numRequestsToCombine);
    }

    public BlockChannelReader createBlockChannelReader(Channel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
        if (this.isClosed) {
            throw new IllegalStateException("I/O-Manger is closed.");
        }
        return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue, 1);
    }

    public BlockChannelReader createBlockChannelReader(Channel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue, int numRequestsToCombine) throws IOException {
        if (this.isClosed) {
            throw new IllegalStateException("I/O-Manger is closed.");
        }
        return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue, numRequestsToCombine);
    }

    public BlockChannelReader createBlockChannelReader(Channel.ID channelID) throws IOException {
        if (this.isClosed) {
            throw new IllegalStateException("I/O-Manger is closed.");
        }
        return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), 1);
    }

    public BlockChannelReader createBlockChannelReader(Channel.ID channelID, int numRequestsToCombine) throws IOException {
        if (this.isClosed) {
            throw new IllegalStateException("I/O-Manger is closed.");
        }
        return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), numRequestsToCombine);
    }

    public BulkBlockChannelReader createBulkBlockChannelReader(Channel.ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException {
        if (this.isClosed) {
            throw new IllegalStateException("I/O-Manger is closed.");
        }
        return new BulkBlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
    }

    private final int getNextPathNum() {
        int next = this.nextPath;
        int newNext = next + 1;
        this.nextPath = newNext >= this.paths.length ? 0 : newNext;
        return next;
    }

    private static final class WriterThread
    extends Thread {
        protected final RequestQueue<WriteRequest> requestQueue = new RequestQueue();
        private volatile boolean alive = true;

        protected WriterThread() {
        }

        protected void shutdown() {
            if (this.alive) {
                try {
                    this.alive = false;
                    this.requestQueue.close();
                    this.interrupt();
                }
                catch (Throwable t) {
                    // empty catch block
                }
                IOException ioex = new IOException("Writer thread has been closed.");
                while (!this.requestQueue.isEmpty()) {
                    WriteRequest request = (WriteRequest)this.requestQueue.poll();
                    request.requestDone(ioex);
                }
            }
        }

        @Override
        public void run() {
            while (this.alive) {
                WriteRequest request = null;
                while (request == null) {
                    try {
                        request = (WriteRequest)this.requestQueue.take();
                    }
                    catch (InterruptedException iex) {
                        if (this.alive) continue;
                        return;
                    }
                }
                IOException ioex = null;
                try {
                    request.write();
                }
                catch (IOException e) {
                    ioex = e;
                }
                catch (Throwable t) {
                    ioex = new IOException("The buffer could not be written: " + t.getMessage(), t);
                    LOG.error("I/O reading thread encountered an error" + t.getMessage() == null ? "." : ": ", t);
                }
                request.requestDone(ioex);
            }
        }
    }

    private static final class ReaderThread
    extends Thread {
        protected final RequestQueue<ReadRequest> requestQueue = new RequestQueue();
        private volatile boolean alive = true;

        protected ReaderThread() {
        }

        protected void shutdown() {
            if (this.alive) {
                try {
                    this.alive = false;
                    this.requestQueue.close();
                    this.interrupt();
                }
                catch (Throwable t) {
                    // empty catch block
                }
            }
            IOException ioex = new IOException("IO-Manager has been closed.");
            while (!this.requestQueue.isEmpty()) {
                ReadRequest request = (ReadRequest)this.requestQueue.poll();
                request.requestDone(ioex);
            }
        }

        @Override
        public void run() {
            while (this.alive) {
                ReadRequest request = null;
                while (request == null) {
                    try {
                        request = (ReadRequest)this.requestQueue.take();
                    }
                    catch (InterruptedException iex) {
                        if (this.alive) continue;
                        return;
                    }
                }
                IOException ioex = null;
                try {
                    request.read();
                }
                catch (IOException e) {
                    ioex = e;
                }
                catch (Throwable t) {
                    ioex = new IOException("The buffer could not be read: " + t.getMessage(), t);
                    LOG.error("I/O reading thread encountered an error" + t.getMessage() == null ? "." : ": ", t);
                }
                request.requestDone(ioex);
            }
        }
    }
}

