package tachyon.worker.netty;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.Constants;
import tachyon.worker.BlockHandler;
import tachyon.worker.BlocksLocker;
import tachyon.worker.hierarchy.StorageDir;

@ChannelHandler.Sharable
/* loaded from: input_file:tachyon/worker/netty/DataServerHandler.class */
public final class DataServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
    private final BlocksLocker mLocker;

    public DataServerHandler(BlocksLocker blocksLocker) {
        this.mLocker = blocksLocker;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        BlockRequest blockRequest = (BlockRequest) obj;
        long blockId = blockRequest.getBlockId();
        long offset = blockRequest.getOffset();
        long length = blockRequest.getLength();
        int lockId = this.mLocker.getLockId();
        StorageDir lock = this.mLocker.lock(blockId, lockId);
        BlockHandler blockHandler = null;
        try {
            try {
                validateInput(blockRequest);
                blockHandler = lock.getBlockHandler(blockId);
                long length2 = blockHandler.getLength();
                validateBounds(blockRequest, length2);
                ChannelFuture writeAndFlush = channelHandlerContext.writeAndFlush(new BlockResponse(blockId, offset, returnLength(offset, length, length2), blockHandler));
                writeAndFlush.addListener(ChannelFutureListener.CLOSE);
                writeAndFlush.addListener(new ClosableResourceChannelListener(blockHandler));
                lock.accessBlock(blockId);
                LOG.info("Response remote request by reading from {}, preparation done.", lock.getBlockFilePath(blockId));
                this.mLocker.unlock(blockId, lockId);
            } catch (Exception e) {
                LOG.error("The file is not here : " + e.getMessage(), e);
                channelHandlerContext.writeAndFlush(BlockResponse.createErrorResponse(blockId)).addListener(ChannelFutureListener.CLOSE);
                if (blockHandler != null) {
                    blockHandler.close();
                }
                this.mLocker.unlock(blockId, lockId);
            }
        } catch (Throwable th) {
            this.mLocker.unlock(blockId, lockId);
            throw th;
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        LOG.warn("Exception thrown while processing request", th);
        channelHandlerContext.close();
    }

    private long returnLength(long j, long j2, long j3) {
        return j2 == -1 ? j3 - j : j2;
    }

    private void validateBounds(BlockRequest blockRequest, long j) {
        if (blockRequest.getOffset() > j) {
            throw new IllegalArgumentException(String.format("Offset(%d) is larger than file length(%d)", Long.valueOf(blockRequest.getOffset()), Long.valueOf(j)));
        }
        if (blockRequest.getLength() != -1 && blockRequest.getOffset() + blockRequest.getLength() > j) {
            throw new IllegalArgumentException(String.format("Offset(%d) plus length(%d) is larger than file length(%d)", Long.valueOf(blockRequest.getOffset()), Long.valueOf(blockRequest.getLength()), Long.valueOf(j)));
        }
    }

    private void validateInput(BlockRequest blockRequest) {
        if (blockRequest.getOffset() < 0) {
            throw new IllegalArgumentException("Offset can not be negative: " + blockRequest.getOffset());
        }
        if (blockRequest.getLength() < 0 && blockRequest.getLength() != -1) {
            throw new IllegalArgumentException("Length can not be negative except -1: " + blockRequest.getLength());
        }
    }
}
