/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution.rehash;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.distribution.rehash.DistributionRehashSCI;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.protostream.annotations.ProtoName;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.BaseControlledConsistentHashFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="distribution.rehash.NonTxPrimaryOwnerBecomingNonOwnerTest")
@CleanupAfterMethod
public class NonTxPrimaryOwnerBecomingNonOwnerTest
extends MultipleCacheManagersTest {
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder c = this.getConfigurationBuilder();
        this.createCluster(DistributionRehashSCI.INSTANCE, c, 2);
        this.waitForClusterToForm();
    }

    private ConfigurationBuilder getConfigurationBuilder() {
        ConfigurationBuilder c = new ConfigurationBuilder();
        c.clustering().cacheMode(CacheMode.DIST_SYNC);
        c.clustering().hash().numSegments(1).consistentHashFactory((ConsistentHashFactory)new CustomConsistentHashFactory());
        c.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
        return c;
    }

    public void testPrimaryOwnerChangingDuringPut() throws Exception {
        this.doTest(TestWriteOperation.PUT_CREATE);
    }

    public void testPrimaryOwnerChangingDuringPutIfAbsent() throws Exception {
        this.doTest(TestWriteOperation.PUT_IF_ABSENT);
    }

    public void testPrimaryOwnerChangingDuringReplace() throws Exception {
        this.doTest(TestWriteOperation.REPLACE);
    }

    public void testPrimaryOwnerChangingDuringReplaceExact() throws Exception {
        this.doTest(TestWriteOperation.REPLACE_EXACT);
    }

    public void testPrimaryOwnerChangingDuringRemove() throws Exception {
        this.doTest(TestWriteOperation.REMOVE);
    }

    public void testPrimaryOwnerChangingDuringRemoveExact() throws Exception {
        this.doTest(TestWriteOperation.REMOVE_EXACT);
    }

    private void doTest(TestWriteOperation op) throws Exception {
        String key = "testkey";
        String cacheName = (String)this.manager(0).getCacheManagerConfiguration().defaultCacheName().get();
        if (op.getPreviousValue() != null) {
            this.cache(0, cacheName).put((Object)"testkey", op.getPreviousValue());
        }
        CheckPoint checkPoint = new CheckPoint();
        LocalTopologyManager ltm0 = TestingUtil.extractGlobalComponent((CacheContainer)this.manager(0), LocalTopologyManager.class);
        int preJoinTopologyId = ltm0.getCacheTopology(cacheName).getTopologyId();
        int joinTopologyId = preJoinTopologyId + 1;
        int stateReceivedTopologyId = joinTopologyId + 1;
        AdvancedCache cache0 = this.advancedCache(0);
        this.addBlockingLocalTopologyManager(this.manager(0), checkPoint, joinTopologyId, stateReceivedTopologyId);
        AdvancedCache cache1 = this.advancedCache(1);
        this.addBlockingLocalTopologyManager(this.manager(1), checkPoint, joinTopologyId, stateReceivedTopologyId);
        ConfigurationBuilder c = this.getConfigurationBuilder();
        c.clustering().stateTransfer().awaitInitialTransfer(false);
        this.addClusterEnabledCacheManager(DistributionRehashSCI.INSTANCE, c);
        this.addBlockingLocalTopologyManager(this.manager(2), checkPoint, joinTopologyId, stateReceivedTopologyId);
        log.tracef("Starting the cache on the joiner", new Object[0]);
        AdvancedCache cache2 = this.advancedCache(2);
        checkPoint.trigger("allow_topology_" + joinTopologyId + "_on_" + this.address(0));
        checkPoint.trigger("allow_topology_" + joinTopologyId + "_on_" + this.address(1));
        checkPoint.trigger("allow_topology_" + joinTopologyId + "_on_" + this.address(2));
        Stream.of(cache0, cache1, cache2).forEach(cache -> this.eventuallyEquals(3, () -> cache.getRpcManager().getMembers().size()));
        CacheTopology duringJoinTopology = ltm0.getCacheTopology(cacheName);
        AssertJUnit.assertEquals((Object)CacheTopology.Phase.READ_OLD_WRITE_ALL, (Object)duringJoinTopology.getPhase());
        AssertJUnit.assertEquals((int)joinTopologyId, (int)duringJoinTopology.getTopologyId());
        AssertJUnit.assertNotNull((Object)duringJoinTopology.getPendingCH());
        int keySegment = TestingUtil.getSegmentForKey("testkey", cache0);
        log.tracef("Rebalance started. Found key %s with current owners %s and pending owners %s", (Object)"testkey", (Object)duringJoinTopology.getCurrentCH().locateOwnersForSegment(keySegment), (Object)duringJoinTopology.getPendingCH().locateOwnersForSegment(keySegment));
        CyclicBarrier beforeCache0Barrier = new CyclicBarrier(2);
        BlockingInterceptor<? extends VisitableCommand> blockingInterceptor0 = new BlockingInterceptor<VisitableCommand>(beforeCache0Barrier, op.getCommandClass(), false, true);
        TestingUtil.extractInterceptorChain(cache0).addInterceptorBefore(blockingInterceptor0, EntryWrappingInterceptor.class);
        Future<Object> future = this.fork(() -> op.perform((AdvancedCache<Object, Object>)cache0, "testkey"));
        beforeCache0Barrier.await(10L, TimeUnit.SECONDS);
        checkPoint.trigger("allow_topology_" + stateReceivedTopologyId + "_on_" + this.address(0));
        this.eventuallyEquals(stateReceivedTopologyId, () -> cache0.getDistributionManager().getCacheTopology().getTopologyId());
        AssertJUnit.assertEquals((Object)CacheTopology.Phase.READ_ALL_WRITE_ALL, (Object)cache0.getDistributionManager().getCacheTopology().getPhase());
        log.tracef("Unblocking the write command on node " + this.address(1), new Object[0]);
        beforeCache0Barrier.await(10L, TimeUnit.SECONDS);
        beforeCache0Barrier.await(10L, TimeUnit.SECONDS);
        blockingInterceptor0.suspend(true);
        beforeCache0Barrier.await(10L, TimeUnit.SECONDS);
        checkPoint.trigger("allow_topology_" + stateReceivedTopologyId + "_on_" + this.address(1));
        checkPoint.trigger("allow_topology_" + stateReceivedTopologyId + "_on_" + this.address(2));
        TestingUtil.waitForNoRebalance(new Cache[]{cache0, cache1, cache2});
        Object result = future.get(10L, TimeUnit.SECONDS);
        log.tracef("Write operation is done", new Object[0]);
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)cache0.get((Object)"testkey"));
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)cache1.get((Object)"testkey"));
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)cache2.get((Object)"testkey"));
        AssertJUnit.assertFalse((boolean)cache0.getAdvancedCache().getLockManager().isLocked((Object)"testkey"));
        AssertJUnit.assertFalse((boolean)cache1.getAdvancedCache().getLockManager().isLocked((Object)"testkey"));
        AssertJUnit.assertFalse((boolean)cache2.getAdvancedCache().getLockManager().isLocked((Object)"testkey"));
    }

    private void addBlockingLocalTopologyManager(EmbeddedCacheManager manager, CheckPoint checkPoint, Integer ... blockedTopologyIds) {
        LocalTopologyManager component = TestingUtil.extractGlobalComponent((CacheContainer)manager, LocalTopologyManager.class);
        LocalTopologyManager spyLtm = (LocalTopologyManager)Mockito.spy((Object)component);
        ((LocalTopologyManager)Mockito.doAnswer(invocation -> {
            CacheTopology topology = (CacheTopology)invocation.getArguments()[1];
            if (Arrays.asList(blockedTopologyIds).contains(topology.getTopologyId())) {
                checkPoint.trigger("pre_topology_" + topology.getTopologyId() + "_on_" + manager.getAddress());
                checkPoint.awaitStrict("allow_topology_" + topology.getTopologyId() + "_on_" + manager.getAddress(), 10L, TimeUnit.SECONDS);
            }
            return invocation.callRealMethod();
        }).when((Object)spyLtm)).handleTopologyUpdate((String)ArgumentMatchers.eq((Object)TestingUtil.getDefaultCacheName(manager)), (CacheTopology)ArgumentMatchers.any(CacheTopology.class), (AvailabilityMode)ArgumentMatchers.any(AvailabilityMode.class), ArgumentMatchers.anyInt(), (Address)ArgumentMatchers.any(Address.class));
        TestingUtil.replaceComponent((CacheContainer)manager, LocalTopologyManager.class, spyLtm, true);
    }

    @ProtoName(value="PrimaryOwnerCustomConsistentHashFactory")
    public static class CustomConsistentHashFactory
    extends BaseControlledConsistentHashFactory.Default {
        CustomConsistentHashFactory() {
            super(1);
        }

        @Override
        protected int[][] assignOwners(int numSegments, List<Address> members) {
            switch (members.size()) {
                case 1: {
                    return new int[][]{{0}};
                }
                case 2: {
                    return new int[][]{{0, 1}};
                }
            }
            return new int[][]{{members.size() - 1, 0}};
        }
    }
}

