package org.apache.hadoop.hdfs.server.datanode;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/DataXceiver.class */
class DataXceiver extends DataTransferProtocol.Receiver implements Runnable, FSConstants {
    public static final Log LOG;
    static final Log ClientTraceLog;
    private final Socket s;
    private final boolean isLocal;
    private final String remoteAddress;
    private final String localAddress;
    private final DataNode datanode;
    private final DataXceiverServer dataXceiverServer;
    private int socketKeepaliveTimeout;
    private long opStartTime;
    static final /* synthetic */ boolean $assertionsDisabled;

    public DataXceiver(Socket socket, DataNode dataNode, DataXceiverServer dataXceiverServer) {
        this.s = socket;
        this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
        this.datanode = dataNode;
        this.dataXceiverServer = dataXceiverServer;
        this.remoteAddress = socket.getRemoteSocketAddress().toString();
        this.localAddress = socket.getLocalSocketAddress().toString();
        this.socketKeepaliveTimeout = dataNode.getConf().getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 1000);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Number of active connections is: " + dataNode.getXceiverCount());
        }
    }

    private void updateCurrentThreadName(String str) {
        StringBuilder sb = new StringBuilder();
        sb.append("DataXceiver for client ").append(this.remoteAddress);
        if (str != null) {
            sb.append(" [").append(str).append("]");
        }
        Thread.currentThread().setName(sb.toString());
    }

    DataNode getDataNode() {
        return this.datanode;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.dataXceiverServer.childSockets.put(this.s, this.s);
        updateCurrentThreadName("Waiting for operation");
        int i = 0;
        try {
            try {
                DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(this.s), SMALL_BUFFER_SIZE));
                int soTimeout = this.s.getSoTimeout();
                do {
                    if (i != 0) {
                        try {
                            if (!$assertionsDisabled && this.socketKeepaliveTimeout <= 0) {
                                throw new AssertionError();
                            }
                            this.s.setSoTimeout(this.socketKeepaliveTimeout);
                        } catch (InterruptedIOException e) {
                        } catch (IOException e2) {
                            if (i <= 0 || !((e2 instanceof EOFException) || (e2 instanceof ClosedChannelException))) {
                                throw e2;
                            }
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Cached " + this.s.toString() + " closing after " + i + " ops");
                            }
                        }
                    }
                    DataTransferProtocol.Op readOp = readOp(dataInputStream);
                    if (i != 0) {
                        this.s.setSoTimeout(soTimeout);
                    }
                    int xceiverCount = this.datanode.getXceiverCount();
                    if (xceiverCount <= this.dataXceiverServer.maxXceiverCount) {
                        this.opStartTime = Util.now();
                        processOp(readOp, dataInputStream);
                        i++;
                        if (this.s.isClosed()) {
                            break;
                        }
                    } else {
                        throw new IOException("xceiverCount " + xceiverCount + " exceeds the limit of concurrent xcievers " + this.dataXceiverServer.maxXceiverCount);
                    }
                } while (this.socketKeepaliveTimeout > 0);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this.datanode.dnRegistration + ":Number of active connections is: " + this.datanode.getXceiverCount());
                }
                updateCurrentThreadName("Cleaning up");
                IOUtils.closeStream(dataInputStream);
                IOUtils.closeSocket(this.s);
                this.dataXceiverServer.childSockets.remove(this.s);
            } catch (Throwable th) {
                LOG.error(this.datanode.dnRegistration + ":DataXceiver, at " + this.s.toString(), th);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this.datanode.dnRegistration + ":Number of active connections is: " + this.datanode.getXceiverCount());
                }
                updateCurrentThreadName("Cleaning up");
                IOUtils.closeStream((Closeable) null);
                IOUtils.closeSocket(this.s);
                this.dataXceiverServer.childSockets.remove(this.s);
            }
        } catch (Throwable th2) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.datanode.dnRegistration + ":Number of active connections is: " + this.datanode.getXceiverCount());
            }
            updateCurrentThreadName("Cleaning up");
            IOUtils.closeStream((Closeable) null);
            IOUtils.closeSocket(this.s);
            this.dataXceiverServer.childSockets.remove(this.s);
            throw th2;
        }
    }

    @Override // org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver
    protected void opReadBlock(DataInputStream dataInputStream, Block block, long j, long j2, String str, Token<BlockTokenIdentifier> token) throws IOException {
        OutputStream outputStream = NetUtils.getOutputStream(this.s, this.datanode.socketWriteTimeout);
        DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream, SMALL_BUFFER_SIZE));
        if (this.datanode.isBlockTokenEnabled) {
            try {
                this.datanode.blockTokenSecretManager.checkAccess(token, (String) null, block, BlockTokenSecretManager.AccessMode.READ);
            } catch (SecretManager.InvalidToken e) {
                try {
                    DataTransferProtocol.Status.ERROR_ACCESS_TOKEN.write(dataOutputStream);
                    dataOutputStream.flush();
                    LOG.warn("Block token verification failed, for client " + this.remoteAddress + " for OP_READ_BLOCK for block " + block + " : " + e.getLocalizedMessage());
                    throw e;
                } catch (Throwable th) {
                    IOUtils.closeStream(dataOutputStream);
                    throw th;
                }
            }
        }
        String format = (str.length() <= 0 || !ClientTraceLog.isInfoEnabled()) ? this.datanode.dnRegistration + " Served block " + block + " to " + this.s.getInetAddress() : String.format(DataNode.DN_CLIENTTRACE_FORMAT, this.localAddress, this.remoteAddress, "%d", "HDFS_READ", str, "%d", this.datanode.dnRegistration.getStorageID(), block, "%d");
        updateCurrentThreadName("Sending block " + block);
        try {
            try {
                try {
                    BlockSender blockSender = new BlockSender(block, j, j2, true, true, false, this.datanode, format);
                    DataTransferProtocol.Status.SUCCESS.write(dataOutputStream);
                    long sendBlock = blockSender.sendBlock(dataOutputStream, outputStream, null);
                    if (blockSender.didSendEntireByteRange()) {
                        try {
                            DataTransferProtocol.Status read = DataTransferProtocol.Status.read(dataInputStream);
                            if (read == null) {
                                LOG.warn("Client " + this.s.getInetAddress() + "did not send a valid status code after reading. Will close connection.");
                                IOUtils.closeStream(dataOutputStream);
                            } else if (read == DataTransferProtocol.Status.CHECKSUM_OK && blockSender.isBlockReadFully() && this.datanode.blockScanner != null) {
                                this.datanode.blockScanner.verifiedByClient(block);
                            }
                        } catch (IOException e2) {
                            LOG.debug("Error reading client status response. Will close connection.", e2);
                            IOUtils.closeStream(dataOutputStream);
                        }
                    } else {
                        IOUtils.closeStream(dataOutputStream);
                    }
                    this.datanode.myMetrics.bytesRead.inc((int) sendBlock);
                    this.datanode.myMetrics.blocksRead.inc();
                    IOUtils.closeStream(blockSender);
                } catch (IOException e3) {
                    sendResponse(this.s, DataTransferProtocol.Status.ERROR, this.datanode.socketWriteTimeout);
                    throw e3;
                }
            } catch (SocketException e4) {
                this.datanode.myMetrics.blocksRead.inc();
                IOUtils.closeStream(dataOutputStream);
                IOUtils.closeStream((Closeable) null);
            } catch (IOException e5) {
                LOG.warn(this.datanode.dnRegistration + ":Got exception while serving " + block + " to " + this.s.getInetAddress() + ":\n" + StringUtils.stringifyException(e5));
                throw e5;
            }
            updateDuration(this.datanode.myMetrics.readBlockOp);
            updateCounter(this.datanode.myMetrics.readsFromLocalClient, this.datanode.myMetrics.readsFromRemoteClient);
        } catch (Throwable th2) {
            IOUtils.closeStream((Closeable) null);
            throw th2;
        }
    }

    @Override // org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver
    protected void opWriteBlock(DataInputStream dataInputStream, Block block, int i, DataTransferProtocol.BlockConstructionStage blockConstructionStage, long j, long j2, long j3, String str, DatanodeInfo datanodeInfo, DatanodeInfo[] datanodeInfoArr, Token<BlockTokenIdentifier> token) throws IOException {
        updateCurrentThreadName("Receiving block " + block + " client=" + str);
        if (LOG.isDebugEnabled()) {
            LOG.debug("writeBlock receive buf size " + this.s.getReceiveBufferSize() + " tcp no delay " + this.s.getTcpNoDelay());
        }
        Block block2 = new Block(block);
        block.setNumBytes(this.dataXceiverServer.estimateBlockSize);
        LOG.info("Receiving block " + block + " src: " + this.remoteAddress + " dest: " + this.localAddress);
        DataOutputStream dataOutputStream = new DataOutputStream(NetUtils.getOutputStream(this.s, this.datanode.socketWriteTimeout));
        if (this.datanode.isBlockTokenEnabled) {
            try {
                this.datanode.blockTokenSecretManager.checkAccess(token, (String) null, block, BlockTokenSecretManager.AccessMode.WRITE);
            } catch (SecretManager.InvalidToken e) {
                try {
                    if (str.length() != 0) {
                        DataTransferProtocol.Status.ERROR_ACCESS_TOKEN.write(dataOutputStream);
                        Text.writeString(dataOutputStream, this.datanode.dnRegistration.getName());
                        dataOutputStream.flush();
                    }
                    LOG.warn("Block token verification failed, for client " + this.remoteAddress + " for OP_WRITE_BLOCK for block " + block + " : " + e.getLocalizedMessage());
                    throw e;
                } catch (Throwable th) {
                    IOUtils.closeStream(dataOutputStream);
                    throw th;
                }
            }
        }
        DataOutputStream dataOutputStream2 = null;
        DataInputStream dataInputStream2 = null;
        Socket socket = null;
        BlockReceiver blockReceiver = null;
        String str2 = null;
        String str3 = "";
        DataTransferProtocol.Status status = DataTransferProtocol.Status.SUCCESS;
        try {
            try {
                if (str.length() == 0 || blockConstructionStage != DataTransferProtocol.BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                    blockReceiver = new BlockReceiver(block, dataInputStream, this.s.getRemoteSocketAddress().toString(), this.s.getLocalSocketAddress().toString(), blockConstructionStage, j, j2, j3, str, datanodeInfo, this.datanode);
                } else {
                    this.datanode.data.recoverClose(block, j, j2);
                }
                if (datanodeInfoArr.length > 0) {
                    str2 = datanodeInfoArr[0].getName();
                    InetSocketAddress createSocketAddr = NetUtils.createSocketAddr(str2);
                    socket = this.datanode.newSocket();
                    try {
                        int length = this.datanode.socketTimeout + (5000 * datanodeInfoArr.length);
                        int length2 = this.datanode.socketWriteTimeout + (5000 * datanodeInfoArr.length);
                        NetUtils.connect(socket, createSocketAddr, length);
                        socket.setSoTimeout(length);
                        socket.setSendBufferSize(FSConstants.DEFAULT_DATA_SOCKET_SIZE);
                        dataOutputStream2 = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(socket, length2), SMALL_BUFFER_SIZE));
                        dataInputStream2 = new DataInputStream(NetUtils.getInputStream(socket));
                        DataTransferProtocol.Sender.opWriteBlock(dataOutputStream2, block2, i, blockConstructionStage, j, j2, j3, str, datanodeInfo, datanodeInfoArr, token);
                        if (blockReceiver != null) {
                            blockReceiver.writeChecksumHeader(dataOutputStream2);
                        }
                        dataOutputStream2.flush();
                        if (str.length() != 0) {
                            status = DataTransferProtocol.Status.read(dataInputStream2);
                            str3 = Text.readString(dataInputStream2);
                            if (LOG.isDebugEnabled() || status != DataTransferProtocol.Status.SUCCESS) {
                                LOG.info("Datanode " + datanodeInfoArr.length + " got response for connect ack  from downstream datanode with firstbadlink as " + str3);
                            }
                        }
                    } catch (IOException e2) {
                        if (str.length() != 0) {
                            DataTransferProtocol.Status.ERROR.write(dataOutputStream);
                            Text.writeString(dataOutputStream, str2);
                            dataOutputStream.flush();
                        }
                        IOUtils.closeStream(dataOutputStream2);
                        dataOutputStream2 = null;
                        IOUtils.closeStream(dataInputStream2);
                        dataInputStream2 = null;
                        IOUtils.closeSocket(socket);
                        socket = null;
                        if (str.length() > 0) {
                            throw e2;
                        }
                        LOG.info(this.datanode.dnRegistration + ":Exception transfering block " + block + " to mirror " + str2 + ". continuing without the mirror.\n" + StringUtils.stringifyException(e2));
                    }
                }
                if (str.length() != 0) {
                    if (LOG.isDebugEnabled() || status != DataTransferProtocol.Status.SUCCESS) {
                        LOG.info("Datanode " + datanodeInfoArr.length + " forwarding connect ack to upstream firstbadlink is " + str3);
                    }
                    status.write(dataOutputStream);
                    Text.writeString(dataOutputStream, str3);
                    dataOutputStream.flush();
                }
                if (blockReceiver != null) {
                    blockReceiver.receiveBlock(dataOutputStream2, dataInputStream2, dataOutputStream, socket == null ? null : str2, null, datanodeInfoArr.length);
                }
                if (str.length() != 0 && blockConstructionStage == DataTransferProtocol.BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                    block.setGenerationStamp(j);
                    block.setNumBytes(j2);
                }
                if (str.length() == 0 || blockConstructionStage == DataTransferProtocol.BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                    this.datanode.closeBlock(block, "");
                    LOG.info("Received block " + block + " src: " + this.remoteAddress + " dest: " + this.localAddress + " of size " + block.getNumBytes());
                }
                IOUtils.closeStream(dataOutputStream2);
                IOUtils.closeStream(dataInputStream2);
                IOUtils.closeStream(dataOutputStream);
                IOUtils.closeSocket(socket);
                IOUtils.closeStream(blockReceiver);
                updateDuration(this.datanode.myMetrics.writeBlockOp);
                updateCounter(this.datanode.myMetrics.writesFromLocalClient, this.datanode.myMetrics.writesFromRemoteClient);
            } catch (Throwable th2) {
                IOUtils.closeStream((Closeable) null);
                IOUtils.closeStream((Closeable) null);
                IOUtils.closeStream(dataOutputStream);
                IOUtils.closeSocket((Socket) null);
                IOUtils.closeStream((Closeable) null);
                throw th2;
            }
        } catch (IOException e3) {
            LOG.info("writeBlock " + block + " received exception " + e3);
            throw e3;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver
    protected void opBlockChecksum(DataInputStream dataInputStream, Block block, Token<BlockTokenIdentifier> token) throws IOException {
        DataOutputStream dataOutputStream = new DataOutputStream(NetUtils.getOutputStream(this.s, this.datanode.socketWriteTimeout));
        if (this.datanode.isBlockTokenEnabled) {
            try {
                this.datanode.blockTokenSecretManager.checkAccess(token, (String) null, block, BlockTokenSecretManager.AccessMode.READ);
            } catch (SecretManager.InvalidToken e) {
                try {
                    DataTransferProtocol.Status.ERROR_ACCESS_TOKEN.write(dataOutputStream);
                    dataOutputStream.flush();
                    LOG.warn("Block token verification failed, for client " + this.remoteAddress + " for OP_BLOCK_CHECKSUM for block " + block + " : " + e.getLocalizedMessage());
                    throw e;
                } catch (Throwable th) {
                    IOUtils.closeStream(dataOutputStream);
                    throw th;
                }
            }
        }
        updateCurrentThreadName("Reading metadata for block " + block);
        FSDatasetInterface.MetaDataInputStream metaDataInputStream = this.datanode.data.getMetaDataInputStream(block);
        DataInputStream dataInputStream2 = new DataInputStream(new BufferedInputStream(metaDataInputStream, BUFFER_SIZE));
        updateCurrentThreadName("Getting checksum for block " + block);
        try {
            int bytesPerChecksum = BlockMetadataHeader.readHeader(dataInputStream2).getChecksum().getBytesPerChecksum();
            long length = (metaDataInputStream.getLength() - BlockMetadataHeader.getHeaderSize()) / r0.getChecksumSize();
            MD5Hash digest = MD5Hash.digest(dataInputStream2);
            if (LOG.isDebugEnabled()) {
                LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerChecksum + ", crcPerBlock=" + length + ", md5=" + digest);
            }
            DataTransferProtocol.Status.SUCCESS.write(dataOutputStream);
            dataOutputStream.writeInt(bytesPerChecksum);
            dataOutputStream.writeLong(length);
            digest.write(dataOutputStream);
            dataOutputStream.flush();
            IOUtils.closeStream(dataOutputStream);
            IOUtils.closeStream(dataInputStream2);
            IOUtils.closeStream(metaDataInputStream);
            updateDuration(this.datanode.myMetrics.blockChecksumOp);
        } catch (Throwable th2) {
            IOUtils.closeStream(dataOutputStream);
            IOUtils.closeStream(dataInputStream2);
            IOUtils.closeStream(metaDataInputStream);
            throw th2;
        }
    }

    @Override // org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver
    protected void opCopyBlock(DataInputStream dataInputStream, Block block, Token<BlockTokenIdentifier> token) throws IOException {
        updateCurrentThreadName("Copying block " + block);
        if (this.datanode.isBlockTokenEnabled) {
            try {
                this.datanode.blockTokenSecretManager.checkAccess(token, (String) null, block, BlockTokenSecretManager.AccessMode.COPY);
            } catch (SecretManager.InvalidToken e) {
                LOG.warn("Invalid access token in request from " + this.remoteAddress + " for OP_COPY_BLOCK for block " + block + " : " + e.getLocalizedMessage());
                sendResponse(this.s, DataTransferProtocol.Status.ERROR_ACCESS_TOKEN, this.datanode.socketWriteTimeout);
                return;
            }
        }
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            LOG.info("Not able to copy block " + block.getBlockId() + " to " + this.s.getRemoteSocketAddress() + " because threads quota is exceeded.");
            sendResponse(this.s, DataTransferProtocol.Status.ERROR, this.datanode.socketWriteTimeout);
            return;
        }
        BlockSender blockSender = null;
        DataOutputStream dataOutputStream = null;
        try {
            try {
                blockSender = new BlockSender(block, 0L, -1L, false, false, false, this.datanode);
                dataOutputStream = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(this.s, this.datanode.socketWriteTimeout), SMALL_BUFFER_SIZE));
                DataTransferProtocol.Status.SUCCESS.write(dataOutputStream);
                this.datanode.myMetrics.bytesRead.inc((int) blockSender.sendBlock(dataOutputStream, r0, this.dataXceiverServer.balanceThrottler));
                this.datanode.myMetrics.blocksRead.inc();
                LOG.info("Copied block " + block + " to " + this.s.getRemoteSocketAddress());
                this.dataXceiverServer.balanceThrottler.release();
                if (1 != 0) {
                    try {
                        dataOutputStream.writeChar(100);
                    } catch (IOException e2) {
                    }
                }
                IOUtils.closeStream(dataOutputStream);
                IOUtils.closeStream(blockSender);
                updateDuration(this.datanode.myMetrics.copyBlockOp);
            } catch (Throwable th) {
                this.dataXceiverServer.balanceThrottler.release();
                if (1 != 0) {
                    try {
                        dataOutputStream.writeChar(100);
                    } catch (IOException e3) {
                    }
                }
                IOUtils.closeStream(dataOutputStream);
                IOUtils.closeStream(blockSender);
                throw th;
            }
        } catch (IOException e4) {
            throw e4;
        }
    }

    @Override // org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver
    protected void opReplaceBlock(DataInputStream dataInputStream, Block block, String str, DatanodeInfo datanodeInfo, Token<BlockTokenIdentifier> token) throws IOException {
        updateCurrentThreadName("Replacing block " + block + " from " + str);
        block.setNumBytes(this.dataXceiverServer.estimateBlockSize);
        if (this.datanode.isBlockTokenEnabled) {
            try {
                this.datanode.blockTokenSecretManager.checkAccess(token, (String) null, block, BlockTokenSecretManager.AccessMode.REPLACE);
            } catch (SecretManager.InvalidToken e) {
                LOG.warn("Invalid access token in request from " + this.remoteAddress + " for OP_REPLACE_BLOCK for block " + block + " : " + e.getLocalizedMessage());
                sendResponse(this.s, DataTransferProtocol.Status.ERROR_ACCESS_TOKEN, this.datanode.socketWriteTimeout);
                return;
            }
        }
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            LOG.warn("Not able to receive block " + block.getBlockId() + " from " + this.s.getRemoteSocketAddress() + " because threads quota is exceeded.");
            sendResponse(this.s, DataTransferProtocol.Status.ERROR, this.datanode.socketWriteTimeout);
            return;
        }
        DataTransferProtocol.Status status = DataTransferProtocol.Status.SUCCESS;
        DataInputStream dataInputStream2 = null;
        try {
            try {
                InetSocketAddress createSocketAddr = NetUtils.createSocketAddr(datanodeInfo.getName());
                Socket newSocket = this.datanode.newSocket();
                NetUtils.connect(newSocket, createSocketAddr, this.datanode.socketTimeout);
                newSocket.setSoTimeout(this.datanode.socketTimeout);
                DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(newSocket, this.datanode.socketWriteTimeout), SMALL_BUFFER_SIZE));
                DataTransferProtocol.Sender.opCopyBlock(dataOutputStream, block, token);
                DataInputStream dataInputStream3 = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(newSocket), BUFFER_SIZE));
                DataTransferProtocol.Status read = DataTransferProtocol.Status.read(dataInputStream3);
                if (read != DataTransferProtocol.Status.SUCCESS) {
                    if (read != DataTransferProtocol.Status.ERROR_ACCESS_TOKEN) {
                        throw new IOException("Copy block " + block + " from " + newSocket.getRemoteSocketAddress() + " failed");
                    }
                    throw new IOException("Copy block " + block + " from " + newSocket.getRemoteSocketAddress() + " failed due to access token error");
                }
                BlockReceiver blockReceiver = new BlockReceiver(block, dataInputStream3, newSocket.getRemoteSocketAddress().toString(), newSocket.getLocalSocketAddress().toString(), null, 0L, 0L, 0L, "", null, this.datanode);
                blockReceiver.receiveBlock(null, null, null, null, this.dataXceiverServer.balanceThrottler, -1);
                this.datanode.notifyNamenodeReceivedBlock(block, str);
                LOG.info("Moved block " + block + " from " + this.s.getRemoteSocketAddress());
                if (status == DataTransferProtocol.Status.SUCCESS) {
                    try {
                        dataInputStream3.readChar();
                    } catch (IOException e2) {
                    }
                }
                this.dataXceiverServer.balanceThrottler.release();
                try {
                    sendResponse(this.s, status, this.datanode.socketWriteTimeout);
                } catch (IOException e3) {
                    LOG.warn("Error writing reply back to " + this.s.getRemoteSocketAddress());
                }
                IOUtils.closeStream(dataOutputStream);
                IOUtils.closeStream(blockReceiver);
                IOUtils.closeStream(dataInputStream3);
                updateDuration(this.datanode.myMetrics.replaceBlockOp);
            } catch (IOException e4) {
                DataTransferProtocol.Status status2 = DataTransferProtocol.Status.ERROR;
                throw e4;
            }
        } catch (Throwable th) {
            if (status == DataTransferProtocol.Status.SUCCESS) {
                try {
                    dataInputStream2.readChar();
                } catch (IOException e5) {
                }
            }
            this.dataXceiverServer.balanceThrottler.release();
            try {
                sendResponse(this.s, status, this.datanode.socketWriteTimeout);
            } catch (IOException e6) {
                LOG.warn("Error writing reply back to " + this.s.getRemoteSocketAddress());
            }
            IOUtils.closeStream((Closeable) null);
            IOUtils.closeStream((Closeable) null);
            IOUtils.closeStream((Closeable) null);
            throw th;
        }
    }

    private void updateDuration(MetricsTimeVaryingRate metricsTimeVaryingRate) {
        metricsTimeVaryingRate.inc(Util.now() - this.opStartTime);
    }

    private void updateCounter(MetricsTimeVaryingInt metricsTimeVaryingInt, MetricsTimeVaryingInt metricsTimeVaryingInt2) {
        (this.isLocal ? metricsTimeVaryingInt : metricsTimeVaryingInt2).inc();
    }

    private void sendResponse(Socket socket, DataTransferProtocol.Status status, long j) throws IOException {
        DataOutputStream dataOutputStream = new DataOutputStream(NetUtils.getOutputStream(socket, j));
        status.write(dataOutputStream);
        dataOutputStream.flush();
    }

    static {
        $assertionsDisabled = !DataXceiver.class.desiredAssertionStatus();
        LOG = DataNode.LOG;
        ClientTraceLog = DataNode.ClientTraceLog;
    }
}
