/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestDataNodeVolumeFailure {
    private final int block_size = 512;
    MiniDFSCluster cluster = null;
    private Configuration conf;
    final int dn_num = 2;
    final int blocks_num = 30;
    final short repl = (short)2;
    File dataDir = null;
    File data_fail = null;
    File failedDir = null;
    final Map<String, BlockLocs> block_map = new HashMap<String, BlockLocs>();

    @Before
    public void setUp() throws Exception {
        this.conf = new HdfsConfiguration();
        this.conf.setLong("dfs.blocksize", 512L);
        this.conf.setInt("dfs.datanode.failed.volumes.tolerated", 1);
        this.cluster = new MiniDFSCluster.Builder(this.conf).numDataNodes(2).build();
        this.cluster.waitActive();
    }

    @After
    public void tearDown() throws Exception {
        if (this.data_fail != null) {
            FileUtil.setWritable(this.data_fail, true);
        }
        if (this.failedDir != null) {
            FileUtil.setWritable(this.failedDir, true);
        }
        if (this.cluster != null) {
            this.cluster.shutdown();
        }
    }

    @Test
    public void testVolumeFailure() throws Exception {
        DistributedFileSystem fs = this.cluster.getFileSystem();
        this.dataDir = new File(this.cluster.getDataDirectory());
        System.out.println("Data dir: is " + this.dataDir.getPath());
        String filename = "/test.txt";
        Path filePath = new Path(filename);
        int filesize = 15360;
        DFSTestUtil.createFile(fs, filePath, filesize, (short)2, 1L);
        DFSTestUtil.waitReplication(fs, filePath, (short)2);
        System.out.println("file " + filename + "(size " + filesize + ") is created and replicated");
        this.data_fail = new File(this.dataDir, "data3");
        this.failedDir = MiniDFSCluster.getFinalizedDir(this.dataDir, this.cluster.getNamesystem().getBlockPoolId());
        if (this.failedDir.exists() && !this.deteteBlocks(this.failedDir)) {
            throw new IOException("Could not delete hdfs directory '" + this.failedDir + "'");
        }
        this.data_fail.setReadOnly();
        this.failedDir.setReadOnly();
        System.out.println("Deleteing " + this.failedDir.getPath() + "; exist=" + this.failedDir.exists());
        this.triggerFailure(filename, filesize);
        DataNode dn = this.cluster.getDataNodes().get(1);
        String bpid = this.cluster.getNamesystem().getBlockPoolId();
        DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
        Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists = dn.getFSDataset().getBlockReports(bpid);
        StorageBlockReport[] reports = new StorageBlockReport[perVolumeBlockLists.size()];
        int reportIndex = 0;
        for (Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
            DatanodeStorage dnStorage = kvPair.getKey();
            BlockListAsLongs blockList = kvPair.getValue();
            reports[reportIndex++] = new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
        }
        this.cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);
        this.verify(filename, filesize);
        System.out.println("creating file test1.txt");
        Path fileName1 = new Path("/test1.txt");
        DFSTestUtil.createFile(fs, fileName1, filesize, (short)2, 1L);
        DFSTestUtil.waitReplication(fs, fileName1, (short)2);
        System.out.println("file " + fileName1.getName() + " is created and replicated");
    }

    private void verify(String fn, int fs) throws IOException {
        int totalReal = this.countRealBlocks(this.block_map);
        System.out.println("countRealBlocks counted " + totalReal + " blocks");
        int totalNN = this.countNNBlocks(this.block_map, fn, fs);
        System.out.println("countNNBlocks counted " + totalNN + " blocks");
        for (String bid : this.block_map.keySet()) {
            BlockLocs bl = this.block_map.get(bid);
            Assert.assertEquals((String)"Num files should match num locations", (long)bl.num_files, (long)bl.num_locs);
        }
        Assert.assertEquals((String)"Num physical blocks should match num stored in the NN", (long)totalReal, (long)totalNN);
        FSNamesystem fsn = this.cluster.getNamesystem();
        BlockManagerTestUtil.getComputedDatanodeWork(fsn.getBlockManager());
        long underRepl = fsn.getUnderReplicatedBlocks();
        long pendRepl = fsn.getPendingReplicationBlocks();
        long totalRepl = underRepl + pendRepl;
        System.out.println("underreplicated after = " + underRepl + " and pending repl =" + pendRepl + "; total underRepl = " + totalRepl);
        System.out.println("total blocks (real and replicating):" + ((long)totalReal + totalRepl) + " vs. all files blocks " + 60);
        Assert.assertEquals((String)"Incorrect total block count", (long)((long)totalReal + totalRepl), (long)60L);
    }

    private void triggerFailure(String path, long size) throws IOException {
        NamenodeProtocols nn = this.cluster.getNameNodeRpc();
        List<LocatedBlock> locatedBlocks = nn.getBlockLocations(path, 0L, size).getLocatedBlocks();
        for (LocatedBlock lb : locatedBlocks) {
            DatanodeInfo dinfo = lb.getLocations()[1];
            ExtendedBlock b = lb.getBlock();
            try {
                this.accessBlock(dinfo, lb);
            }
            catch (IOException e) {
                System.out.println("Failure triggered, on block: " + b.getBlockId() + "; corresponding volume should be removed by now");
                break;
            }
        }
    }

    private boolean deteteBlocks(File dir) {
        File[] fileList;
        for (File f : fileList = dir.listFiles()) {
            if (!f.getName().startsWith("blk_") || f.delete()) continue;
            return false;
        }
        return true;
    }

    private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock) throws IOException {
        InetSocketAddress targetAddr = null;
        ExtendedBlock block = lblock.getBlock();
        targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
        BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(this.conf)).setInetSocketAddress(targetAddr).setBlock(block).setFileName(BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId())).setBlockToken(lblock.getBlockToken()).setStartOffset(0L).setLength(-1L).setVerifyChecksum(true).setClientName("TestDataNodeVolumeFailure").setDatanodeInfo(datanode).setCachingStrategy(CachingStrategy.newDefaultStrategy()).setClientCacheContext(ClientContext.getFromConf(this.conf)).setConfiguration(this.conf).setRemotePeerFactory(new RemotePeerFactory(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
                Peer peer = null;
                Socket sock = NetUtils.getDefaultSocketFactory(TestDataNodeVolumeFailure.this.conf).createSocket();
                try {
                    sock.connect(addr, 60000);
                    sock.setSoTimeout(60000);
                    peer = TcpPeerServer.peerFromSocket(sock);
                }
                finally {
                    if (peer == null) {
                        IOUtils.closeSocket(sock);
                    }
                }
                return peer;
            }
        }).build();
        blockReader.close();
    }

    private int countNNBlocks(Map<String, BlockLocs> map, String path, long size) throws IOException {
        int total = 0;
        NamenodeProtocols nn = this.cluster.getNameNodeRpc();
        List<LocatedBlock> locatedBlocks = nn.getBlockLocations(path, 0L, size).getLocatedBlocks();
        for (LocatedBlock lb : locatedBlocks) {
            String blockId = "" + lb.getBlock().getBlockId();
            DatanodeInfo[] dn_locs = lb.getLocations();
            BlockLocs bl = map.get(blockId);
            if (bl == null) {
                bl = new BlockLocs();
            }
            total += dn_locs.length;
            bl.num_locs += dn_locs.length;
            map.put(blockId, bl);
        }
        return total;
    }

    private int countRealBlocks(Map<String, BlockLocs> map) {
        int total = 0;
        String bpid = this.cluster.getNamesystem().getBlockPoolId();
        for (int i = 0; i < 2; ++i) {
            for (int j = 0; j <= 1; ++j) {
                File storageDir = this.cluster.getInstanceStorageDir(i, j);
                File dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
                if (dir == null) {
                    System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
                    continue;
                }
                String[] res = this.metaFilesInDir(dir);
                if (res == null) {
                    System.out.println("res is null for dir = " + dir + " i=" + i + " and j=" + j);
                    continue;
                }
                for (String s : res) {
                    Assert.assertNotNull((String)"Block file name should not be null", (Object)s);
                    String bid = s.substring(s.indexOf("_") + 1, s.lastIndexOf("_"));
                    BlockLocs val = map.get(bid);
                    if (val == null) {
                        val = new BlockLocs();
                    }
                    ++val.num_files;
                    map.put(bid, val);
                }
                total += res.length;
            }
        }
        return total;
    }

    private String[] metaFilesInDir(File dir) {
        String[] res = dir.list(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return name.startsWith("blk_") && name.endsWith(".meta");
            }
        });
        return res;
    }

    private class BlockLocs {
        public int num_files = 0;
        public int num_locs = 0;

        private BlockLocs() {
        }
    }
}

