/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestReplication {
    private static final Log LOG = LogFactory.getLog(TestReplication.class);
    private static Configuration conf1;
    private static Configuration conf2;
    private static ZooKeeperWatcher zkw1;
    private static ZooKeeperWatcher zkw2;
    private static ReplicationAdmin admin;
    private static String slaveClusterKey;
    private static HTable htable1;
    private static HTable htable2;
    private static HBaseTestingUtility utility1;
    private static HBaseTestingUtility utility2;
    private static final int NB_ROWS_IN_BATCH = 100;
    private static final int NB_ROWS_IN_BIG_BATCH = 1000;
    private static final long SLEEP_TIME = 500L;
    private static final int NB_RETRIES = 10;
    private static final byte[] tableName;
    private static final byte[] famName;
    private static final byte[] row;
    private static final byte[] noRepfamName;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        conf1 = HBaseConfiguration.create();
        conf1.set("zookeeper.znode.parent", "/1");
        conf1.setInt("hbase.regionserver.hlog.blocksize", 20480);
        conf1.setInt("replication.source.size.capacity", 1024);
        conf1.setLong("replication.source.sleepforretries", 100L);
        conf1.setInt("hbase.regionserver.maxlogs", 10);
        conf1.setLong("hbase.master.logcleaner.ttl", 10L);
        conf1.setBoolean("hbase.replication", true);
        conf1.setBoolean("dfs.support.append", true);
        conf1.setLong("hbase.server.thread.wakefrequency", 100L);
        utility1 = new HBaseTestingUtility(conf1);
        utility1.startMiniZKCluster();
        MiniZooKeeperCluster miniZK = utility1.getZkCluster();
        conf1 = utility1.getConfiguration();
        zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);
        admin = new ReplicationAdmin(conf1);
        LOG.info((Object)"Setup first Zk");
        conf2 = HBaseConfiguration.create((Configuration)conf1);
        conf2.set("zookeeper.znode.parent", "/2");
        conf2.setInt("hbase.client.retries.number", 6);
        conf2.setBoolean("hbase.replication", true);
        conf2.setBoolean("dfs.support.append", true);
        utility2 = new HBaseTestingUtility(conf2);
        utility2.setZkCluster(miniZK);
        zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null);
        slaveClusterKey = conf2.get("hbase.zookeeper.quorum") + ":" + conf2.get("hbase.zookeeper.property.clientPort") + ":/2";
        admin.addPeer("2", slaveClusterKey);
        TestReplication.setIsReplication(true);
        LOG.info((Object)"Setup second Zk");
        utility1.startMiniCluster(2);
        utility2.startMiniCluster(2);
        HTableDescriptor table = new HTableDescriptor(tableName);
        HColumnDescriptor fam = new HColumnDescriptor(famName);
        fam.setScope(1);
        table.addFamily(fam);
        fam = new HColumnDescriptor(noRepfamName);
        table.addFamily(fam);
        HBaseAdmin admin1 = new HBaseAdmin(conf1);
        HBaseAdmin admin2 = new HBaseAdmin(conf2);
        admin1.createTable(table);
        admin2.createTable(table);
        htable1 = new HTable(conf1, tableName);
        htable1.setWriteBufferSize(1024L);
        htable2 = new HTable(conf2, tableName);
    }

    private static void setIsReplication(boolean rep) throws Exception {
        LOG.info((Object)("Set rep " + rep));
        admin.setReplicating(rep);
        Thread.sleep(500L);
    }

    @Before
    public void setUp() throws Exception {
        for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster().getRegionServerThreads()) {
            r.getRegionServer().getWAL().rollWriter();
        }
        utility1.truncateTable(tableName);
        Scan scan = new Scan();
        int lastCount = 0;
        for (int i = 0; i < 10; ++i) {
            if (i == 9) {
                Assert.fail((String)"Waited too much time for truncate");
            }
            ResultScanner scanner = htable2.getScanner(scan);
            Result[] res = scanner.next(1000);
            scanner.close();
            if (res.length == 0) break;
            if (res.length < lastCount) {
                --i;
            }
            lastCount = res.length;
            LOG.info((Object)("Still got " + res.length + " rows"));
            Thread.sleep(500L);
        }
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        utility2.shutdownMiniCluster();
        utility1.shutdownMiniCluster();
    }

    @Test
    public void testSimplePutDelete() throws Exception {
        LOG.info((Object)"testSimplePutDelete");
        Put put = new Put(row);
        put.add(famName, row, row);
        htable1 = new HTable(conf1, tableName);
        htable1.put(put);
        Get get = new Get(row);
        for (int i = 0; i < 10; ++i) {
            Result res;
            if (i == 9) {
                Assert.fail((String)"Waited too much time for put replication");
            }
            if ((res = htable2.get(get)).size() != 0) {
                Assert.assertArrayEquals((byte[])res.value(), (byte[])row);
                break;
            }
            LOG.info((Object)"Row not available");
            Thread.sleep(500L);
        }
        Delete del = new Delete(row);
        htable1.delete(del);
        get = new Get(row);
        for (int i = 0; i < 10; ++i) {
            Result res;
            if (i == 9) {
                Assert.fail((String)"Waited too much time for del replication");
            }
            if ((res = htable2.get(get)).size() < 1) break;
            LOG.info((Object)"Row not deleted");
            Thread.sleep(500L);
        }
    }

    @Test
    public void testSmallBatch() throws Exception {
        LOG.info((Object)"testSmallBatch");
        htable1.setAutoFlush(false);
        for (int i = 0; i < 100; ++i) {
            Put put = new Put(Bytes.toBytes((int)i));
            put.add(famName, row, row);
            htable1.put(put);
        }
        htable1.flushCommits();
        Scan scan = new Scan();
        ResultScanner scanner1 = htable1.getScanner(scan);
        Result[] res1 = scanner1.next(100);
        scanner1.close();
        Assert.assertEquals((long)100L, (long)res1.length);
        for (int i = 0; i < 10; ++i) {
            if (i == 9) {
                Assert.fail((String)"Waited too much time for normal batch replication");
            }
            ResultScanner scanner = htable2.getScanner(scan);
            Result[] res = scanner.next(100);
            scanner.close();
            if (res.length == 100) break;
            LOG.info((Object)("Only got " + res.length + " rows"));
            Thread.sleep(500L);
        }
        htable1.setAutoFlush(true);
    }

    @Test
    public void testStartStop() throws Exception {
        Result res;
        int i;
        TestReplication.setIsReplication(false);
        Put put = new Put(Bytes.toBytes((String)"stop start"));
        put.add(famName, row, row);
        htable1.put(put);
        Get get = new Get(Bytes.toBytes((String)"stop start"));
        for (i = 0; i < 10 && i != 9; ++i) {
            res = htable2.get(get);
            if (res.size() >= 1) {
                Assert.fail((String)"Replication wasn't stopped");
                continue;
            }
            LOG.info((Object)"Row not replicated, let's wait a bit more...");
            Thread.sleep(500L);
        }
        TestReplication.setIsReplication(true);
        htable1.put(put);
        for (i = 0; i < 10; ++i) {
            if (i == 9) {
                Assert.fail((String)"Waited too much time for put replication");
            }
            if ((res = htable2.get(get)).size() != 0) {
                Assert.assertArrayEquals((byte[])res.value(), (byte[])row);
                break;
            }
            LOG.info((Object)"Row not available");
            Thread.sleep(500L);
        }
        put = new Put(Bytes.toBytes((String)"do not rep"));
        put.add(noRepfamName, row, row);
        htable1.put(put);
        get = new Get(Bytes.toBytes((String)"do not rep"));
        for (i = 0; i < 10 && i != 9; ++i) {
            res = htable2.get(get);
            if (res.size() >= 1) {
                Assert.fail((String)"Not supposed to be replicated");
                continue;
            }
            LOG.info((Object)"Row not replicated, let's wait a bit more...");
            Thread.sleep(500L);
        }
    }

    @Test
    public void testAddAndRemoveClusters() throws Exception {
        Result res;
        int i;
        LOG.info((Object)"testAddAndRemoveClusters");
        admin.removePeer("2");
        Thread.sleep(500L);
        byte[] rowKey = Bytes.toBytes((String)"Won't be replicated");
        Put put = new Put(rowKey);
        put.add(famName, row, row);
        htable1.put(put);
        Get get = new Get(rowKey);
        for (i = 0; i < 10 && i != 9; ++i) {
            res = htable2.get(get);
            if (res.size() >= 1) {
                Assert.fail((String)"Not supposed to be replicated");
                continue;
            }
            LOG.info((Object)"Row not replicated, let's wait a bit more...");
            Thread.sleep(500L);
        }
        admin.addPeer("2", slaveClusterKey);
        Thread.sleep(500L);
        rowKey = Bytes.toBytes((String)"do rep");
        put = new Put(rowKey);
        put.add(famName, row, row);
        LOG.info((Object)"Adding new row");
        htable1.put(put);
        get = new Get(rowKey);
        for (i = 0; i < 10; ++i) {
            if (i == 9) {
                Assert.fail((String)"Waited too much time for put replication");
            }
            if ((res = htable2.get(get)).size() != 0) {
                Assert.assertArrayEquals((byte[])res.value(), (byte[])row);
                break;
            }
            LOG.info((Object)"Row not available");
            Thread.sleep(500L * (long)i);
        }
    }

    @Test
    public void loadTesting() throws Exception {
        htable1.setWriteBufferSize(1024L);
        htable1.setAutoFlush(false);
        for (int i = 0; i < 1000; ++i) {
            Put put = new Put(Bytes.toBytes((int)i));
            put.add(famName, row, row);
            htable1.put(put);
        }
        htable1.flushCommits();
        Scan scan = new Scan();
        ResultScanner scanner = htable1.getScanner(scan);
        Result[] res = scanner.next(1000);
        scanner.close();
        Assert.assertEquals((long)1000L, (long)res.length);
        scan = new Scan();
        for (int i = 0; i < 10; ++i) {
            scanner = htable2.getScanner(scan);
            res = scanner.next(1000);
            scanner.close();
            if (res.length == 1000) break;
            if (i == 9) {
                int lastRow = -1;
                for (Result result : res) {
                    int currentRow = Bytes.toInt((byte[])result.getRow());
                    for (int row = lastRow + 1; row < currentRow; ++row) {
                        LOG.error((Object)("Row missing: " + row));
                    }
                    lastRow = currentRow;
                }
                LOG.error((Object)("Last row: " + lastRow));
                Assert.fail((String)("Waited too much time for normal batch replication, " + res.length + " instead of " + 1000));
                continue;
            }
            LOG.info((Object)("Only got " + res.length + " rows"));
            Thread.sleep(500L);
        }
    }

    @Test
    public void testVerifyRepJob() throws Exception {
        this.testSmallBatch();
        String[] args = new String[]{"2", Bytes.toString((byte[])tableName)};
        Job job = VerifyReplication.createSubmittableJob((Configuration)conf1, (String[])args);
        if (job == null) {
            Assert.fail((String)"Job wasn't created, see the log");
        }
        if (!job.waitForCompletion(true)) {
            Assert.fail((String)"Job failed, see the log");
        }
        Assert.assertEquals((long)100L, (long)job.getCounters().findCounter((Enum)VerifyReplication.Verifier.Counters.GOODROWS).getValue());
        Assert.assertEquals((long)0L, (long)job.getCounters().findCounter((Enum)VerifyReplication.Verifier.Counters.BADROWS).getValue());
        Scan scan = new Scan();
        ResultScanner rs = htable2.getScanner(scan);
        Put put = null;
        for (Result result : rs) {
            put = new Put(result.getRow());
            KeyValue firstVal = result.raw()[0];
            put.add(firstVal.getFamily(), firstVal.getQualifier(), Bytes.toBytes((String)"diff data"));
            htable2.put(put);
        }
        Delete delete = new Delete(put.getRow());
        htable2.delete(delete);
        job = VerifyReplication.createSubmittableJob((Configuration)conf1, (String[])args);
        if (job == null) {
            Assert.fail((String)"Job wasn't created, see the log");
        }
        if (!job.waitForCompletion(true)) {
            Assert.fail((String)"Job failed, see the log");
        }
        Assert.assertEquals((long)0L, (long)job.getCounters().findCounter((Enum)VerifyReplication.Verifier.Counters.GOODROWS).getValue());
        Assert.assertEquals((long)100L, (long)job.getCounters().findCounter((Enum)VerifyReplication.Verifier.Counters.BADROWS).getValue());
    }

    @Test
    public void queueFailover() throws Exception {
        Result[] res;
        utility1.createMultiRegions(htable1, famName);
        int rsToKill1 = utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
        int rsToKill2 = utility2.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
        Thread killer1 = TestReplication.killARegionServer(utility1, 7500L, rsToKill1);
        Thread killer2 = TestReplication.killARegionServer(utility2, 10000L, rsToKill2);
        LOG.info((Object)"Start loading table");
        int initialCount = utility1.loadTable(htable1, famName);
        LOG.info((Object)"Done loading table");
        killer1.join(5000L);
        killer2.join(5000L);
        LOG.info((Object)"Done waiting for threads");
        while (true) {
            try {
                Scan scan = new Scan();
                ResultScanner scanner = htable1.getScanner(scan);
                res = scanner.next(initialCount);
                scanner.close();
            }
            catch (UnknownScannerException ex) {
                LOG.info((Object)"Cluster wasn't ready yet, restarting scanner");
                continue;
            }
            break;
        }
        if (res.length != initialCount) {
            LOG.warn((Object)"We lost some rows on the master cluster!");
            initialCount = res.length;
        }
        Scan scan2 = new Scan();
        int lastCount = 0;
        for (int i = 0; i < 10; ++i) {
            if (i == 9) {
                Assert.fail((String)"Waited too much time for queueFailover replication");
            }
            ResultScanner scanner2 = htable2.getScanner(scan2);
            Result[] res2 = scanner2.next(initialCount * 2);
            scanner2.close();
            if (res2.length >= initialCount) break;
            if (lastCount < res2.length) {
                --i;
            }
            lastCount = res2.length;
            LOG.info((Object)("Only got " + lastCount + " rows instead of " + initialCount + " current i=" + i));
            Thread.sleep(1000L);
        }
    }

    private static Thread killARegionServer(final HBaseTestingUtility utility, final long timeout, final int rs) {
        Thread killer = new Thread(){

            @Override
            public void run() {
                try {
                    Thread.sleep(timeout);
                    utility.expireRegionServerSession(rs);
                }
                catch (Exception e) {
                    LOG.error((Object)e);
                }
            }
        };
        killer.start();
        return killer;
    }

    static {
        tableName = Bytes.toBytes((String)"test");
        famName = Bytes.toBytes((String)"f");
        row = Bytes.toBytes((String)"row");
        noRepfamName = Bytes.toBytes((String)"norep");
    }
}

