package org.apache.hama.bsp.message.io;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/hama/bsp/message/io/SpilledDataInputBuffer.class */
public class SpilledDataInputBuffer extends DataInputStream implements DataInput {
    private static final Log LOG = LogFactory.getLog(SpilledDataInputBuffer.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/bsp/message/io/SpilledDataInputBuffer$SpillReadThread.class */
    public static class SpillReadThread implements Callable<Boolean> {
        private String fileName;
        private List<SpilledByteBuffer> bufferList_;
        private long bytesToRead_;
        private long bytesWrittenInFile_;
        private SpilledDataReadStatus status_;
        private boolean closed_ = false;

        public SpillReadThread(String str, List<SpilledByteBuffer> list, SpilledDataReadStatus spilledDataReadStatus) {
            this.fileName = str;
            this.bufferList_ = list;
            this.status_ = spilledDataReadStatus;
        }

        private void keepReadingFromFile() throws IOException {
            int fileBufferIndex;
            FileChannel channel = new RandomAccessFile(this.fileName, "r").getChannel();
            this.bytesToRead_ = channel.size();
            this.bytesWrittenInFile_ = this.bytesToRead_;
            long j = 0;
            do {
                try {
                    fileBufferIndex = this.status_.getFileBufferIndex();
                    if (fileBufferIndex >= 0) {
                        SpilledByteBuffer spilledByteBuffer = this.bufferList_.get(fileBufferIndex);
                        spilledByteBuffer.clear();
                        Math.min(spilledByteBuffer.remaining(), this.bytesWrittenInFile_ - j);
                        long read = channel.read(spilledByteBuffer.getByteBuffer());
                        if (read >= 0) {
                            spilledByteBuffer.flip();
                            this.bytesToRead_ -= read;
                            j += read;
                            if (this.closed_ || this.bytesToRead_ <= 0 || fileBufferIndex < 0) {
                                break;
                            }
                        } else {
                            break;
                        }
                    } else {
                        break;
                    }
                } catch (InterruptedException e) {
                    throw new IOException(e);
                }
            } while (fileBufferIndex < this.bufferList_.size());
            channel.close();
            this.closed_ = true;
            this.status_.closedBySpiller();
        }

        public void completeRead() {
            this.closed_ = true;
        }

        public boolean isClosed() {
            return this.closed_;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws Exception {
            try {
                keepReadingFromFile();
                return Boolean.TRUE;
            } catch (Exception e) {
                SpilledDataInputBuffer.LOG.error("Error reading from file: " + this.fileName, e);
                this.status_.notifyError();
                return Boolean.FALSE;
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/message/io/SpilledDataInputBuffer$SpilledInputStream.class */
    static class SpilledInputStream extends InputStream {
        private String fileName_;
        private List<SpilledByteBuffer> bufferList_;
        private boolean spilledAlready_;
        ReadIndexStatus status_;
        private final byte[] readByte = new byte[1];
        private int count;
        private final byte[] buf;
        private int pos;
        private Callable<Boolean> spillReadThread_;
        private Future<Boolean> spillReadState_;
        private ExecutorService spillThreadService_;
        private SpilledByteBuffer currentReadBuffer_;
        private BitSet bufferBitState_;
        private boolean closed_;

        public SpilledInputStream(String str, boolean z, List<SpilledByteBuffer> list, boolean z2) throws IOException {
            this.fileName_ = str;
            this.bufferList_ = list;
            this.spilledAlready_ = z2;
            this.bufferBitState_ = new BitSet(list.size());
            if (this.spilledAlready_) {
                this.status_ = new SpilledDataReadStatus(list.size(), this.bufferBitState_);
            } else {
                this.status_ = new BufferReadStatus(list.size());
            }
            this.buf = new byte[8192];
            this.count = 0;
            this.pos = 0;
            this.closed_ = false;
        }

        public void prepareRead() throws IOException {
            if (this.spilledAlready_) {
                this.spillReadThread_ = new SpillReadThread(this.fileName_, this.bufferList_, (SpilledDataReadStatus) this.status_);
                this.spillThreadService_ = Executors.newFixedThreadPool(1);
                this.spillReadState_ = this.spillThreadService_.submit(this.spillReadThread_);
                if (!this.status_.startReading()) {
                    throw new IOException("Failed to read the spilled file: " + this.fileName_);
                }
            }
            try {
                this.currentReadBuffer_ = getNextBuffer();
                if (this.currentReadBuffer_ == null) {
                    try {
                        if (this.spilledAlready_) {
                            try {
                                try {
                                    this.spillReadState_.get();
                                    this.spillThreadService_.shutdownNow();
                                } catch (ExecutionException e) {
                                    throw new IOException(e);
                                }
                            } catch (InterruptedException e2) {
                                throw new IOException(e2);
                            }
                        }
                        throw new IOException("Could not initialize the buffer for reading");
                    } catch (Throwable th) {
                        this.spillThreadService_.shutdownNow();
                        throw th;
                    }
                }
            } catch (InterruptedException e3) {
                throw new IOException(e3);
            }
        }

        public SpilledByteBuffer getNextBuffer() throws InterruptedException {
            int readBufferIndex = this.status_.getReadBufferIndex();
            if (readBufferIndex < 0 || readBufferIndex >= this.bufferList_.size()) {
                return null;
            }
            return this.bufferList_.get(readBufferIndex);
        }

        @Override // java.io.InputStream
        public int read() throws IOException {
            if (this.count <= 0) {
                if (-1 == read(this.readByte, 0, 1)) {
                    return -1;
                }
                return this.readByte[0] & 255;
            }
            this.count--;
            byte[] bArr = this.buf;
            int i = this.pos;
            this.pos = i + 1;
            return bArr[i] & 255;
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            if (this.count >= i2) {
                System.arraycopy(this.buf, this.pos, bArr, i, i2);
                this.count -= i2;
                this.pos += i2;
                return i2;
            }
            int i3 = 0;
            while (i2 > 0) {
                if (this.count == 0) {
                    this.count = readInternal(this.buf, 0, this.buf.length);
                    if (this.count == -1) {
                        return this.count;
                    }
                    this.pos = 0;
                }
                int min = Math.min(this.count, i2);
                System.arraycopy(this.buf, this.pos, bArr, i, min);
                i2 -= min;
                i += min;
                i3 += min;
                this.count -= min;
                this.pos += min;
            }
            return i3;
        }

        public int readInternal(byte[] bArr, int i, int i2) throws IOException {
            if (this.currentReadBuffer_ == null) {
                return -1;
            }
            int i3 = 0;
            while (true) {
                int i4 = i3;
                if (i2 <= 0) {
                    return i4;
                }
                int remaining = this.currentReadBuffer_.remaining();
                if (remaining == 0) {
                    try {
                        this.currentReadBuffer_ = getNextBuffer();
                        if (this.currentReadBuffer_ == null) {
                            return i4;
                        }
                        remaining = this.currentReadBuffer_.remaining();
                    } catch (InterruptedException e) {
                        throw new IOException(e);
                    }
                }
                int min = Math.min(remaining, i2);
                this.currentReadBuffer_.get(bArr, i, min);
                i2 -= min;
                i += remaining;
                i3 = i4 + min;
            }
        }

        public void clear() throws IOException {
            close();
            this.bufferBitState_.clear();
            if (this.spilledAlready_) {
                this.status_ = new SpilledDataReadStatus(this.bufferList_.size(), this.bufferBitState_);
            } else {
                this.status_ = new BufferReadStatus(this.bufferList_.size());
            }
            this.count = 0;
            this.pos = 0;
            this.closed_ = false;
            prepareRead();
        }

        @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.closed_) {
                return;
            }
            this.status_.completeReading();
            try {
                if (this.spilledAlready_) {
                    try {
                        try {
                            this.spillReadState_.get();
                            this.spillThreadService_.shutdownNow();
                        } catch (ExecutionException e) {
                            throw new IOException(e);
                        }
                    } catch (InterruptedException e2) {
                        throw new IOException(e2);
                    }
                }
            } catch (Throwable th) {
                this.spillThreadService_.shutdownNow();
                throw th;
            }
        }

        public String getFileName() {
            return this.fileName_;
        }
    }

    public void completeReading(boolean z) throws IOException {
        this.in.close();
        if (z) {
            File file = new File(((SpilledInputStream) this.in).getFileName());
            if (file.exists()) {
                file.delete();
            }
        }
    }

    public SpilledDataInputBuffer(InputStream inputStream) {
        super(inputStream);
    }

    public void clear() throws IOException {
        ((SpilledInputStream) this.in).clear();
    }

    public static SpilledDataInputBuffer getSpilledDataInputBuffer(String str, boolean z, List<SpilledByteBuffer> list) throws IOException {
        SpilledInputStream spilledInputStream = new SpilledInputStream(str, z, list, true);
        spilledInputStream.prepareRead();
        return new SpilledDataInputBuffer(spilledInputStream);
    }
}
