/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={LargeTests.class})
public class TestLogRolling {
    private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
    private HRegionServer server;
    private HLog log;
    private String tableName;
    private byte[] value;
    private FileSystem fs;
    private MiniDFSCluster dfsCluster;
    private HBaseAdmin admin;
    private MiniHBaseCluster cluster;
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

    public TestLogRolling() {
        ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)LogFactory.getLog((String)"org.apache.hadoop.hdfs.server.namenode.FSNamesystem")).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
        this.server = null;
        this.log = null;
        this.tableName = null;
        String className = this.getClass().getName();
        StringBuilder v = new StringBuilder(className);
        while (v.length() < 1000) {
            v.append(className);
        }
        this.value = Bytes.toBytes((String)v.toString());
    }

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
        TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 786432L);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
        TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10000);
        TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10000);
        TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10000);
        TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
        TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.flush.size", 8192);
        TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10000L);
        TEST_UTIL.getConfiguration().setInt("hbase.server.thread.wakefrequency", 2000);
        TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
        TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
        TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
        TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    }

    @Before
    public void setUp() throws Exception {
        TEST_UTIL.startMiniCluster(1, 1, 2);
        this.cluster = TEST_UTIL.getHBaseCluster();
        this.dfsCluster = TEST_UTIL.getDFSCluster();
        this.fs = TEST_UTIL.getTestFileSystem();
        this.admin = TEST_UTIL.getHBaseAdmin();
        this.cluster.getMaster().balanceSwitch(false);
    }

    @After
    public void tearDown() throws Exception {
        TEST_UTIL.shutdownMiniCluster();
    }

    private void startAndWriteData() throws IOException, InterruptedException {
        new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
        this.server = this.cluster.getRegionServerThreads().get(0).getRegionServer();
        this.log = this.server.getWAL();
        HTable table = this.createTestTable(this.tableName);
        this.server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes((String)this.tableName));
        this.log = this.server.getWAL();
        for (int i = 1; i <= 256; ++i) {
            this.doPut(table, i);
            if (i % 32 != 0) continue;
            try {
                Thread.sleep(2000L);
                continue;
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    @Test
    public void testLogRolling() throws Exception {
        this.tableName = TestLogRolling.getName();
        this.startAndWriteData();
        LOG.info((Object)("after writing there are " + ((FSHLog)this.log).getNumRolledLogFiles() + " log files"));
        ArrayList regions = new ArrayList(this.server.getOnlineRegionsLocalContext());
        for (HRegion r : regions) {
            r.flushcache();
        }
        this.log.rollWriter();
        int count = ((FSHLog)this.log).getNumRolledLogFiles();
        LOG.info((Object)("after flushing all regions and rolling logs there are " + ((FSHLog)this.log).getNumRolledLogFiles() + " log files"));
        Assert.assertTrue((String)("actual count: " + count), (count <= 2 ? 1 : 0) != 0);
    }

    private static String getName() {
        return "TestLogRolling";
    }

    void writeData(HTable table, int rownum) throws IOException {
        this.doPut(table, rownum);
        try {
            Thread.sleep(2000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    void validateData(HTable table, int rownum) throws IOException {
        String row = "row" + String.format("%1$04d", rownum);
        Get get = new Get(Bytes.toBytes((String)row));
        get.addFamily(HConstants.CATALOG_FAMILY);
        Result result = table.get(get);
        Assert.assertTrue((result.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)Bytes.equals((byte[])this.value, (byte[])result.getValue(HConstants.CATALOG_FAMILY, null)));
        LOG.info((Object)("Validated row " + row));
    }

    void batchWriteAndWait(HTable table, int start, boolean expect, int timeout) throws IOException {
        for (int i = 0; i < 10; ++i) {
            Put put = new Put(Bytes.toBytes((String)("row" + String.format("%1$04d", start + i))));
            put.add(HConstants.CATALOG_FAMILY, null, this.value);
            table.put(put);
        }
        Put tmpPut = new Put(Bytes.toBytes((String)"tmprow"));
        tmpPut.add(HConstants.CATALOG_FAMILY, null, this.value);
        long startTime = System.currentTimeMillis();
        long remaining = timeout;
        while (remaining > 0L && this.log.isLowReplicationRollEnabled() != expect) {
            table.put(tmpPut);
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            remaining = (long)timeout - (System.currentTimeMillis() - startTime);
        }
    }

    DatanodeInfo[] getPipeline(HLog log) throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
        OutputStream stm = ((FSHLog)log).getOutputStream();
        Method getPipeline = null;
        for (Method m : stm.getClass().getDeclaredMethods()) {
            if (!m.getName().endsWith("getPipeline")) continue;
            getPipeline = m;
            getPipeline.setAccessible(true);
            break;
        }
        Assert.assertTrue((String)"Need DFSOutputStream.getPipeline() for this test", (null != getPipeline ? 1 : 0) != 0);
        Object repl = getPipeline.invoke((Object)stm, new Object[0]);
        return (DatanodeInfo[])repl;
    }

    @Test
    public void testLogRollOnDatanodeDeath() throws Exception {
        Assert.assertTrue((String)"This test requires HLog file replication set to 2.", (this.fs.getDefaultReplication() == 2 ? 1 : 0) != 0);
        LOG.info((Object)("Replication=" + this.fs.getDefaultReplication()));
        this.server = this.cluster.getRegionServer(0);
        this.log = this.server.getWAL();
        String tableName = TestLogRolling.getName();
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf((String)tableName));
        desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
        this.admin.createTable(desc);
        HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
        Assert.assertTrue((boolean)table.isAutoFlush());
        this.server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes((String)tableName));
        this.log = this.server.getWAL();
        Assert.assertTrue((String)"Need HDFS-826 for this test", (boolean)((FSHLog)this.log).canGetCurReplicas());
        Assert.assertTrue((String)"Need append support for this test", (boolean)FSUtils.isAppendSupported((Configuration)TEST_UTIL.getConfiguration()));
        ArrayList existingNodes = this.dfsCluster.getDataNodes();
        int numDataNodes = 3;
        this.dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
        ArrayList allNodes = this.dfsCluster.getDataNodes();
        for (int i = allNodes.size() - 1; i >= 0; --i) {
            if (!existingNodes.contains(allNodes.get(i))) continue;
            this.dfsCluster.stopDataNode(i);
        }
        Assert.assertTrue((String)("DataNodes " + this.dfsCluster.getDataNodes().size() + " default replication " + this.fs.getDefaultReplication()), (this.dfsCluster.getDataNodes().size() >= this.fs.getDefaultReplication() + 1 ? 1 : 0) != 0);
        this.writeData(table, 2);
        long curTime = System.currentTimeMillis();
        long oldFilenum = ((FSHLog)this.log).getFilenum();
        Assert.assertTrue((String)"Log should have a timestamp older than now", (curTime > oldFilenum && oldFilenum != -1L ? 1 : 0) != 0);
        Assert.assertTrue((String)"The log shouldn't have rolled yet", (oldFilenum == ((FSHLog)this.log).getFilenum() ? 1 : 0) != 0);
        DatanodeInfo[] pipeline = this.getPipeline(this.log);
        Assert.assertTrue((pipeline.length == this.fs.getDefaultReplication() ? 1 : 0) != 0);
        Assert.assertTrue((this.dfsCluster.stopDataNode(pipeline[0].getName()) != null ? 1 : 0) != 0);
        this.writeData(table, 2);
        long newFilenum = ((FSHLog)this.log).getFilenum();
        Assert.assertTrue((String)"Missing datanode should've triggered a log roll", (newFilenum > oldFilenum && newFilenum > curTime ? 1 : 0) != 0);
        this.writeData(table, 3);
        Assert.assertTrue((String)"The log should not roll again.", (((FSHLog)this.log).getFilenum() == newFilenum ? 1 : 0) != 0);
        Assert.assertTrue((this.dfsCluster.stopDataNode(pipeline[1].getName()) != null ? 1 : 0) != 0);
        this.batchWriteAndWait(table, 3, false, 14000);
        Assert.assertTrue((String)("LowReplication Roller should've been disabled, current replication=" + ((FSHLog)this.log).getLogReplication()), (!this.log.isLowReplicationRollEnabled() ? 1 : 0) != 0);
        this.dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
        this.log.rollWriter(true);
        this.batchWriteAndWait(table, 13, true, 10000);
        Assert.assertTrue((String)("New log file should have the default replication instead of " + ((FSHLog)this.log).getLogReplication()), (((FSHLog)this.log).getLogReplication() == this.fs.getDefaultReplication() ? 1 : 0) != 0);
        Assert.assertTrue((String)"LowReplication Roller should've been enabled", (boolean)this.log.isLowReplicationRollEnabled());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLogRollOnPipelineRestart() throws Exception {
        LOG.info((Object)"Starting testLogRollOnPipelineRestart");
        Assert.assertTrue((String)"This test requires HLog file replication.", (this.fs.getDefaultReplication() > 1 ? 1 : 0) != 0);
        LOG.info((Object)("Replication=" + this.fs.getDefaultReplication()));
        new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
        this.server = this.cluster.getRegionServer(0);
        this.log = this.server.getWAL();
        String tableName = TestLogRolling.getName();
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf((String)tableName));
        desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
        this.admin.createTable(desc);
        HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
        this.server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes((String)tableName));
        this.log = this.server.getWAL();
        final ArrayList<Path> paths = new ArrayList<Path>();
        final ArrayList preLogRolledCalled = new ArrayList();
        paths.add(((FSHLog)this.log).computeFilename());
        this.log.registerWALActionsListener(new WALActionsListener(){

            public void preLogRoll(Path oldFile, Path newFile) {
                LOG.debug((Object)("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile));
                preLogRolledCalled.add(new Integer(1));
            }

            public void postLogRoll(Path oldFile, Path newFile) {
                paths.add(newFile);
            }

            public void preLogArchive(Path oldFile, Path newFile) {
            }

            public void postLogArchive(Path oldFile, Path newFile) {
            }

            public void logRollRequested() {
            }

            public void logCloseRequested() {
            }

            public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
            }

            public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
            }
        });
        Assert.assertTrue((String)"Need HDFS-826 for this test", (boolean)((FSHLog)this.log).canGetCurReplicas());
        Assert.assertTrue((String)"Need append support for this test", (boolean)FSUtils.isAppendSupported((Configuration)TEST_UTIL.getConfiguration()));
        this.writeData(table, 1002);
        table.setAutoFlush(true, true);
        long curTime = System.currentTimeMillis();
        long oldFilenum = this.log.getFilenum();
        Assert.assertTrue((String)"Log should have a timestamp older than now", (curTime > oldFilenum && oldFilenum != -1L ? 1 : 0) != 0);
        Assert.assertTrue((String)"The log shouldn't have rolled yet", (oldFilenum == this.log.getFilenum() ? 1 : 0) != 0);
        this.dfsCluster.restartDataNodes();
        Thread.sleep(1000L);
        this.dfsCluster.waitActive();
        LOG.info((Object)"Data Nodes restarted");
        this.validateData(table, 1002);
        this.writeData(table, 1003);
        long newFilenum = this.log.getFilenum();
        Assert.assertTrue((String)"Missing datanode should've triggered a log roll", (newFilenum > oldFilenum && newFilenum > curTime ? 1 : 0) != 0);
        this.validateData(table, 1003);
        this.writeData(table, 1004);
        this.dfsCluster.restartDataNodes();
        Thread.sleep(1000L);
        this.dfsCluster.waitActive();
        LOG.info((Object)"Data Nodes restarted");
        this.validateData(table, 1004);
        this.writeData(table, 1005);
        this.log.rollWriter(true);
        Assert.assertTrue((String)("preLogRolledCalled has size of " + preLogRolledCalled.size()), (preLogRolledCalled.size() >= 1 ? 1 : 0) != 0);
        HashSet<String> loggedRows = new HashSet<String>();
        FSUtils fsUtils = FSUtils.getInstance((FileSystem)this.fs, (Configuration)TEST_UTIL.getConfiguration());
        for (Path p : paths) {
            LOG.debug((Object)("recovering lease for " + p));
            fsUtils.recoverFileLease(((HFileSystem)this.fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null);
            LOG.debug((Object)("Reading HLog " + FSUtils.getPath((Path)p)));
            HLog.Reader reader = null;
            try {
                HLog.Entry entry;
                reader = HLogFactory.createReader((FileSystem)this.fs, (Path)p, (Configuration)TEST_UTIL.getConfiguration());
                while ((entry = reader.next()) != null) {
                    LOG.debug((Object)("#" + entry.getKey().getLogSeqNum() + ": " + entry.getEdit().getKeyValues()));
                    for (KeyValue kv : entry.getEdit().getKeyValues()) {
                        loggedRows.add(Bytes.toStringBinary((byte[])kv.getRow()));
                    }
                }
            }
            catch (EOFException e) {
                LOG.debug((Object)("EOF reading file " + FSUtils.getPath((Path)p)));
            }
            finally {
                if (reader == null) continue;
                reader.close();
            }
        }
        Assert.assertTrue((boolean)loggedRows.contains("row1002"));
        Assert.assertTrue((boolean)loggedRows.contains("row1003"));
        Assert.assertTrue((boolean)loggedRows.contains("row1004"));
        Assert.assertTrue((boolean)loggedRows.contains("row1005"));
        ArrayList regions = new ArrayList(this.server.getOnlineRegionsLocalContext());
        for (HRegion r : regions) {
            r.flushcache();
        }
        ResultScanner scanner = table.getScanner(new Scan());
        try {
            for (int i = 2; i <= 5; ++i) {
                Result r = scanner.next();
                Assert.assertNotNull((Object)r);
                Assert.assertFalse((boolean)r.isEmpty());
                Assert.assertEquals((Object)("row100" + i), (Object)Bytes.toString((byte[])r.getRow()));
            }
        }
        finally {
            scanner.close();
        }
        for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
            Assert.assertFalse((boolean)rsThread.getRegionServer().isAborted());
        }
    }

    @Test
    public void testCompactionRecordDoesntBlockRolling() throws Exception {
        new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
        String tableName = TestLogRolling.getName();
        HTable table = this.createTestTable(tableName);
        String tableName2 = tableName + "1";
        HTable table2 = this.createTestTable(tableName2);
        this.server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes((String)tableName));
        this.log = this.server.getWAL();
        FSHLog fshLog = (FSHLog)this.log;
        HRegion region = (HRegion)this.server.getOnlineRegions(table2.getName()).get(0);
        Store s = region.getStore(HConstants.CATALOG_FAMILY);
        this.admin.flush(TableName.NAMESPACE_TABLE_NAME.getName());
        for (int i = 1; i <= 2; ++i) {
            this.doPut(table2, i);
            this.admin.flush(table2.getTableName());
        }
        this.doPut(table2, 3);
        Assert.assertEquals((String)"Should have no WAL after initial writes", (long)0L, (long)fshLog.getNumRolledLogFiles());
        Assert.assertEquals((long)2L, (long)s.getStorefilesCount());
        fshLog.rollWriter();
        Assert.assertEquals((String)"Should have WAL; one table is not flushed", (long)1L, (long)fshLog.getNumRolledLogFiles());
        this.admin.flush(table2.getTableName());
        region.compactStores();
        Assert.assertNotNull((Object)s);
        for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
            Threads.sleepWithoutInterrupt((long)200L);
        }
        Assert.assertEquals((String)"Compaction didn't happen", (long)1L, (long)s.getStorefilesCount());
        this.doPut(table, 0);
        fshLog.rollWriter();
        Assert.assertEquals((String)"Should have WAL; one table is not flushed", (long)1L, (long)fshLog.getNumRolledLogFiles());
        this.admin.flush(table.getTableName());
        this.doPut(table, 1);
        fshLog.rollWriter();
        Assert.assertEquals((String)"Should have 1 WALs at the end", (long)1L, (long)fshLog.getNumRolledLogFiles());
        table.close();
        table2.close();
    }

    private void doPut(HTable table, int i) throws IOException {
        Put put = new Put(Bytes.toBytes((String)("row" + String.format("%1$04d", i))));
        put.add(HConstants.CATALOG_FAMILY, null, this.value);
        table.put(put);
    }

    private HTable createTestTable(String tableName) throws IOException {
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf((String)tableName));
        desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
        this.admin.createTable(desc);
        return new HTable(TEST_UTIL.getConfiguration(), tableName);
    }
}

