/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.RemoteBlockReader2;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

@Deprecated
@InterfaceAudience.Private
public class RemoteBlockReader
extends FSInputChecker
implements BlockReader {
    private final Peer peer;
    private final DatanodeID datanodeID;
    private final DataInputStream in;
    private DataChecksum checksum;
    private long lastChunkOffset = -1L;
    private long lastChunkLen = -1L;
    private long lastSeqNo = -1L;
    private long startOffset;
    private final long firstChunkOffset;
    private final int bytesPerChecksum;
    private final int checksumSize;
    private final long bytesNeededToFinish;
    private final boolean isLocal;
    private boolean eos = false;
    private boolean sentStatusCode = false;
    byte[] skipBuf = null;
    ByteBuffer checksumBytes = null;
    int dataLeft = 0;
    private final PeerCache peerCache;

    @Override
    public synchronized int read(byte[] buf, int off, int len) throws IOException {
        boolean eosBefore = this.eos;
        if (this.lastChunkLen < 0L && this.startOffset > this.firstChunkOffset && len > 0) {
            int toSkip = (int)(this.startOffset - this.firstChunkOffset);
            if (this.skipBuf == null) {
                this.skipBuf = new byte[this.bytesPerChecksum];
            }
            if (super.read(this.skipBuf, 0, toSkip) != toSkip) {
                throw new IOException("Could not skip required number of bytes");
            }
        }
        int nRead = super.read(buf, off, len);
        if (this.eos && !eosBefore && nRead >= 0) {
            if (this.needChecksum()) {
                this.sendReadResult(this.peer, DataTransferProtos.Status.CHECKSUM_OK);
            } else {
                this.sendReadResult(this.peer, DataTransferProtos.Status.SUCCESS);
            }
        }
        return nRead;
    }

    @Override
    public synchronized long skip(long n) throws IOException {
        long nSkipped;
        int ret;
        if (this.skipBuf == null) {
            this.skipBuf = new byte[this.bytesPerChecksum];
        }
        for (nSkipped = 0L; nSkipped < n; nSkipped += (long)ret) {
            int toSkip = (int)Math.min(n - nSkipped, (long)this.skipBuf.length);
            ret = this.read(this.skipBuf, 0, toSkip);
            if (ret > 0) continue;
            return nSkipped;
        }
        return nSkipped;
    }

    @Override
    public int read() throws IOException {
        throw new IOException("read() is not expected to be invoked. Use read(buf, off, len) instead.");
    }

    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
        return false;
    }

    @Override
    public void seek(long pos) throws IOException {
        throw new IOException("Seek() is not supported in BlockInputChecker");
    }

    @Override
    protected long getChunkPosition(long pos) {
        throw new RuntimeException("getChunkPosition() is not supported, since seek is not required");
    }

    private void adjustChecksumBytes(int dataLen) {
        int requiredSize = (dataLen + this.bytesPerChecksum - 1) / this.bytesPerChecksum * this.checksumSize;
        if (this.checksumBytes == null || requiredSize > this.checksumBytes.capacity()) {
            this.checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
        } else {
            this.checksumBytes.clear();
        }
        this.checksumBytes.limit(requiredSize);
    }

    @Override
    protected synchronized int readChunk(long pos, byte[] buf, int offset, int len, byte[] checksumBuf) throws IOException {
        int bytesToRead;
        int checksumsToRead;
        if (this.eos) {
            return -1;
        }
        long chunkOffset = this.lastChunkOffset;
        if (this.lastChunkLen > 0L) {
            chunkOffset += this.lastChunkLen;
        }
        if (pos + this.firstChunkOffset != chunkOffset) {
            throw new IOException("Mismatch in pos : " + pos + " + " + this.firstChunkOffset + " != " + chunkOffset);
        }
        if (this.dataLeft <= 0) {
            PacketHeader header = new PacketHeader();
            header.readFields(this.in);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("DFSClient readChunk got header " + header));
            }
            if (!header.sanityCheck(this.lastSeqNo)) {
                throw new IOException("BlockReader: error in packet header " + header);
            }
            this.lastSeqNo = header.getSeqno();
            this.dataLeft = header.getDataLen();
            this.adjustChecksumBytes(header.getDataLen());
            if (header.getDataLen() > 0) {
                IOUtils.readFully(this.in, this.checksumBytes.array(), 0, this.checksumBytes.limit());
            }
        }
        assert (len >= this.bytesPerChecksum);
        assert (this.checksum != null);
        assert (this.checksumSize == 0 || checksumBuf.length % this.checksumSize == 0);
        if (this.checksumSize > 0) {
            int chunksLeft = (this.dataLeft - 1) / this.bytesPerChecksum + 1;
            int chunksCanFit = Math.min(len / this.bytesPerChecksum, checksumBuf.length / this.checksumSize);
            checksumsToRead = Math.min(chunksLeft, chunksCanFit);
            bytesToRead = Math.min(checksumsToRead * this.bytesPerChecksum, this.dataLeft);
        } else {
            bytesToRead = Math.min(this.dataLeft, len);
            checksumsToRead = 0;
        }
        if (bytesToRead > 0) {
            assert (bytesToRead <= len);
            assert (this.checksumBytes.remaining() >= this.checksumSize * checksumsToRead);
            assert (checksumBuf.length >= this.checksumSize * checksumsToRead);
            IOUtils.readFully(this.in, buf, offset, bytesToRead);
            this.checksumBytes.get(checksumBuf, 0, this.checksumSize * checksumsToRead);
        }
        this.dataLeft -= bytesToRead;
        assert (this.dataLeft >= 0);
        this.lastChunkOffset = chunkOffset;
        this.lastChunkLen = bytesToRead;
        if (this.dataLeft == 0 && pos + (long)bytesToRead >= this.bytesNeededToFinish) {
            PacketHeader hdr = new PacketHeader();
            hdr.readFields(this.in);
            if (!hdr.isLastPacketInBlock() || hdr.getDataLen() != 0) {
                throw new IOException("Expected empty end-of-read packet! Header: " + hdr);
            }
            this.eos = true;
        }
        if (bytesToRead == 0) {
            return -1;
        }
        return bytesToRead;
    }

    private RemoteBlockReader(String file, String bpid, long blockId, DataInputStream in, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, DatanodeID datanodeID, PeerCache peerCache) {
        super(new Path("/blk_" + blockId + ":" + bpid + ":of:" + file), 1, verifyChecksum, checksum.getChecksumSize() > 0 ? checksum : null, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
        this.isLocal = DFSClient.isLocalAddress(NetUtils.createSocketAddr(datanodeID.getXferAddr()));
        this.peer = peer;
        this.datanodeID = datanodeID;
        this.in = in;
        this.checksum = checksum;
        this.startOffset = Math.max(startOffset, 0L);
        this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
        this.firstChunkOffset = firstChunkOffset;
        this.lastChunkOffset = firstChunkOffset;
        this.lastChunkLen = -1L;
        this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
        this.checksumSize = this.checksum.getChecksumSize();
        this.peerCache = peerCache;
    }

    public static RemoteBlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy) throws IOException {
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
        new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy);
        DataInputStream in = new DataInputStream(new BufferedInputStream(peer.getInputStream(), bufferSize));
        DataTransferProtos.BlockOpResponseProto status = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
        RemoteBlockReader2.checkSuccess(status, peer, block, file);
        DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo();
        DataChecksum checksum = DataTransferProtoUtil.fromProto(checksumInfo.getChecksum());
        long firstChunkOffset = checksumInfo.getChunkOffset();
        if (firstChunkOffset < 0L || firstChunkOffset > startOffset || firstChunkOffset <= startOffset - (long)checksum.getBytesPerChecksum()) {
            throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file);
        }
        return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache);
    }

    @Override
    public synchronized void close() throws IOException {
        this.startOffset = -1L;
        this.checksum = null;
        if (this.peerCache != null & this.sentStatusCode) {
            this.peerCache.put(this.datanodeID, this.peer);
        } else {
            this.peer.close();
        }
    }

    @Override
    public void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException {
        IOUtils.readFully(this, buf, readOffset, amtToRead);
    }

    @Override
    public int readAll(byte[] buf, int offset, int len) throws IOException {
        return RemoteBlockReader.readFully(this, buf, offset, len);
    }

    void sendReadResult(Peer peer, DataTransferProtos.Status statusCode) {
        assert (!this.sentStatusCode) : "already sent status code to " + peer;
        try {
            RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
            this.sentStatusCode = true;
        }
        catch (IOException e) {
            LOG.info((Object)("Could not send read status (" + (Object)((Object)statusCode) + ") to datanode " + peer.getRemoteAddressString() + ": " + e.getMessage()));
        }
    }

    @Override
    public int read(ByteBuffer buf) throws IOException {
        throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
    }

    @Override
    public int available() throws IOException {
        return 131072;
    }

    @Override
    public boolean isLocal() {
        return this.isLocal;
    }

    @Override
    public boolean isShortCircuit() {
        return false;
    }
}

