/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={MediumTests.class})
public class TestReplicationEndpoint
extends TestReplicationBase {
    static int numRegionServers;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TestReplicationBase.setUpBeforeClass();
        utility2.shutdownMiniCluster();
        admin.removePeer("2");
        numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TestReplicationBase.tearDownAfterClass();
        Assert.assertTrue((ReplicationEndpointForTest.stoppedCount.get() > 0 ? 1 : 0) != 0);
    }

    @Before
    public void setup() throws FailedLogCloseException, IOException {
        ReplicationEndpointForTest.contructedCount.set(0);
        ReplicationEndpointForTest.startedCount.set(0);
        ReplicationEndpointForTest.replicateCount.set(0);
        ReplicationEndpointForTest.lastEntries = null;
        for (JVMClusterUtil.RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
            utility1.getHBaseAdmin().rollHLogWriter(rs.getRegionServer().getServerName().toString());
        }
    }

    @Test
    public void testCustomReplicationEndpoint() throws Exception {
        admin.addPeer("testCustomReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey((Configuration)conf1)).setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
        Waiter.waitFor((Configuration)conf1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
            }
        });
        Waiter.waitFor((Configuration)conf1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
            }
        });
        Assert.assertEquals((long)0L, (long)ReplicationEndpointForTest.replicateCount.get());
        this.doPut(Bytes.toBytes((String)"row42"));
        Waiter.waitFor((Configuration)conf1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.replicateCount.get() >= 1;
            }
        });
        TestReplicationEndpoint.doAssert(Bytes.toBytes((String)"row42"));
        admin.removePeer("testCustomReplicationEndpoint");
    }

    @Test
    public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
        Assert.assertEquals((long)0L, (long)ReplicationEndpointForTest.replicateCount.get());
        Assert.assertTrue((!ReplicationEndpointReturningFalse.replicated.get() ? 1 : 0) != 0);
        String id = "testReplicationEndpointReturnsFalseOnReplicate";
        admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate", new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey((Configuration)conf1)).setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
        this.doPut(row);
        Waiter.waitFor((Configuration)conf1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return ReplicationEndpointReturningFalse.replicated.get();
            }
        });
        if (ReplicationEndpointReturningFalse.ex.get() != null) {
            throw ReplicationEndpointReturningFalse.ex.get();
        }
        admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
    }

    @Test(timeout=120000L)
    public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
        admin.addPeer("testWALEntryFilterFromReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey((Configuration)conf1)).setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
        this.doPut(Bytes.toBytes((String)"row1"));
        this.doPut(row);
        this.doPut(Bytes.toBytes((String)"row2"));
        Waiter.waitFor((Configuration)conf1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.replicateCount.get() >= 1;
            }
        });
        Assert.assertNull((Object)ReplicationEndpointWithWALEntryFilter.ex.get());
        admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
    }

    private void doPut(byte[] row) throws IOException {
        Put put = new Put(row);
        put.add(famName, row, row);
        htable1 = new HTable(conf1, tableName);
        htable1.put(put);
        htable1.close();
    }

    private static void doAssert(byte[] row) throws Exception {
        if (ReplicationEndpointForTest.lastEntries == null) {
            return;
        }
        Assert.assertEquals((long)1L, (long)ReplicationEndpointForTest.lastEntries.size());
        ArrayList kvs = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getKeyValues();
        Assert.assertEquals((long)1L, (long)kvs.size());
        Assert.assertTrue((boolean)Bytes.equals((byte[])((KeyValue)kvs.get(0)).getRowArray(), (int)((KeyValue)kvs.get(0)).getRowOffset(), (int)((KeyValue)kvs.get(0)).getRowLength(), (byte[])row, (int)0, (int)row.length));
    }

    public static class ReplicationEndpointWithWALEntryFilter
    extends ReplicationEndpointForTest {
        static AtomicReference<Exception> ex = new AtomicReference<Object>(null);

        @Override
        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                super.replicate(replicateContext);
                TestReplicationEndpoint.doAssert(TestReplicationBase.row);
            }
            catch (Exception e) {
                ex.set(e);
            }
            return true;
        }

        public WALEntryFilter getWALEntryfilter() {
            return new ChainWALEntryFilter(new WALEntryFilter[]{super.getWALEntryfilter(), new WALEntryFilter(){

                public HLog.Entry filter(HLog.Entry entry) {
                    ArrayList kvs = entry.getEdit().getKeyValues();
                    int size = kvs.size();
                    for (int i = size - 1; i >= 0; --i) {
                        KeyValue kv = (KeyValue)kvs.get(i);
                        if (Bytes.equals((byte[])kv.getRowArray(), (int)kv.getRowOffset(), (int)kv.getRowLength(), (byte[])TestReplicationBase.row, (int)0, (int)TestReplicationBase.row.length)) continue;
                        kvs.remove(i);
                    }
                    return entry;
                }
            }});
        }
    }

    public static class ReplicationEndpointReturningFalse
    extends ReplicationEndpointForTest {
        static AtomicReference<Exception> ex = new AtomicReference<Object>(null);
        static AtomicBoolean replicated = new AtomicBoolean(false);

        @Override
        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                TestReplicationEndpoint.doAssert(TestReplicationBase.row);
            }
            catch (Exception e) {
                ex.set(e);
            }
            super.replicate(replicateContext);
            replicated.set(replicateCount.get() > 10);
            return replicated.get();
        }
    }

    public static class ReplicationEndpointForTest
    extends BaseReplicationEndpoint {
        static UUID uuid = UUID.randomUUID();
        static AtomicInteger contructedCount = new AtomicInteger();
        static AtomicInteger startedCount = new AtomicInteger();
        static AtomicInteger stoppedCount = new AtomicInteger();
        static AtomicInteger replicateCount = new AtomicInteger();
        static volatile List<HLog.Entry> lastEntries = null;

        public ReplicationEndpointForTest() {
            contructedCount.incrementAndGet();
        }

        public UUID getPeerUUID() {
            return uuid;
        }

        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            replicateCount.incrementAndGet();
            lastEntries = replicateContext.entries;
            return true;
        }

        protected void doStart() {
            startedCount.incrementAndGet();
            this.notifyStarted();
        }

        protected void doStop() {
            stoppedCount.incrementAndGet();
            this.notifyStopped();
        }
    }
}

