package org.infinispan.scattered.statetransfer;

import java.util.Collections;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.write.InvalidateVersionsCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.statetransfer.StateTransferInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.ControlledRpcManager;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "scattered.statetransfer.PrefetchTest")
/* loaded from: input_file:org/infinispan/scattered/statetransfer/PrefetchTest.class */
public class PrefetchTest extends MultipleCacheManagersTest {
    ControlledConsistentHashFactory chf;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/scattered/statetransfer/PrefetchTest$BlockingAfter.class */
    public static class BlockingAfter extends BlockingInterceptor<PutKeyValueCommand> {
        public BlockingAfter(CyclicBarrier cyclicBarrier) {
            super(cyclicBarrier, true, true, (Predicate<VisitableCommand>) visitableCommand -> {
                return PrefetchTest.isStateTransferPut(visitableCommand);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/scattered/statetransfer/PrefetchTest$BlockingBefore.class */
    public static class BlockingBefore extends BlockingInterceptor<PutKeyValueCommand> {
        public BlockingBefore(CyclicBarrier cyclicBarrier) {
            super(cyclicBarrier, false, true, (Predicate<VisitableCommand>) visitableCommand -> {
                return PrefetchTest.isStateTransferPut(visitableCommand);
            });
        }
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        this.chf = new ControlledConsistentHashFactory.Scattered(0);
        configurationBuilder.clustering().cacheMode(CacheMode.SCATTERED_SYNC).hash().numSegments(1).consistentHashFactory(this.chf);
        createCluster(configurationBuilder, 3);
        waitForClusterToForm();
    }

    public void testPrefetch00() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testPrefetch(0, 0);
    }

    public void testPrefetch01() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testPrefetch(0, 1);
    }

    public void testPrefetch02() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testPrefetch(0, 2);
    }

    public void testPrefetch11() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testPrefetch(1, 1);
    }

    public void testPrefetch12() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testPrefetch(1, 2);
    }

    private void testPrefetch(int i, int i2) throws InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
        cache(1).put("key", "v0");
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        CyclicBarrier cyclicBarrier2 = new CyclicBarrier(2);
        BlockingBefore blockingBefore = new BlockingBefore(cyclicBarrier);
        BlockingAfter blockingAfter = new BlockingAfter(cyclicBarrier2);
        cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptorBefore(blockingBefore, StateTransferInterceptor.class);
        cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptorBefore(blockingAfter, StateTransferInterceptor.class);
        this.chf.setOwnerIndexes(1, new int[0]);
        cache(1).stop();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        ControlledRpcManager replaceRpcManager = ControlledRpcManager.replaceRpcManager(cache(2));
        replaceRpcManager.excludeCommands(ClusteredGetAllCommand.class, InvalidateVersionsCommand.class, PutKeyValueCommand.class, PutMapCommand.class);
        if (i > 0) {
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            cyclicBarrier2.await(10L, TimeUnit.SECONDS);
            cyclicBarrier2.await(10L, TimeUnit.SECONDS);
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
        }
        blockingBefore.suspend(true);
        Future fork = fork(() -> {
            return cache(2).put("key", "v1");
        });
        ControlledRpcManager.BlockedRequest expectCommand = replaceRpcManager.expectCommand(ClusteredGetCommand.class);
        if (i > 0) {
            AssertJUnit.assertEquals(Collections.singleton(address(0)), expectCommand.getTargets());
        }
        ControlledRpcManager.SentRequest send = expectCommand.send();
        if (i2 > 0 && i == 0) {
            blockingBefore.suspend(false);
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            cyclicBarrier2.await(10L, TimeUnit.SECONDS);
            cyclicBarrier2.await(10L, TimeUnit.SECONDS);
        }
        if (i2 > 1) {
            if (i == 0) {
                cyclicBarrier.await(10L, TimeUnit.SECONDS);
            }
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            cyclicBarrier2.await(10L, TimeUnit.SECONDS);
            cyclicBarrier2.await(10L, TimeUnit.SECONDS);
        }
        blockingAfter.suspend(true);
        send.receiveAll();
        AssertJUnit.assertEquals("v0", fork.get(10L, TimeUnit.SECONDS));
        replaceRpcManager.stopBlocking();
        cyclicBarrier.reset();
        cyclicBarrier2.reset();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isStateTransferPut(VisitableCommand visitableCommand) {
        return (visitableCommand instanceof PutKeyValueCommand) && ((PutKeyValueCommand) visitableCommand).hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER);
    }
}
