/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClientAdapter;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReplicaMap;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

/**
 * Tests for {@link InterDatanodeProtocol} and for the replica-recovery entry
 * points of {@link FsDatasetImpl} ({@code initReplicaRecovery} and
 * {@code updateReplicaUnderRecovery}).
 */
public class TestInterDatanodeProtocol {
    /** Bind address handed to {@link TestServer}. */
    private static final String ADDRESS = "0.0.0.0";
    /** Added to {@link #MIN_SLEEP_TIME} to get how long {@link TestServer} stalls a call (ms). */
    private static final int PING_INTERVAL = 1000;
    /** Added to {@link #PING_INTERVAL} to get how long {@link TestServer} stalls a call (ms). */
    private static final int MIN_SLEEP_TIME = 1000;
    /** Base configuration shared by the tests; tests must not modify it. */
    private static final Configuration conf = new HdfsConfiguration();

    /**
     * Asserts that the block the datanode's dataset has stored for {@code b}
     * carries the same block id and length as {@code b}.
     *
     * @param b  the block whose id and length are expected
     * @param dn the datanode whose dataset is queried
     * @throws IOException if the stored block cannot be looked up
     */
    public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
        Block stored = DataNodeTestUtils.getFSDataset(dn).getStoredBlock(b.getBlockPoolId(), b.getBlockId());
        Assert.assertEquals(b.getBlockId(), stored.getBlockId());
        Assert.assertEquals(b.getNumBytes(), stored.getNumBytes());
    }

    /**
     * Asks the namenode for the block locations of {@code src} over the range
     * {@code [0, Long.MAX_VALUE]}, asserts there is at least one block and
     * returns the last one.
     *
     * @param namenode the namenode to query
     * @param src      path of the file
     * @return the last located block of the file
     * @throws IOException if the namenode call fails
     */
    public static LocatedBlock getLastLocatedBlock(ClientProtocol namenode, String src) throws IOException {
        LocatedBlocks locations = namenode.getBlockLocations(src, 0L, Long.MAX_VALUE);
        List<LocatedBlock> blocks = locations.getLocatedBlocks();
        DataNode.LOG.info("blocks.size()=" + blocks.size());
        Assert.assertTrue(blocks.size() > 0);
        return blocks.get(blocks.size() - 1);
    }

    /** Runs {@link #checkBlockMetaDataInfo(boolean)} without datanode hostnames. */
    @Test
    public void testBlockMetaDataInfo() throws Exception {
        checkBlockMetaDataInfo(false);
    }

    /**
     * Runs {@link #checkBlockMetaDataInfo(boolean)} with datanode hostnames.
     * Skipped unless the {@code os.name} system property starts with "Linux".
     */
    @Test
    public void testBlockMetaDataInfoWithHostname() throws Exception {
        Assume.assumeTrue(System.getProperty("os.name").startsWith("Linux"));
        checkBlockMetaDataInfo(true);
    }

    /**
     * Starts a three-datanode mini cluster, writes a file, and drives a replica
     * recovery on the first datanode holding its last block through an
     * {@link InterDatanodeProtocol} proxy, checking the stored block before and
     * after the replica is shortened. Finally checks that recovery of a block
     * in an unknown block pool yields {@code null}.
     *
     * @param useDnHostname value for {@code dfs.datanode.use.datanode.hostname};
     *                      when true, {@code dfs.datanode.hostname} is also set
     *                      to {@code localhost}
     */
    private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
        // Work on a private copy so the hostname settings never leak into the
        // static configuration shared with the other tests in this class.
        Configuration localConf = new HdfsConfiguration(conf);
        localConf.setBoolean("dfs.datanode.use.datanode.hostname", useDnHostname);
        if (useDnHostname) {
            localConf.set("dfs.datanode.hostname", "localhost");
        }

        MiniDFSCluster cluster = null;
        try {
            cluster = new MiniDFSCluster.Builder(localConf).numDataNodes(3).checkDataNodeHostConfig(true).build();
            cluster.waitActive();

            // Create a file and make sure it exists.
            DistributedFileSystem dfs = cluster.getFileSystem();
            String filestr = "/foo";
            Path filepath = new Path(filestr);
            DFSTestUtil.createFile(dfs, filepath, 1024L, (short) 3, 0L);
            Assert.assertTrue(dfs.exists(filepath));

            // Locate the last block and connect to the first datanode holding it.
            LocatedBlock locatedblock = getLastLocatedBlock(DFSClientAdapter.getDFSClient(dfs).getNamenode(), filestr);
            DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
            Assert.assertTrue(datanodeinfo.length > 0);
            DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
            InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(datanode, datanodeinfo[0], localConf, useDnHostname);
            DataNodeTestUtils.shutdownBlockScanner(datanode);

            // The stored block must match what the namenode reported.
            ExtendedBlock b = locatedblock.getBlock();
            InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass());
            checkMetaInfo(b, datanode);

            // Recover the replica to half its length and verify the stored block.
            long recoveryId = b.getGenerationStamp() + 1L;
            idp.initReplicaRecovery(new BlockRecoveryCommand.RecoveringBlock(b, locatedblock.getLocations(), recoveryId));
            ExtendedBlock newblock = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b.getNumBytes() / 2L, b.getGenerationStamp() + 1L);
            idp.updateReplicaUnderRecovery(b, recoveryId, newblock.getNumBytes());
            checkMetaInfo(newblock, datanode);

            // A block from a block pool the datanode does not know yields null.
            ExtendedBlock badBlock = new ExtendedBlock("fake-pool", b.getBlockId(), 0L, 0L);
            Assert.assertNull(idp.initReplicaRecovery(new BlockRecoveryCommand.RecoveringBlock(badBlock, locatedblock.getLocations(), recoveryId)));
        } finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /** Wraps {@code b} in a {@link FinalizedReplica} built with {@code null} for its other two arguments. */
    private static ReplicaInfo createReplicaInfo(Block b) {
        return new FinalizedReplica(b, null, null);
    }

    /**
     * Asserts that {@code recoveryInfo} reports the block id, generation stamp,
     * on-disk length and state of {@code originalInfo}.
     */
    private static void assertEquals(ReplicaInfo originalInfo, ReplicaRecoveryInfo recoveryInfo) {
        Assert.assertEquals(originalInfo.getBlockId(), recoveryInfo.getBlockId());
        Assert.assertEquals(originalInfo.getGenerationStamp(), recoveryInfo.getGenerationStamp());
        Assert.assertEquals(originalInfo.getBytesOnDisk(), recoveryInfo.getNumBytes());
        Assert.assertEquals(originalInfo.getState(), recoveryInfo.getOriginalReplicaState());
    }

    /**
     * Exercises the static {@code FsDatasetImpl.initReplicaRecovery} against an
     * in-memory {@link ReplicaMap} holding five finalized replicas: two
     * successive recoveries of the same block, a stale recovery id, an unknown
     * block, a recovery id below the generation stamp, and a block whose
     * generation stamp is newer than the replica's.
     */
    @Test
    public void testInitReplicaRecovery() throws IOException {
        final long firstblockid = 10000L;
        final long gs = 7777L;
        final long length = 22L;
        final long xceiverStopTimeout = 60000L;
        final String bpid = "BP-TEST";

        final ReplicaMap map = new ReplicaMap(this);
        final Block[] blocks = new Block[5];
        for (int i = 0; i < blocks.length; i++) {
            blocks[i] = new Block(firstblockid + i, length, gs);
            map.add(bpid, createReplicaInfo(blocks[i]));
        }

        final Block b = blocks[0];
        final ReplicaInfo originalInfo = map.get(bpid, b);

        // First recovery: the replica in the map becomes a ReplicaUnderRecovery
        // carrying the new recovery id.
        final long recoveryid = gs + 1L;
        ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl.initReplicaRecovery(bpid, map, blocks[0], recoveryid, xceiverStopTimeout);
        assertEquals(originalInfo, recoveryInfo);
        ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery) map.get(bpid, b);
        Assert.assertEquals(originalInfo.getBlockId(), updatedInfo.getBlockId());
        Assert.assertEquals(recoveryid, updatedInfo.getRecoveryID());

        // Second recovery with a higher id replaces the recorded recovery id.
        final long recoveryid2 = gs + 2L;
        ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl.initReplicaRecovery(bpid, map, blocks[0], recoveryid2, xceiverStopTimeout);
        assertEquals(originalInfo, recoveryInfo2);
        ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery) map.get(bpid, b);
        Assert.assertEquals(originalInfo.getBlockId(), updatedInfo2.getBlockId());
        Assert.assertEquals(recoveryid2, updatedInfo2.getRecoveryID());

        // Going back to the older recovery id is expected to be rejected.
        try {
            FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid, xceiverStopTimeout);
            Assert.fail();
        } catch (RecoveryInProgressException ripe) {
            System.out.println("GOOD: getting " + ripe);
        }

        // A block id that was never added to the map yields null.
        Block unknown = new Block(firstblockid - 1L, length, gs);
        ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, unknown, gs + 1L, xceiverStopTimeout);
        Assert.assertNull("Data-node should not have this replica.", r);

        // A recovery id below the replica's generation stamp is expected to fail.
        Block second = new Block(firstblockid + 1L, length, gs);
        try {
            FsDatasetImpl.initReplicaRecovery(bpid, map, second, gs - 1L, xceiverStopTimeout);
            Assert.fail();
        } catch (IOException ioe) {
            System.out.println("GOOD: getting " + ioe);
        }

        // A block whose generation stamp is newer than the replica's must fail
        // with the dedicated message.
        Block newerGs = new Block(firstblockid, length, gs + 1L);
        try {
            FsDatasetImpl.initReplicaRecovery(bpid, map, newerGs, gs + 1L, xceiverStopTimeout);
            Assert.fail("InitReplicaRecovery should fail because replica's gs is less than the block's gs");
        } catch (IOException e) {
            // Actually assert on the message; the bare startsWith() call used
            // previously discarded its result and verified nothing.
            String message = e.getMessage();
            Assert.assertTrue("Unexpected failure message: " + message,
                    message != null && message.startsWith("replica.getGenerationStamp() < block.getGenerationStamp(), block="));
        }
    }

    /**
     * Starts a three-datanode mini cluster, puts the last block of a new file
     * into recovery directly through the datanode's dataset, and checks that
     * {@code updateReplicaUnderRecovery} rejects a block whose length differs
     * from the recovery info but accepts the matching one.
     */
    @Test
    public void testUpdateReplicaUnderRecovery() throws IOException {
        MiniDFSCluster cluster = null;
        try {
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
            cluster.waitActive();
            String bpid = cluster.getNamesystem().getBlockPoolId();

            // Create a file and locate its last block.
            DistributedFileSystem dfs = cluster.getFileSystem();
            String filestr = "/foo";
            Path filepath = new Path(filestr);
            DFSTestUtil.createFile(dfs, filepath, 1024L, (short) 3, 0L);
            LocatedBlock locatedblock = getLastLocatedBlock(DFSClientAdapter.getDFSClient(dfs).getNamenode(), filestr);
            DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
            Assert.assertTrue(datanodeinfo.length > 0);

            DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
            Assert.assertNotNull(datanode);

            // Start recovery; the replica must now be in the RUR state.
            ExtendedBlock b = locatedblock.getBlock();
            long recoveryid = b.getGenerationStamp() + 1L;
            long newlength = b.getNumBytes() - 1L;
            FsDatasetSpi<?> fsdataset = DataNodeTestUtils.getFSDataset(datanode);
            ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery(new BlockRecoveryCommand.RecoveringBlock(b, null, recoveryid));
            ReplicaInfo replica = FsDatasetTestUtil.fetchReplicaInfo(fsdataset, bpid, b.getBlockId());
            Assert.assertEquals(HdfsServerConstants.ReplicaState.RUR, replica.getState());
            FsDatasetImpl.checkReplicaFiles(replica);

            // A block one byte shorter than the recovery info must be rejected.
            ExtendedBlock tmp = new ExtendedBlock(b.getBlockPoolId(), rri.getBlockId(), rri.getNumBytes() - 1L, rri.getGenerationStamp());
            try {
                fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, newlength);
                Assert.fail();
            } catch (IOException ioe) {
                System.out.println("GOOD: getting " + ioe);
            }

            // The block built from the recovery info itself must be accepted.
            String storageID = fsdataset.updateReplicaUnderRecovery(new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength);
            Assert.assertNotNull(storageID);
        } finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Calls {@code initReplicaRecovery} through a proxy created with a 500 ms
     * socket timeout against a {@link TestServer} that stalls every call, and
     * expects a {@link SocketTimeoutException}.
     */
    @Test(expected = SocketTimeoutException.class)
    public void testInterDNProtocolTimeout() throws Throwable {
        final TestServer server = new TestServer(1, true);
        InterDatanodeProtocol proxy = null;
        try {
            // Everything after construction runs under the finally block so
            // the server is stopped even when start or setup fails.
            server.start();
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            DatanodeID fakeDnId = DFSTestUtil.getLocalDatanodeID(addr.getPort());
            DatanodeInfo dInfo = new DatanodeInfo(fakeDnId);
            proxy = DataNode.createInterDataNodeProtocolProxy(dInfo, conf, 500, false);
            proxy.initReplicaRecovery(new BlockRecoveryCommand.RecoveringBlock(new ExtendedBlock("bpid", 1L), null, 100L));
            Assert.fail("Expected SocketTimeoutException exception, but did not get.");
        } finally {
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
            server.stop();
        }
    }

    /**
     * Minimal IPC {@link Server} whose {@code call} optionally sleeps before
     * answering, then returns either a fresh instance of a configured response
     * class or the request parameter itself.
     */
    private static class TestServer
    extends Server {
        /** Whether {@code call} sleeps before answering. */
        private final boolean sleep;
        /** Response type to instantiate, or {@code null} to return the parameter. */
        private final Class<? extends Writable> responseClass;

        /**
         * Creates a server that takes {@link LongWritable} parameters and
         * returns each call's parameter as its response.
         */
        public TestServer(int handlerCount, boolean sleep) throws IOException {
            this(handlerCount, sleep, LongWritable.class, null);
        }

        /**
         * @param handlerCount  number of handlers passed to {@link Server}
         * @param sleep         whether each call sleeps before answering
         * @param paramClass    parameter class passed to {@link Server}
         * @param responseClass class to instantiate for responses, or
         *                      {@code null} to return the call parameter
         */
        public TestServer(int handlerCount, boolean sleep, Class<? extends Writable> paramClass, Class<? extends Writable> responseClass) throws IOException {
            super(ADDRESS, 0, paramClass, handlerCount, conf);
            this.sleep = sleep;
            this.responseClass = responseClass;
        }

        /**
         * Sleeps for {@code PING_INTERVAL + MIN_SLEEP_TIME} milliseconds when
         * sleeping is enabled, then produces the response.
         *
         * @return a new {@code responseClass} instance, or {@code param} when
         *         no response class was configured
         * @throws RuntimeException if the response class cannot be instantiated
         */
        @Override
        public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws IOException {
            if (this.sleep) {
                try {
                    Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the calling handler thread
                    // instead of silently dropping it.
                    Thread.currentThread().interrupt();
                }
            }
            if (this.responseClass != null) {
                try {
                    return this.responseClass.newInstance();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return param;
        }
    }
}

