package alluxio.client.file;

import alluxio.client.AlluxioStorageType;
import alluxio.client.BoundedStream;
import alluxio.client.Seekable;
import alluxio.client.block.BlockInStream;
import alluxio.client.block.BufferedBlockOutStream;
import alluxio.client.block.LocalBlockInStream;
import alluxio.client.block.UnderStoreBlockInStream;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.policy.FileWriteLocationPolicy;
import alluxio.exception.AlluxioException;
import alluxio.exception.ExceptionMessage;
import alluxio.master.block.BlockId;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/file/FileInStream.class */
public class FileInStream extends InputStream implements BoundedStream, Seekable {
    private static final Logger LOG = LoggerFactory.getLogger("alluxio.logger.type");
    private final AlluxioStorageType mAlluxioStorageType;
    private final long mBlockSize;
    private final FileWriteLocationPolicy mLocationPolicy;
    private final long mFileLength;
    private final URIStatus mStatus;
    private static final String BLOCK_ID_NOT_CACHED = "The block with ID {} could not be cached into Alluxio storage.";
    private boolean mShouldCacheCurrentBlock;
    private long mPos;
    private BlockInStream mCurrentBlockInStream;
    private BufferedBlockOutStream mCurrentCacheStream;
    private final FileSystemContext mContext = FileSystemContext.INSTANCE;
    private boolean mClosed = false;

    public FileInStream(URIStatus uRIStatus, InStreamOptions inStreamOptions) {
        this.mStatus = uRIStatus;
        this.mBlockSize = uRIStatus.getBlockSizeBytes();
        this.mFileLength = uRIStatus.getLength();
        this.mAlluxioStorageType = inStreamOptions.getAlluxioStorageType();
        this.mShouldCacheCurrentBlock = this.mAlluxioStorageType.isStore();
        this.mLocationPolicy = inStreamOptions.getLocationPolicy();
        if (this.mShouldCacheCurrentBlock) {
            Preconditions.checkNotNull(inStreamOptions.getLocationPolicy(), "The location policy is not specified");
        }
    }

    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        if (this.mCurrentBlockInStream != null) {
            this.mCurrentBlockInStream.close();
        }
        closeCacheStream();
        this.mClosed = true;
    }

    @Override // java.io.InputStream
    public int read() throws IOException {
        if (this.mPos >= this.mFileLength) {
            return -1;
        }
        checkAndAdvanceBlockInStream();
        int read = this.mCurrentBlockInStream.read();
        this.mPos++;
        if (this.mShouldCacheCurrentBlock) {
            try {
                this.mCurrentCacheStream.write(read);
            } catch (IOException e) {
                LOG.warn(BLOCK_ID_NOT_CACHED, Long.valueOf(getCurrentBlockId()), e);
                this.mShouldCacheCurrentBlock = false;
            }
        }
        return read;
    }

    @Override // java.io.InputStream
    public int read(byte[] bArr) throws IOException {
        return read(bArr, 0, bArr.length);
    }

    @Override // java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        Preconditions.checkArgument(bArr != null, "Read buffer cannot be null");
        Preconditions.checkArgument(i >= 0 && i2 >= 0 && i2 + i <= bArr.length, "Buffer length: %s, offset: %s, len: %s", new Object[]{Integer.valueOf(bArr.length), Integer.valueOf(i), Integer.valueOf(i2)});
        if (i2 == 0) {
            return 0;
        }
        if (this.mPos >= this.mFileLength) {
            return -1;
        }
        int i3 = i;
        int i4 = i2;
        while (i4 > 0 && this.mPos < this.mFileLength) {
            checkAndAdvanceBlockInStream();
            int read = this.mCurrentBlockInStream.read(bArr, i3, (int) Math.min(i4, this.mCurrentBlockInStream.remaining()));
            if (read > 0 && this.mShouldCacheCurrentBlock) {
                try {
                    this.mCurrentCacheStream.write(bArr, i3, read);
                } catch (IOException e) {
                    LOG.warn(BLOCK_ID_NOT_CACHED, Long.valueOf(getCurrentBlockId()), e);
                    this.mShouldCacheCurrentBlock = false;
                }
            }
            if (read != -1) {
                this.mPos += read;
                i4 -= read;
                i3 += read;
            }
        }
        return i2 - i4;
    }

    @Override // alluxio.client.BoundedStream
    public long remaining() {
        return this.mFileLength - this.mPos;
    }

    @Override // alluxio.client.Seekable
    public void seek(long j) throws IOException {
        if (this.mPos == j) {
            return;
        }
        Preconditions.checkArgument(j >= 0, "Seek position is negative: %s", new Object[]{Long.valueOf(j)});
        Preconditions.checkArgument(j < this.mFileLength, "Seek position past end of file: %s", new Object[]{Long.valueOf(j)});
        seekBlockInStream(j);
        checkAndAdvanceBlockInStream();
        this.mCurrentBlockInStream.seek(this.mPos % this.mBlockSize);
    }

    @Override // java.io.InputStream
    public long skip(long j) throws IOException {
        if (j <= 0) {
            return 0L;
        }
        long min = Math.min(j, this.mFileLength - this.mPos);
        long j2 = this.mPos + min;
        long j3 = j2 / this.mBlockSize > this.mPos / this.mBlockSize ? j2 % this.mBlockSize : min;
        seekBlockInStream(j2);
        checkAndAdvanceBlockInStream();
        if (j3 != this.mCurrentBlockInStream.skip(j3)) {
            throw new IOException(ExceptionMessage.INSTREAM_CANNOT_SKIP.getMessage(new Object[]{Long.valueOf(min)}));
        }
        return min;
    }

    private void checkAndAdvanceBlockInStream() throws IOException {
        long currentBlockId = getCurrentBlockId();
        if (this.mCurrentBlockInStream == null || this.mCurrentBlockInStream.remaining() == 0) {
            closeCacheStream();
            updateBlockInStream(currentBlockId);
            if (this.mShouldCacheCurrentBlock) {
                try {
                    long currentBlockSize = getCurrentBlockSize();
                    this.mCurrentCacheStream = this.mContext.getAluxioBlockStore().getOutStream(currentBlockId, currentBlockSize, this.mLocationPolicy.getWorkerForNextBlock(this.mContext.getAluxioBlockStore().getWorkerInfoList(), currentBlockSize));
                } catch (AlluxioException e) {
                    LOG.warn(BLOCK_ID_NOT_CACHED, Long.valueOf(currentBlockId), e);
                    throw new IOException((Throwable) e);
                } catch (IOException e2) {
                    LOG.warn(BLOCK_ID_NOT_CACHED, Long.valueOf(currentBlockId), e2);
                    this.mShouldCacheCurrentBlock = false;
                }
            }
        }
    }

    private void closeCacheStream() throws IOException {
        if (this.mCurrentCacheStream == null) {
            return;
        }
        if (this.mCurrentCacheStream.remaining() == 0) {
            this.mCurrentCacheStream.close();
        } else {
            this.mCurrentCacheStream.cancel();
        }
        this.mShouldCacheCurrentBlock = false;
    }

    private long getCurrentBlockId() {
        if (this.mPos == this.mFileLength) {
            return -1L;
        }
        int i = (int) (this.mPos / this.mBlockSize);
        Preconditions.checkState(i < this.mStatus.getBlockIds().size(), "Current block index exceeds max index");
        return this.mStatus.getBlockIds().get(i).longValue();
    }

    private long getCurrentBlockSize() {
        long j = this.mFileLength % this.mBlockSize;
        return this.mFileLength - this.mPos > j ? this.mBlockSize : j;
    }

    private void seekBlockInStream(long j) throws IOException {
        long currentBlockId = getCurrentBlockId();
        this.mPos = j;
        closeCacheStream();
        long currentBlockId2 = getCurrentBlockId();
        if (currentBlockId != currentBlockId2) {
            updateBlockInStream(currentBlockId2);
            if (this.mPos % this.mBlockSize != 0 || !this.mShouldCacheCurrentBlock) {
                this.mShouldCacheCurrentBlock = false;
                return;
            }
            try {
                long currentBlockSize = getCurrentBlockSize();
                this.mCurrentCacheStream = this.mContext.getAluxioBlockStore().getOutStream(currentBlockId2, currentBlockSize, this.mLocationPolicy.getWorkerForNextBlock(this.mContext.getAluxioBlockStore().getWorkerInfoList(), currentBlockSize));
            } catch (IOException e) {
                LOG.warn(BLOCK_ID_NOT_CACHED, Long.valueOf(getCurrentBlockId()), e);
                this.mShouldCacheCurrentBlock = false;
            } catch (AlluxioException e2) {
                LOG.warn(BLOCK_ID_NOT_CACHED, Long.valueOf(currentBlockId2), e2);
                throw new IOException((Throwable) e2);
            }
        }
    }

    private void updateBlockInStream(long j) throws IOException {
        if (this.mCurrentBlockInStream != null) {
            this.mCurrentBlockInStream.close();
        }
        try {
            if (this.mAlluxioStorageType.isPromote()) {
                try {
                    this.mContext.getAluxioBlockStore().promote(j);
                } catch (IOException e) {
                    LOG.warn("Promotion of block with ID {} failed.", Long.valueOf(j), e);
                }
            }
            this.mCurrentBlockInStream = this.mContext.getAluxioBlockStore().getInStream(j);
            this.mShouldCacheCurrentBlock = !(this.mCurrentBlockInStream instanceof LocalBlockInStream) && this.mAlluxioStorageType.isStore();
        } catch (IOException e2) {
            LOG.debug("Failed to get BlockInStream for block with ID {}, using UFS instead. {}", Long.valueOf(j), e2);
            if (!this.mStatus.isPersisted()) {
                LOG.error("Could not obtain data for block with ID {} from Alluxio. The block will not be persisted in the under file storage.", Long.valueOf(j));
                throw e2;
            }
            this.mCurrentBlockInStream = new UnderStoreBlockInStream(BlockId.getSequenceNumber(j) * this.mBlockSize, this.mBlockSize, this.mStatus.getUfsPath());
            this.mShouldCacheCurrentBlock = this.mAlluxioStorageType.isStore();
        }
    }
}
