/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestLogRolling {
    private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
    private HRegionServer server;
    private HLog log;
    private String tableName;
    private byte[] value;
    private static FileSystem fs;
    private static MiniDFSCluster dfsCluster;
    private static HBaseAdmin admin;
    private static MiniHBaseCluster cluster;
    private static final HBaseTestingUtility TEST_UTIL;

    public TestLogRolling() {
        ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
        this.server = null;
        this.log = null;
        this.tableName = null;
        this.value = null;
        String className = this.getClass().getName();
        StringBuilder v = new StringBuilder(className);
        while (v.length() < 1000) {
            v.append(className);
        }
        this.value = Bytes.toBytes((String)v.toString());
    }

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 786432L);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
        TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10000);
        TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10000);
        TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10000);
        TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
        TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.flush.size", 8192);
        TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10000L);
        TEST_UTIL.getConfiguration().setInt("hbase.server.thread.wakefrequency", 2000);
        TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
        TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
        TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
        TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
        TEST_UTIL.startMiniCluster(2);
        cluster = TEST_UTIL.getHBaseCluster();
        dfsCluster = TEST_UTIL.getDFSCluster();
        fs = TEST_UTIL.getTestFileSystem();
        admin = TEST_UTIL.getHBaseAdmin();
        cluster.getMaster().balanceSwitch(false);
    }

    @AfterClass
    public static void tearDown() throws IOException {
        TEST_UTIL.cleanupTestDir();
        TEST_UTIL.shutdownMiniCluster();
    }

    private void startAndWriteData() throws IOException {
        new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
        this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
        this.log = this.server.getWAL();
        HTableDescriptor desc = new HTableDescriptor(this.tableName);
        desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
        admin.createTable(desc);
        HTable table = new HTable(TEST_UTIL.getConfiguration(), this.tableName);
        this.server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes((String)this.tableName));
        this.log = this.server.getWAL();
        for (int i = 1; i <= 256; ++i) {
            Put put = new Put(Bytes.toBytes((String)("row" + String.format("%1$04d", i))));
            put.add(HConstants.CATALOG_FAMILY, null, this.value);
            table.put(put);
            if (i % 32 != 0) continue;
            try {
                Thread.sleep(2000L);
                continue;
            }
            catch (InterruptedException e) {
                // empty catch block
            }
        }
    }

    @Test
    public void testLogRolling() throws FailedLogCloseException, IOException {
        this.tableName = TestLogRolling.getName();
        this.startAndWriteData();
        LOG.info((Object)("after writing there are " + this.log.getNumLogFiles() + " log files"));
        ArrayList regions = new ArrayList(this.server.getOnlineRegionsLocalContext());
        for (HRegion r : regions) {
            r.flushcache();
        }
        this.log.rollWriter();
        int count = this.log.getNumLogFiles();
        LOG.info((Object)("after flushing all regions and rolling logs there are " + this.log.getNumLogFiles() + " log files"));
        Assert.assertTrue((String)("actual count: " + count), (count <= 2 ? 1 : 0) != 0);
    }

    private static String getName() {
        return "TestLogRolling";
    }

    void writeData(HTable table, int rownum) throws IOException {
        Put put = new Put(Bytes.toBytes((String)("row" + String.format("%1$04d", rownum))));
        put.add(HConstants.CATALOG_FAMILY, null, this.value);
        table.put(put);
        try {
            Thread.sleep(2000L);
        }
        catch (InterruptedException e) {
            // empty catch block
        }
    }

    void batchWriteAndWait(HTable table, int start, boolean expect, int timeout) throws IOException {
        for (int i = 0; i < 10; ++i) {
            Put put = new Put(Bytes.toBytes((String)("row" + String.format("%1$04d", start + i))));
            put.add(HConstants.CATALOG_FAMILY, null, this.value);
            table.put(put);
        }
        long startTime = System.currentTimeMillis();
        long remaining = timeout;
        while (remaining > 0L && this.log.isLowReplicationRollEnabled() != expect) {
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            remaining = (long)timeout - (System.currentTimeMillis() - startTime);
        }
    }

    DatanodeInfo[] getPipeline(HLog log) throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
        OutputStream stm = log.getOutputStream();
        Method getPipeline = null;
        for (Method m : stm.getClass().getDeclaredMethods()) {
            if (!m.getName().endsWith("getPipeline")) continue;
            getPipeline = m;
            getPipeline.setAccessible(true);
            break;
        }
        Assert.assertTrue((String)"Need DFSOutputStream.getPipeline() for this test", (null != getPipeline ? 1 : 0) != 0);
        Object repl = getPipeline.invoke((Object)stm, new Object[0]);
        return (DatanodeInfo[])repl;
    }

    @Test
    public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException, IllegalArgumentException, IllegalAccessException, InvocationTargetException {
        Assert.assertTrue((String)"This test requires HLog file replication.", (fs.getDefaultReplication() > 1 ? 1 : 0) != 0);
        LOG.info((Object)("Replication=" + fs.getDefaultReplication()));
        new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
        this.server = cluster.getRegionServer(0);
        this.log = this.server.getWAL();
        String tableName = TestLogRolling.getName();
        HTableDescriptor desc = new HTableDescriptor(tableName);
        desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
        if (admin.tableExists(tableName)) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        admin.createTable(desc);
        HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
        this.server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes((String)tableName));
        this.log = this.server.getWAL();
        Assert.assertTrue((String)"Need HDFS-826 for this test", (boolean)this.log.canGetCurReplicas());
        Assert.assertTrue((String)"Need append support for this test", (boolean)FSUtils.isAppendSupported((Configuration)TEST_UTIL.getConfiguration()));
        dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
        dfsCluster.waitActive();
        Assert.assertTrue((dfsCluster.getDataNodes().size() >= fs.getDefaultReplication() + 1 ? 1 : 0) != 0);
        this.writeData(table, 2);
        table.setAutoFlush(true);
        long curTime = System.currentTimeMillis();
        long oldFilenum = this.log.getFilenum();
        Assert.assertTrue((String)"Log should have a timestamp older than now", (curTime > oldFilenum && oldFilenum != -1L ? 1 : 0) != 0);
        Assert.assertTrue((String)"The log shouldn't have rolled yet", (oldFilenum == this.log.getFilenum() ? 1 : 0) != 0);
        DatanodeInfo[] pipeline = this.getPipeline(this.log);
        Assert.assertTrue((pipeline.length == fs.getDefaultReplication() ? 1 : 0) != 0);
        Assert.assertTrue((dfsCluster.stopDataNode(pipeline[0].getName()) != null ? 1 : 0) != 0);
        Thread.sleep(10000L);
        this.writeData(table, 2);
        long newFilenum = this.log.getFilenum();
        Assert.assertTrue((String)"Missing datanode should've triggered a log roll", (newFilenum > oldFilenum && newFilenum > curTime ? 1 : 0) != 0);
        this.writeData(table, 3);
        Assert.assertTrue((String)"The log should not roll again.", (this.log.getFilenum() == newFilenum ? 1 : 0) != 0);
        Assert.assertTrue((dfsCluster.stopDataNode(pipeline[1].getName()) != null ? 1 : 0) != 0);
        Thread.sleep(10000L);
        this.batchWriteAndWait(table, 4, false, 10000);
        Assert.assertTrue((String)"LowReplication Roller should've been disabled", (!this.log.isLowReplicationRollEnabled() ? 1 : 0) != 0);
        dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
        dfsCluster.waitActive();
        this.log.rollWriter(true);
        this.batchWriteAndWait(table, 13, true, 10000);
        Assert.assertTrue((String)"New log file should have the default replication", (this.log.getLogReplication() == fs.getDefaultReplication() ? 1 : 0) != 0);
        Assert.assertTrue((String)"LowReplication Roller should've been enabled", (boolean)this.log.isLowReplicationRollEnabled());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLogRollOnPipelineRestart() throws Exception {
        LOG.info((Object)"Starting testLogRollOnPipelineRestart");
        Assert.assertTrue((String)"This test requires HLog file replication.", (fs.getDefaultReplication() > 1 ? 1 : 0) != 0);
        LOG.info((Object)("Replication=" + fs.getDefaultReplication()));
        new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
        this.server = cluster.getRegionServer(0);
        this.log = this.server.getWAL();
        String tableName = TestLogRolling.getName();
        HTableDescriptor desc = new HTableDescriptor(tableName);
        desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
        if (admin.tableExists(tableName)) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        admin.createTable(desc);
        HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
        this.server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes((String)tableName));
        this.log = this.server.getWAL();
        final ArrayList<Path> paths = new ArrayList<Path>();
        paths.add(this.log.computeFilename());
        this.log.registerWALActionsListener(new WALObserver(){

            public void logRolled(Path newFile) {
                paths.add(newFile);
            }

            public void logRollRequested() {
            }

            public void logCloseRequested() {
            }

            public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
            }
        });
        Assert.assertTrue((String)"Need HDFS-826 for this test", (boolean)this.log.canGetCurReplicas());
        Assert.assertTrue((String)"Need append support for this test", (boolean)FSUtils.isAppendSupported((Configuration)TEST_UTIL.getConfiguration()));
        this.writeData(table, 1002);
        table.setAutoFlush(true);
        long curTime = System.currentTimeMillis();
        long oldFilenum = this.log.getFilenum();
        Assert.assertTrue((String)"Log should have a timestamp older than now", (curTime > oldFilenum && oldFilenum != -1L ? 1 : 0) != 0);
        Assert.assertTrue((String)"The log shouldn't have rolled yet", (oldFilenum == this.log.getFilenum() ? 1 : 0) != 0);
        dfsCluster.restartDataNodes();
        Thread.sleep(10000L);
        dfsCluster.waitActive();
        LOG.info((Object)"Data Nodes restarted");
        this.writeData(table, 1003);
        long newFilenum = this.log.getFilenum();
        Assert.assertTrue((String)"Missing datanode should've triggered a log roll", (newFilenum > oldFilenum && newFilenum > curTime ? 1 : 0) != 0);
        this.writeData(table, 1004);
        dfsCluster.restartDataNodes();
        Thread.sleep(10000L);
        dfsCluster.waitActive();
        LOG.info((Object)"Data Nodes restarted");
        this.writeData(table, 1005);
        this.log.rollWriter(true);
        HashSet<String> loggedRows = new HashSet<String>();
        for (Path p : paths) {
            LOG.debug((Object)("Reading HLog " + FSUtils.getPath((Path)p)));
            HLog.Reader reader = null;
            try {
                HLog.Entry entry;
                reader = HLog.getReader((FileSystem)fs, (Path)p, (Configuration)TEST_UTIL.getConfiguration());
                while ((entry = reader.next()) != null) {
                    LOG.debug((Object)("#" + entry.getKey().getLogSeqNum() + ": " + entry.getEdit().getKeyValues()));
                    for (KeyValue kv : entry.getEdit().getKeyValues()) {
                        loggedRows.add(Bytes.toStringBinary((byte[])kv.getRow()));
                    }
                }
            }
            catch (EOFException e) {
                LOG.debug((Object)("EOF reading file " + FSUtils.getPath((Path)p)));
            }
            finally {
                if (reader == null) continue;
                reader.close();
            }
        }
        Assert.assertTrue((boolean)loggedRows.contains("row1002"));
        Assert.assertTrue((boolean)loggedRows.contains("row1003"));
        Assert.assertTrue((boolean)loggedRows.contains("row1004"));
        Assert.assertTrue((boolean)loggedRows.contains("row1005"));
        ArrayList regions = new ArrayList(this.server.getOnlineRegionsLocalContext());
        for (HRegion r : regions) {
            r.flushcache();
        }
        ResultScanner scanner = table.getScanner(new Scan());
        try {
            for (int i = 2; i <= 5; ++i) {
                Result r = scanner.next();
                Assert.assertNotNull((Object)r);
                Assert.assertFalse((boolean)r.isEmpty());
                Assert.assertEquals((Object)("row100" + i), (Object)Bytes.toString((byte[])r.getRow()));
            }
        }
        finally {
            scanner.close();
        }
    }

    static {
        TEST_UTIL = new HBaseTestingUtility();
    }
}

