/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestReplicationSink {
    private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
    private static final int BATCH_SIZE = 10;
    private static final long SLEEP_TIME = 500L;
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    private static ReplicationSink SINK;
    private static final byte[] TABLE_NAME1;
    private static final byte[] TABLE_NAME2;
    private static final byte[] FAM_NAME1;
    private static final byte[] FAM_NAME2;
    private static HTable table1;
    private static Stoppable STOPPABLE;
    private static HTable table2;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
        TEST_UTIL.getConfiguration().setBoolean("hbase.replication", true);
        TEST_UTIL.startMiniCluster(3);
        SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
        table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
        table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        STOPPABLE.stop("Shutting down");
        TEST_UTIL.shutdownMiniCluster();
    }

    @Before
    public void setUp() throws Exception {
        table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
        table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
        Thread.sleep(500L);
    }

    @Test
    public void testBatchSink() throws Exception {
        HLog.Entry[] entries = new HLog.Entry[10];
        for (int i = 0; i < 10; ++i) {
            entries[i] = this.createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
        }
        SINK.replicateEntries(entries);
        Scan scan = new Scan();
        ResultScanner scanRes = table1.getScanner(scan);
        Assert.assertEquals((long)10L, (long)scanRes.next(10).length);
    }

    @Test
    public void testMixedPutDelete() throws Exception {
        int i;
        HLog.Entry[] entries = new HLog.Entry[5];
        for (i = 0; i < 5; ++i) {
            entries[i] = this.createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
        }
        SINK.replicateEntries(entries);
        entries = new HLog.Entry[10];
        for (i = 0; i < 10; ++i) {
            entries[i] = this.createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn);
        }
        SINK.replicateEntries(entries);
        Scan scan = new Scan();
        ResultScanner scanRes = table1.getScanner(scan);
        Assert.assertEquals((long)5L, (long)scanRes.next(10).length);
    }

    @Test
    public void testMixedPutTables() throws Exception {
        HLog.Entry[] entries = new HLog.Entry[10];
        for (int i = 0; i < 10; ++i) {
            entries[i] = this.createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1, i, KeyValue.Type.Put);
        }
        SINK.replicateEntries(entries);
        Scan scan = new Scan();
        ResultScanner scanRes = table2.getScanner(scan);
        for (Result res : scanRes) {
            Assert.assertTrue((Bytes.toInt((byte[])res.getRow()) % 2 == 0 ? 1 : 0) != 0);
        }
    }

    @Test
    public void testMixedDeletes() throws Exception {
        HLog.Entry[] entries = new HLog.Entry[3];
        for (int i = 0; i < 3; ++i) {
            entries[i] = this.createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
        }
        SINK.replicateEntries(entries);
        entries = new HLog.Entry[]{this.createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn), this.createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily), this.createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn)};
        SINK.replicateEntries(entries);
        Scan scan = new Scan();
        ResultScanner scanRes = table1.getScanner(scan);
        Assert.assertEquals((long)0L, (long)scanRes.next(3).length);
    }

    @Test
    public void testApplyDeleteBeforePut() throws Exception {
        int i;
        HLog.Entry[] entries = new HLog.Entry[5];
        for (i = 0; i < 2; ++i) {
            entries[i] = this.createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
        }
        entries[2] = this.createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
        for (i = 3; i < 5; ++i) {
            entries[i] = this.createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
        }
        SINK.replicateEntries(entries);
        Get get = new Get(Bytes.toBytes((int)1));
        Result res = table1.get(get);
        Assert.assertEquals((long)0L, (long)res.size());
    }

    private HLog.Entry createEntry(byte[] table, int row, KeyValue.Type type) {
        byte[] fam = Bytes.equals((byte[])table, (byte[])TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
        byte[] rowBytes = Bytes.toBytes((int)row);
        try {
            Thread.sleep(1L);
        }
        catch (InterruptedException e) {
            LOG.info((Object)"Was interrupted while sleep, meh", (Throwable)e);
        }
        long now = System.currentTimeMillis();
        KeyValue kv = null;
        if (type.getCode() == KeyValue.Type.Put.getCode()) {
            kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes((int)row));
        } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
            kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn);
        } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
            kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily);
        }
        HLogKey key = new HLogKey(table, table, now, now);
        WALEdit edit = new WALEdit();
        edit.add(kv);
        return new HLog.Entry(key, edit);
    }

    static {
        TABLE_NAME1 = Bytes.toBytes((String)"table1");
        TABLE_NAME2 = Bytes.toBytes((String)"table2");
        FAM_NAME1 = Bytes.toBytes((String)"info1");
        FAM_NAME2 = Bytes.toBytes((String)"info2");
        STOPPABLE = new Stoppable(){
            final AtomicBoolean stop = new AtomicBoolean(false);

            public boolean isStopped() {
                return this.stop.get();
            }

            public void stop(String why) {
                LOG.info((Object)("STOPPING BECAUSE: " + why));
                this.stop.set(true);
            }
        };
    }
}

