/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AsyncProcess;
import org.apache.hadoop.hbase.client.BufferedMutatorImpl;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionManager;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.MultiServerCallable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.mockito.Mockito;

@Category(value={MediumTests.class})
public class TestAsyncProcess {
    private static final TableName DUMMY_TABLE = TableName.valueOf((String)"DUMMY_TABLE");
    private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
    private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
    private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
    private static final byte[] FAILS = "FAILS".getBytes();
    private static final Configuration conf = new Configuration();
    private static ServerName sn = ServerName.valueOf((String)"s1:1,1");
    private static ServerName sn2 = ServerName.valueOf((String)"s2:2,2");
    private static ServerName sn3 = ServerName.valueOf((String)"s3:3,3");
    private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1L);
    private static HRegionInfo hri2 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2L);
    private static HRegionInfo hri3 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3L);
    private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
    private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
    private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
    private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica((HRegionInfo)hri1, (int)1);
    private static HRegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica((HRegionInfo)hri1, (int)2);
    private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica((HRegionInfo)hri2, (int)1);
    private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation[]{new HRegionLocation(hri1, sn), new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)});
    private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation[]{new HRegionLocation(hri2, sn2), new HRegionLocation(hri2r1, sn3)});
    private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation[]{new HRegionLocation(hri3, sn3), null});
    private static final String success = "success";
    private static Exception failure = new Exception("failure");
    private static int NB_RETRIES = 3;
    @Rule
    public Timeout timeout = new Timeout(10000);

    @BeforeClass
    public static void beforeClass() {
        conf.setInt("hbase.client.retries.number", NB_RETRIES);
    }

    static MultiResponse createMultiResponse(MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
        MultiResponse mr = new MultiResponse();
        nbMultiResponse.incrementAndGet();
        for (Map.Entry entry : multi.actions.entrySet()) {
            byte[] regionName = (byte[])entry.getKey();
            for (Action a : (List)entry.getValue()) {
                nbActions.incrementAndGet();
                gen.addResponse(mr, regionName, (Action<Row>)a);
            }
        }
        return mr;
    }

    @Test
    public void testSubmit() throws Exception {
        ClusterConnection hc = TestAsyncProcess.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(hc, conf);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(1, true));
        ap.submit(DUMMY_TABLE, puts, false, null, false);
        Assert.assertTrue((boolean)puts.isEmpty());
    }

    @Test
    public void testSubmitWithCB() throws Exception {
        ClusterConnection hc = TestAsyncProcess.createHConnection();
        final AtomicInteger updateCalled = new AtomicInteger(0);
        Batch.Callback<Object> cb = new Batch.Callback<Object>(){

            public void update(byte[] region, byte[] row, Object result) {
                updateCalled.incrementAndGet();
            }
        };
        MyAsyncProcess ap = new MyAsyncProcess(hc, conf);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(1, true));
        AsyncProcess.AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, (Batch.Callback)cb, false);
        Assert.assertTrue((boolean)puts.isEmpty());
        ars.waitUntilDone();
        Assert.assertEquals((long)updateCalled.get(), (long)1L);
    }

    @Test
    public void testSubmitBusyRegion() throws Exception {
        ClusterConnection hc = TestAsyncProcess.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(hc, conf);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(1, true));
        ap.incTaskCounters(Arrays.asList(new byte[][]{hri1.getRegionName()}), sn);
        ap.submit(DUMMY_TABLE, puts, false, null, false);
        Assert.assertEquals((long)puts.size(), (long)1L);
        ap.decTaskCounters(Arrays.asList(new byte[][]{hri1.getRegionName()}), sn);
        ap.submit(DUMMY_TABLE, puts, false, null, false);
        Assert.assertEquals((long)0L, (long)puts.size());
    }

    @Test
    public void testSubmitBusyRegionServer() throws Exception {
        ClusterConnection hc = TestAsyncProcess.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(hc, conf);
        ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(3, true));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(2, true));
        ap.submit(DUMMY_TABLE, puts, false, null, false);
        Assert.assertEquals((String)(" puts=" + puts), (long)1L, (long)puts.size());
        ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
        ap.submit(DUMMY_TABLE, puts, false, null, false);
        Assert.assertTrue((boolean)puts.isEmpty());
    }

    @Test
    public void testFail() throws Exception {
        MyAsyncProcess ap = new MyAsyncProcess(TestAsyncProcess.createHConnection(), conf, false);
        ArrayList<Put> puts = new ArrayList<Put>();
        Put p = this.createPut(1, false);
        puts.add(p);
        AsyncProcess.AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
        Assert.assertEquals((long)0L, (long)puts.size());
        ars.waitUntilDone();
        this.verifyResult(ars, false);
        Assert.assertEquals((long)(NB_RETRIES + 1), (long)ap.callsCt.get());
        Assert.assertEquals((long)1L, (long)ars.getErrors().exceptions.size());
        Assert.assertTrue((String)("was: " + ars.getErrors().exceptions.get(0)), (boolean)failure.equals(ars.getErrors().exceptions.get(0)));
        Assert.assertTrue((String)("was: " + ars.getErrors().exceptions.get(0)), (boolean)failure.equals(ars.getErrors().exceptions.get(0)));
        Assert.assertEquals((long)1L, (long)ars.getFailedOperations().size());
        Assert.assertTrue((String)("was: " + ars.getFailedOperations().get(0)), (boolean)p.equals(ars.getFailedOperations().get(0)));
    }

    @Test
    public void testSubmitTrue() throws IOException {
        final MyAsyncProcess ap = new MyAsyncProcess(TestAsyncProcess.createHConnection(), conf, false);
        ap.tasksInProgress.incrementAndGet();
        final AtomicInteger ai = new AtomicInteger(1);
        ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
        final AtomicBoolean checkPoint = new AtomicBoolean(false);
        final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
        Thread t = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)1000L);
                Assert.assertFalse((boolean)checkPoint.get());
                ai.decrementAndGet();
                ap.tasksInProgress.decrementAndGet();
                checkPoint2.set(true);
            }
        };
        ArrayList<Put> puts = new ArrayList<Put>();
        Put p = this.createPut(1, true);
        puts.add(p);
        ap.submit(DUMMY_TABLE, puts, false, null, false);
        Assert.assertFalse((boolean)puts.isEmpty());
        t.start();
        ap.submit(DUMMY_TABLE, puts, true, null, false);
        Assert.assertTrue((boolean)puts.isEmpty());
        checkPoint.set(true);
        while (!checkPoint2.get()) {
            Threads.sleep((long)1L);
        }
    }

    @Test
    public void testFailAndSuccess() throws Exception {
        MyAsyncProcess ap = new MyAsyncProcess(TestAsyncProcess.createHConnection(), conf, false);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(1, false));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        AsyncProcess.AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
        Assert.assertTrue((boolean)puts.isEmpty());
        ars.waitUntilDone();
        this.verifyResult(ars, false, true, true);
        Assert.assertEquals((long)(NB_RETRIES + 1), (long)ap.callsCt.get());
        ap.callsCt.set(0);
        Assert.assertEquals((long)1L, (long)ars.getErrors().actions.size());
        puts.add(this.createPut(1, true));
        ap.waitUntilDone();
        ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
        Assert.assertEquals((long)0L, (long)puts.size());
        ars.waitUntilDone();
        Assert.assertEquals((long)2L, (long)ap.callsCt.get());
        this.verifyResult(ars, true);
    }

    @Test
    public void testFlush() throws Exception {
        MyAsyncProcess ap = new MyAsyncProcess(TestAsyncProcess.createHConnection(), conf, false);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(1, false));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        AsyncProcess.AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
        ars.waitUntilDone();
        this.verifyResult(ars, false, true, true);
        Assert.assertEquals((long)(NB_RETRIES + 1), (long)ap.callsCt.get());
        Assert.assertEquals((long)1L, (long)ars.getFailedOperations().size());
    }

    @Test
    public void testMaxTask() throws Exception {
        final MyAsyncProcess ap = new MyAsyncProcess(TestAsyncProcess.createHConnection(), conf, false);
        for (int i = 0; i < 1000; ++i) {
            ap.incTaskCounters(Arrays.asList(new byte[][]{"dummy".getBytes()}), sn);
        }
        final Thread myThread = Thread.currentThread();
        Thread t = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)2000L);
                myThread.interrupt();
            }
        };
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(1, true));
        t.start();
        try {
            ap.submit(DUMMY_TABLE, puts, false, null, false);
            Assert.fail((String)"We should have been interrupted.");
        }
        catch (InterruptedIOException expected) {
            // empty catch block
        }
        long sleepTime = 2000L;
        Thread t2 = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)2000L);
                while (ap.tasksInProgress.get() > 0L) {
                    ap.decTaskCounters(Arrays.asList(new byte[][]{"dummy".getBytes()}), sn);
                }
            }
        };
        t2.start();
        long start = System.currentTimeMillis();
        ap.submit(DUMMY_TABLE, new ArrayList(), false, null, false);
        long end = System.currentTimeMillis();
        Assert.assertTrue((start + 100L + 2000L > end ? 1 : 0) != 0);
    }

    private static ClusterConnection createHConnection() throws IOException {
        ClusterConnection hc = TestAsyncProcess.createHConnectionCommon();
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(new HRegionLocation[]{loc1}));
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(new HRegionLocation[]{loc2}));
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(new HRegionLocation[]{loc3}));
        TestAsyncProcess.setMockLocation(hc, FAILS, new RegionLocations(new HRegionLocation[]{loc2}));
        return hc;
    }

    private static ClusterConnection createHConnectionWithReplicas() throws IOException {
        ClusterConnection hc = TestAsyncProcess.createHConnectionCommon();
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_1, hrls1);
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_2, hrls2);
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_3, hrls3);
        return hc;
    }

    private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) throws IOException {
        Mockito.when((Object)hc.locateRegion((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)row), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn((Object)result);
    }

    private static ClusterConnection createHConnectionCommon() {
        ClusterConnection hc = (ClusterConnection)Mockito.mock(ClusterConnection.class);
        NonceGenerator ng = (NonceGenerator)Mockito.mock(NonceGenerator.class);
        Mockito.when((Object)ng.getNonceGroup()).thenReturn((Object)0L);
        Mockito.when((Object)hc.getNonceGenerator()).thenReturn((Object)ng);
        Mockito.when((Object)hc.getConfiguration()).thenReturn((Object)conf);
        return hc;
    }

    @Test
    public void testHTablePutSuccess() throws Exception {
        BufferedMutatorImpl ht = (BufferedMutatorImpl)Mockito.mock(BufferedMutatorImpl.class);
        ht.ap = new MyAsyncProcess(TestAsyncProcess.createHConnection(), conf, true);
        Put put = this.createPut(1, true);
        Assert.assertEquals((long)0L, (long)ht.getWriteBufferSize());
        ht.mutate((Mutation)put);
        Assert.assertEquals((long)0L, (long)ht.getWriteBufferSize());
    }

    private void doHTableFailedPut(boolean bufferOn) throws Exception {
        ClusterConnection conn = TestAsyncProcess.createHConnection();
        HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
        MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
        ht.mutator.ap = ap;
        if (bufferOn) {
            ht.setWriteBufferSize(0x100000L);
        } else {
            ht.setWriteBufferSize(0L);
        }
        Put put = this.createPut(1, false);
        Assert.assertEquals((long)0L, (long)ht.mutator.currentWriteBufferSize);
        try {
            ht.put(put);
            if (bufferOn) {
                ht.flushCommits();
            }
            Assert.fail();
        }
        catch (RetriesExhaustedException expected) {
            // empty catch block
        }
        Assert.assertEquals((long)0L, (long)ht.mutator.currentWriteBufferSize);
        AsyncProcess.AsyncRequestFuture ars = null;
        for (AsyncProcess.AsyncRequestFuture someReqs : ap.allReqs) {
            if (someReqs.getResults().length == 0) continue;
            Assert.assertTrue((ars == null ? 1 : 0) != 0);
            ars = someReqs;
        }
        Assert.assertTrue((ars != null ? 1 : 0) != 0);
        this.verifyResult(ars, false);
        ht.close();
    }

    @Test
    public void testHTableFailedPutWithBuffer() throws Exception {
        this.doHTableFailedPut(true);
    }

    @Test
    public void testHTableFailedPutWithoutBuffer() throws Exception {
        this.doHTableFailedPut(false);
    }

    @Test
    public void testHTableFailedPutAndNewPut() throws Exception {
        ClusterConnection conn = TestAsyncProcess.createHConnection();
        BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0L));
        MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
        mutator.ap = ap;
        Put p = this.createPut(1, false);
        mutator.mutate((Mutation)p);
        ap.waitUntilDone();
        p = this.createPut(1, true);
        Assert.assertEquals((long)0L, (long)mutator.getWriteBuffer().size());
        try {
            mutator.mutate((Mutation)p);
            Assert.fail();
        }
        catch (RetriesExhaustedException expected) {
            // empty catch block
        }
        Assert.assertEquals((String)"the put should not been inserted.", (long)0L, (long)mutator.getWriteBuffer().size());
    }

    @Test
    public void testBatch() throws IOException, InterruptedException {
        MyConnectionImpl conn = new MyConnectionImpl(conf);
        HTable ht = new HTable((ClusterConnection)conn, new BufferedMutatorParams(DUMMY_TABLE));
        ht.multiAp = new MyAsyncProcess((ClusterConnection)conn, conf, false);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, false));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, false));
        Object[] res = new Object[puts.size()];
        try {
            ht.processBatch(puts, res);
            Assert.fail();
        }
        catch (RetriesExhaustedException expected) {
            // empty catch block
        }
        Assert.assertEquals((Object)res[0], (Object)success);
        Assert.assertEquals((Object)res[1], (Object)success);
        Assert.assertEquals((Object)res[2], (Object)success);
        Assert.assertEquals((Object)res[3], (Object)success);
        Assert.assertEquals((Object)res[4], (Object)failure);
        Assert.assertEquals((Object)res[5], (Object)success);
        Assert.assertEquals((Object)res[6], (Object)failure);
    }

    @Test
    public void testErrorsServers() throws IOException {
        Configuration configuration = new Configuration(conf);
        MyConnectionImpl conn = new MyConnectionImpl(configuration);
        BufferedMutatorImpl mutator = new BufferedMutatorImpl((ClusterConnection)conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
        configuration.setBoolean("hbase.client.retries.by.server", true);
        MyAsyncProcess ap = new MyAsyncProcess((ClusterConnection)conn, configuration, true);
        mutator.ap = ap;
        Assert.assertNotNull((Object)mutator.ap.createServerErrorTracker());
        Assert.assertTrue((mutator.ap.serverTrackerTimeout > 200 ? 1 : 0) != 0);
        mutator.ap.serverTrackerTimeout = 1;
        Put p = this.createPut(1, false);
        mutator.mutate((Mutation)p);
        try {
            mutator.flush();
            Assert.fail();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            // empty catch block
        }
        Assert.assertEquals((long)(NB_RETRIES + 1), (long)ap.callsCt.get());
    }

    @Test
    public void testGlobalErrors() throws IOException {
        MyConnectionImpl conn = new MyConnectionImpl(conf);
        BufferedMutatorImpl mutator = (BufferedMutatorImpl)conn.getBufferedMutator(DUMMY_TABLE);
        AsyncProcessWithFailure ap = new AsyncProcessWithFailure((ClusterConnection)conn, conf);
        mutator.ap = ap;
        Assert.assertNotNull((Object)mutator.ap.createServerErrorTracker());
        Put p = this.createPut(1, true);
        mutator.mutate((Mutation)p);
        try {
            mutator.flush();
            Assert.fail();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            // empty catch block
        }
        Assert.assertEquals((long)(NB_RETRIES + 1), (long)ap.callsCt.get());
    }

    @Test
    public void testThreadCreation() throws Exception {
        int NB_REGS = 100;
        ArrayList<HRegionLocation> hrls = new ArrayList<HRegionLocation>(100);
        ArrayList<Get> gets = new ArrayList<Get>(100);
        for (int i = 0; i < 100; ++i) {
            HRegionInfo hri = new HRegionInfo(DUMMY_TABLE, Bytes.toBytes((long)((long)i * 10L)), Bytes.toBytes((long)((long)i * 10L + 9L)), false, (long)i);
            HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
            hrls.add(hrl);
            Get get = new Get(Bytes.toBytes((long)((long)i * 10L)));
            gets.add(get);
        }
        MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
        HTable ht = new HTable((ClusterConnection)con, new BufferedMutatorParams(DUMMY_TABLE));
        MyAsyncProcess ap = new MyAsyncProcess((ClusterConnection)con, conf, con.nbThreads);
        ht.multiAp = ap;
        ht.batch(gets, new Object[gets.size()]);
        Assert.assertEquals((long)ap.nbActions.get(), (long)100L);
        Assert.assertEquals((String)"1 multi response per server", (long)2L, (long)ap.nbMultiResponse.get());
        Assert.assertEquals((String)"1 thread per server", (long)2L, (long)con.nbThreads.get());
        int nbReg = 0;
        for (int i = 0; i < 100; ++i) {
            if (!con.usedRegions[i]) continue;
            ++nbReg;
        }
        Assert.assertEquals((String)("nbReg=" + nbReg), (long)nbReg, (long)100L);
    }

    @Test
    public void testReplicaReplicaSuccess() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(10, 1000, 0);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
        AsyncProcess.AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
        this.verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
        Assert.assertEquals((long)2L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(1000, 10, 0);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
        AsyncProcess.AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
        this.verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
        Assert.assertEquals((long)0L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaParallelCallsSucceed() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(0, 0, 0);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcess.AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
        this.verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
        long replicaCalls = ap.getReplicaCallCount();
        Assert.assertTrue((replicaCalls >= 0L ? 1 : 0) != 0);
        Assert.assertTrue((replicaCalls <= 2L ? 1 : 0) != 0);
    }

    @Test
    public void testReplicaPartialReplicaCall() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(1000, 0, 0);
        ap.setPrimaryCallDelay(sn2, 2000L);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcess.AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
        this.verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
        Assert.assertEquals((long)1L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(1000, 0, 0, 1);
        ap.addFailures(hri1, hri2);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcess.AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
        this.verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
        Assert.assertEquals((long)0L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(0, 1000, 1000, 1);
        ap.addFailures(hri1, hri1r2, hri2);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcess.AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
        this.verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
        Assert.assertEquals((long)2L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaAllCallsFailForOneRegion() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(500, 1000, 0, 1);
        ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcess.AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
        this.verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
        Assert.assertEquals((long)3L, (long)ars.getErrors().getNumExceptions());
        for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
            Assert.assertArrayEquals((byte[])DUMMY_BYTES_1, (byte[])ars.getErrors().getRow(i).getRow());
        }
    }

    private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
        return this.createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
    }

    private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
        Configuration conf = new Configuration();
        ClusterConnection conn = TestAsyncProcess.createHConnectionWithReplicas();
        conf.setInt("hbase.client.primaryCallTimeout.multiget", replicaAfterMs * 1000);
        if (retries > 0) {
            conf.setInt("hbase.client.retries.number", retries);
        }
        MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
        ap.setCallDelays(primaryMs, replicaMs);
        return ap;
    }

    private static List<Get> makeTimelineGets(byte[] ... rows) {
        ArrayList<Get> result = new ArrayList<Get>();
        for (byte[] row : rows) {
            Get get = new Get(row);
            get.setConsistency(Consistency.TIMELINE);
            result.add(get);
        }
        return result;
    }

    private void verifyResult(AsyncProcess.AsyncRequestFuture ars, boolean ... expected) throws Exception {
        Object[] actual = ars.getResults();
        Assert.assertEquals((long)expected.length, (long)actual.length);
        for (int i = 0; i < expected.length; ++i) {
            Assert.assertEquals((Object)expected[i], (Object)(!(actual[i] instanceof Throwable) ? 1 : 0));
        }
    }

    private void verifyReplicaResult(AsyncProcess.AsyncRequestFuture ars, RR ... expecteds) throws Exception {
        Object[] actuals = ars.getResults();
        Assert.assertEquals((long)expecteds.length, (long)actuals.length);
        for (int i = 0; i < expecteds.length; ++i) {
            Object actual = actuals[i];
            RR expected = expecteds[i];
            Assert.assertEquals((String)actual.toString(), (Object)(expected == RR.FAILED ? 1 : 0), (Object)(actual instanceof Throwable));
            if (expected == RR.FAILED || expected == RR.DONT_CARE) continue;
            Assert.assertEquals((Object)(expected == RR.TRUE ? 1 : 0), (Object)((Result)actual).isStale());
        }
    }

    private Put createPut(int regCnt, boolean success) {
        Put p;
        if (!success) {
            p = new Put(FAILS);
        } else {
            switch (regCnt) {
                case 1: {
                    p = new Put(DUMMY_BYTES_1);
                    break;
                }
                case 2: {
                    p = new Put(DUMMY_BYTES_2);
                    break;
                }
                case 3: {
                    p = new Put(DUMMY_BYTES_3);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("unknown " + regCnt);
                }
            }
        }
        p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
        return p;
    }

    private static enum RR {
        TRUE,
        FALSE,
        DONT_CARE,
        FAILED;

    }

    static class MyConnectionImpl2
    extends MyConnectionImpl {
        List<HRegionLocation> hrl;
        final boolean[] usedRegions;

        protected MyConnectionImpl2(List<HRegionLocation> hrl) {
            super(conf);
            this.hrl = hrl;
            this.usedRegions = new boolean[hrl.size()];
        }

        @Override
        public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
            int i = 0;
            for (HRegionLocation hr : this.hrl) {
                if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
                    this.usedRegions[i] = true;
                    return new RegionLocations(new HRegionLocation[]{hr});
                }
                ++i;
            }
            return null;
        }
    }

    static class MyConnectionImpl
    extends ConnectionManager.HConnectionImplementation {
        final AtomicInteger nbThreads = new AtomicInteger(0);

        protected MyConnectionImpl(Configuration conf) {
            super(conf);
        }

        public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
            return new RegionLocations(new HRegionLocation[]{loc1});
        }
    }

    private static interface ResponseGenerator {
        public void addResponse(MultiResponse var1, byte[] var2, Action<Row> var3);
    }

    class MyAsyncProcessWithReplicas
    extends MyAsyncProcess {
        private Set<byte[]> failures;
        private long primarySleepMs;
        private long replicaSleepMs;
        private Map<ServerName, Long> customPrimarySleepMs;
        private final AtomicLong replicaCalls;

        public void addFailures(HRegionInfo ... hris) {
            for (HRegionInfo hri : hris) {
                this.failures.add(hri.getRegionName());
            }
        }

        public long getReplicaCallCount() {
            return this.replicaCalls.get();
        }

        public void setPrimaryCallDelay(ServerName server, long primaryMs) {
            this.customPrimarySleepMs.put(server, primaryMs);
        }

        public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
            super(hc, conf);
            this.failures = new TreeSet<byte[]>((Comparator<byte[]>)new Bytes.ByteArrayComparator());
            this.primarySleepMs = 0L;
            this.replicaSleepMs = 0L;
            this.customPrimarySleepMs = new HashMap<ServerName, Long>();
            this.replicaCalls = new AtomicLong(0L);
        }

        public void setCallDelays(long primaryMs, long replicaMs) {
            this.primarySleepMs = primaryMs;
            this.replicaSleepMs = replicaMs;
        }

        @Override
        protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
            final MultiResponse mr = TestAsyncProcess.createMultiResponse((MultiAction<Row>)callable.getMulti(), this.nbMultiResponse, this.nbActions, new ResponseGenerator(){

                @Override
                public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
                    if (MyAsyncProcessWithReplicas.this.failures.contains(regionName)) {
                        mr.add(regionName, a.getOriginalIndex(), (Object)failure);
                    } else {
                        boolean isStale = !RegionReplicaUtil.isDefaultReplica((int)a.getReplicaId());
                        mr.add(regionName, a.getOriginalIndex(), (Object)Result.create((Cell[])new Cell[0], null, (boolean)isStale));
                    }
                }
            });
            final boolean isDefault = RegionReplicaUtil.isDefaultReplica((int)((Action)((List)callable.getMulti().actions.values().iterator().next()).iterator().next()).getReplicaId());
            final ServerName server = callable.getServerName();
            String debugMsg = "Call to " + server + ", primary=" + isDefault + " with " + callable.getMulti().actions.size() + " entries: ";
            for (byte[] region : callable.getMulti().actions.keySet()) {
                debugMsg = debugMsg + "[" + Bytes.toStringBinary((byte[])region) + "], ";
            }
            LOG.debug((Object)debugMsg);
            if (!isDefault) {
                this.replicaCalls.incrementAndGet();
            }
            return new RpcRetryingCaller<MultiResponse>(100L, 10, 9){

                public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout) throws IOException, RuntimeException {
                    Long customSleep;
                    long sleep = -1L;
                    sleep = isDefault ? ((customSleep = (Long)MyAsyncProcessWithReplicas.this.customPrimarySleepMs.get(server)) == null ? MyAsyncProcessWithReplicas.this.primarySleepMs : customSleep) : MyAsyncProcessWithReplicas.this.replicaSleepMs;
                    if (sleep != 0L) {
                        try {
                            Thread.sleep(sleep);
                        }
                        catch (InterruptedException e) {
                            // empty catch block
                        }
                    }
                    return mr;
                }
            };
        }
    }

    static class AsyncProcessWithFailure
    extends MyAsyncProcess {
        public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) {
            super(hc, conf, true);
            this.serverTrackerTimeout = 1;
        }

        @Override
        protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
            this.callsCt.incrementAndGet();
            return new CallerWithFailure();
        }
    }

    static class CallerWithFailure
    extends RpcRetryingCaller<MultiResponse> {
        public CallerWithFailure() {
            super(100L, 100, 9);
        }

        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout) throws IOException, RuntimeException {
            throw new IOException("test");
        }
    }

    static class MyAsyncProcess
    extends AsyncProcess {
        final AtomicInteger nbMultiResponse = new AtomicInteger();
        final AtomicInteger nbActions = new AtomicInteger();
        public List<AsyncProcess.AsyncRequestFuture> allReqs = new ArrayList<AsyncProcess.AsyncRequestFuture>();
        public AtomicInteger callsCt = new AtomicInteger();

        protected <Res> AsyncProcess.AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, Batch.Callback<Res> callback, Object[] results, boolean needResults) {
            AsyncProcess.AsyncRequestFutureImpl r = super.createAsyncRequestFuture(DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
            this.allReqs.add((AsyncProcess.AsyncRequestFuture)r);
            this.callsCt.incrementAndGet();
            return r;
        }

        public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
            this(hc, conf, new AtomicInteger());
        }

        public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
            super(hc, conf, (ExecutorService)new ThreadPoolExecutor(1, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)), new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
        }

        public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
            super(hc, conf, (ExecutorService)new ThreadPoolExecutor(1, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())), new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
        }

        public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
            super(hc, conf, (ExecutorService)new ThreadPoolExecutor(1, 20, 60L, TimeUnit.SECONDS, (BlockingQueue)new SynchronousQueue(), (ThreadFactory)new CountingThreadFactory(new AtomicInteger())){

                @Override
                public void execute(Runnable command) {
                    throw new RejectedExecutionException("test under failure");
                }
            }, new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
        }

        public <Res> AsyncProcess.AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, boolean atLeastOne, Batch.Callback<Res> callback, boolean needResults) throws InterruptedIOException {
            return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
        }

        protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
            this.callsCt.incrementAndGet();
            final MultiResponse mr = TestAsyncProcess.createMultiResponse((MultiAction<Row>)callable.getMulti(), this.nbMultiResponse, this.nbActions, new ResponseGenerator(){

                @Override
                public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
                    if (Arrays.equals(FAILS, a.getAction().getRow())) {
                        mr.add(regionName, a.getOriginalIndex(), (Object)failure);
                    } else {
                        mr.add(regionName, a.getOriginalIndex(), (Object)TestAsyncProcess.success);
                    }
                }
            });
            return new RpcRetryingCaller<MultiResponse>(100L, 10, 9){

                public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout) throws IOException, RuntimeException {
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    return mr;
                }
            };
        }
    }

    static class CountingThreadFactory
    implements ThreadFactory {
        final AtomicInteger nbThreads;
        ThreadFactory realFactory = Threads.newDaemonThreadFactory((String)"test-TestAsyncProcess");

        @Override
        public Thread newThread(Runnable r) {
            this.nbThreads.incrementAndGet();
            return this.realFactory.newThread(r);
        }

        CountingThreadFactory(AtomicInteger nbThreads) {
            this.nbThreads = nbThreads;
        }
    }
}

