/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.scattered.statetransfer;

import java.util.Collections;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.write.InvalidateVersionsCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.statetransfer.StateTransferInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.ControlledRpcManager;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="scattered.statetransfer.PrefetchTest")
@CleanupAfterMethod
public class PrefetchTest
extends MultipleCacheManagersTest {
    ControlledConsistentHashFactory chf;

    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        this.chf = new ControlledConsistentHashFactory.Scattered(0);
        cb.clustering().cacheMode(CacheMode.SCATTERED_SYNC).hash().numSegments(1).consistentHashFactory((ConsistentHashFactory)this.chf);
        this.createCluster(cb, 3);
        this.waitForClusterToForm();
    }

    public void testPrefetch00() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        this.testPrefetch(0, 0);
    }

    public void testPrefetch01() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        this.testPrefetch(0, 1);
    }

    public void testPrefetch02() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        this.testPrefetch(0, 2);
    }

    public void testPrefetch11() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        this.testPrefetch(1, 1);
    }

    public void testPrefetch12() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        this.testPrefetch(1, 2);
    }

    private void testPrefetch(int invokePhase, int receivePhase) throws InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
        this.cache(1).put((Object)"key", (Object)"v0");
        CyclicBarrier beforeBarrier = new CyclicBarrier(2);
        CyclicBarrier afterBarrier = new CyclicBarrier(2);
        BlockingBefore beforeInterceptor = new BlockingBefore(beforeBarrier);
        BlockingAfter afterInterceptor = new BlockingAfter(afterBarrier);
        this.cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptorBefore((AsyncInterceptor)beforeInterceptor, StateTransferInterceptor.class);
        this.cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptorBefore((AsyncInterceptor)afterInterceptor, StateTransferInterceptor.class);
        this.chf.setOwnerIndexes(1, new int[0]);
        this.cache(1).stop();
        beforeBarrier.await(10L, TimeUnit.SECONDS);
        ControlledRpcManager controlledRpcManager = ControlledRpcManager.replaceRpcManager(this.cache(2));
        controlledRpcManager.excludeCommands(ClusteredGetAllCommand.class, InvalidateVersionsCommand.class, PutKeyValueCommand.class, PutMapCommand.class);
        if (invokePhase > 0) {
            beforeBarrier.await(10L, TimeUnit.SECONDS);
            afterBarrier.await(10L, TimeUnit.SECONDS);
            afterBarrier.await(10L, TimeUnit.SECONDS);
            beforeBarrier.await(10L, TimeUnit.SECONDS);
        }
        beforeInterceptor.suspend(true);
        Future<Object> future = this.fork(() -> this.cache(2).put((Object)"key", (Object)"v1"));
        ControlledRpcManager.BlockedRequest<ClusteredGetCommand> blockedPrefetch = controlledRpcManager.expectCommand(ClusteredGetCommand.class);
        if (invokePhase > 0) {
            AssertJUnit.assertEquals(Collections.singleton(this.address(0)), blockedPrefetch.getTargets());
        }
        ControlledRpcManager.SentRequest remotePrefetch = blockedPrefetch.send();
        if (receivePhase > 0 && invokePhase == 0) {
            beforeInterceptor.suspend(false);
            beforeBarrier.await(10L, TimeUnit.SECONDS);
            afterBarrier.await(10L, TimeUnit.SECONDS);
            afterBarrier.await(10L, TimeUnit.SECONDS);
        }
        if (receivePhase > 1) {
            if (invokePhase == 0) {
                beforeBarrier.await(10L, TimeUnit.SECONDS);
            }
            beforeBarrier.await(10L, TimeUnit.SECONDS);
            afterBarrier.await(10L, TimeUnit.SECONDS);
            afterBarrier.await(10L, TimeUnit.SECONDS);
        }
        afterInterceptor.suspend(true);
        remotePrefetch.receiveAll();
        AssertJUnit.assertEquals((Object)"v0", (Object)future.get(10L, TimeUnit.SECONDS));
        controlledRpcManager.stopBlocking();
        beforeBarrier.reset();
        afterBarrier.reset();
    }

    private static boolean isStateTransferPut(VisitableCommand command) {
        return command instanceof PutKeyValueCommand && ((PutKeyValueCommand)command).hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER);
    }

    static class BlockingAfter
    extends BlockingInterceptor<PutKeyValueCommand> {
        public BlockingAfter(CyclicBarrier barrier) {
            super(barrier, true, true, x$0 -> PrefetchTest.isStateTransferPut(x$0));
        }
    }

    static class BlockingBefore
    extends BlockingInterceptor<PutKeyValueCommand> {
        public BlockingBefore(CyclicBarrier barrier) {
            super(barrier, false, true, x$0 -> PrefetchTest.isStateTransferPut(x$0));
        }
    }
}

