/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution.rehash;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.distribution.rehash.DistributionRehashSCI;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.distribution.TriangleDistributionInterceptor;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.ProtoName;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.test.op.TestOperation;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.BaseControlledConsistentHashFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="distribution.rehash.NonTxBackupOwnerBecomingPrimaryOwnerTest")
@CleanupAfterMethod
public class NonTxBackupOwnerBecomingPrimaryOwnerTest
extends MultipleCacheManagersTest {
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder c = this.getConfigurationBuilder();
        this.createCluster(DistributionRehashSCI.INSTANCE, c, 2);
        this.waitForClusterToForm();
    }

    private ConfigurationBuilder getConfigurationBuilder() {
        ConfigurationBuilder c = new ConfigurationBuilder();
        c.clustering().cacheMode(CacheMode.DIST_SYNC);
        c.clustering().hash().numSegments(1).consistentHashFactory((ConsistentHashFactory)new CustomConsistentHashFactory());
        c.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
        return c;
    }

    public void testPrimaryOwnerChangingDuringPut() throws Exception {
        this.doTest(TestWriteOperation.PUT_CREATE);
    }

    public void testPrimaryOwnerChangingDuringPutOverwrite() throws Exception {
        this.doTest(TestWriteOperation.PUT_OVERWRITE);
    }

    public void testPrimaryOwnerChangingDuringPutIfAbsent() throws Exception {
        this.doTest(TestWriteOperation.PUT_IF_ABSENT);
    }

    public void testPrimaryOwnerChangingDuringReplace() throws Exception {
        this.doTest(TestWriteOperation.REPLACE);
    }

    public void testPrimaryOwnerChangingDuringReplaceExact() throws Exception {
        this.doTest(TestWriteOperation.REPLACE_EXACT);
    }

    public void testPrimaryOwnerChangingDuringRemove() throws Exception {
        this.doTest(TestWriteOperation.REMOVE);
    }

    public void testPrimaryOwnerChangingDuringRemoveExact() throws Exception {
        this.doTest(TestWriteOperation.REMOVE_EXACT);
    }

    protected void doTest(TestOperation op) throws Exception {
        String key = "testkey";
        String cacheName = this.getDefaultCacheName();
        op.insertPreviousValue(this.advancedCache(0, cacheName), "testkey");
        CheckPoint checkPoint = new CheckPoint();
        LocalTopologyManager ltm0 = TestingUtil.extractGlobalComponent((CacheContainer)this.manager(0), LocalTopologyManager.class);
        int preJoinTopologyId = ltm0.getCacheTopology(cacheName).getTopologyId();
        int joinTopologyId = preJoinTopologyId + 1;
        int stateReceivedTopologyId = joinTopologyId + 1;
        AdvancedCache cache0 = this.advancedCache(0);
        this.addBlockingLocalTopologyManager(this.manager(0), checkPoint, joinTopologyId, stateReceivedTopologyId);
        AdvancedCache cache1 = this.advancedCache(1);
        this.addBlockingLocalTopologyManager(this.manager(1), checkPoint, joinTopologyId, stateReceivedTopologyId);
        ConfigurationBuilder c = this.getConfigurationBuilder();
        c.clustering().stateTransfer().awaitInitialTransfer(false);
        CountDownLatch stateTransferLatch = new CountDownLatch(1);
        if (op.getPreviousValue() != null) {
            c.customInterceptors().addInterceptor().before(EntryWrappingInterceptor.class).interceptor((AsyncInterceptor)new StateTransferLatchInterceptor(stateTransferLatch));
        } else {
            stateTransferLatch.countDown();
        }
        GlobalConfigurationBuilder globalBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
        globalBuilder.serialization().addContextInitializer((SerializationContextInitializer)DistributionRehashSCI.INSTANCE);
        EmbeddedCacheManager cm = TestCacheManagerFactory.createClusteredCacheManager(false, globalBuilder, c, new TransportFlags());
        this.registerCacheManager(new CacheContainer[]{cm});
        this.addBlockingLocalTopologyManager(this.manager(2), checkPoint, joinTopologyId, stateReceivedTopologyId);
        log.tracef("Starting the cache on the joiner", new Object[0]);
        AdvancedCache cache2 = this.advancedCache(2);
        checkPoint.trigger("allow_topology_" + joinTopologyId + "_on_" + this.address(0));
        checkPoint.trigger("allow_topology_" + joinTopologyId + "_on_" + this.address(1));
        checkPoint.trigger("allow_topology_" + joinTopologyId + "_on_" + this.address(2));
        this.eventually(() -> cache0.getRpcManager().getMembers().size() == 3 && cache1.getRpcManager().getMembers().size() == 3 && cache2.getRpcManager().getMembers().size() == 3);
        CacheTopology duringJoinTopology = ltm0.getCacheTopology(cacheName);
        AssertJUnit.assertEquals((int)joinTopologyId, (int)duringJoinTopology.getTopologyId());
        AssertJUnit.assertNotNull((Object)duringJoinTopology.getPendingCH());
        int keySegment = TestingUtil.getSegmentForKey("testkey", cache0);
        log.tracef("Rebalance started. Found key %s with current owners %s and pending owners %s", (Object)"testkey", (Object)duringJoinTopology.getCurrentCH().locateOwnersForSegment(keySegment), (Object)duringJoinTopology.getPendingCH().locateOwnersForSegment(keySegment));
        stateTransferLatch.await(10L, TimeUnit.SECONDS);
        CyclicBarrier beforeCache1Barrier = new CyclicBarrier(2);
        BlockingInterceptor<? extends VisitableCommand> blockingInterceptor1 = new BlockingInterceptor<VisitableCommand>(beforeCache1Barrier, op.getCommandClass(), false, false);
        TestingUtil.extractInterceptorChain(cache1).addInterceptorBefore(blockingInterceptor1, TriangleDistributionInterceptor.class);
        CyclicBarrier afterCache2Barrier = new CyclicBarrier(2);
        BlockingInterceptor<VisitableCommand> blockingInterceptor2 = new BlockingInterceptor<VisitableCommand>(afterCache2Barrier, op.getCommandClass(), true, false, cmd -> !(cmd instanceof FlagAffectedCommand) || !((FlagAffectedCommand)cmd).hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER));
        TestingUtil.extractInterceptorChain(cache2).addInterceptorBefore(blockingInterceptor2, StateTransferInterceptor.class);
        Future<Object> future = this.fork(() -> op.perform((AdvancedCache<Object, Object>)cache0, "testkey"));
        afterCache2Barrier.await(10L, TimeUnit.SECONDS);
        afterCache2Barrier.await(10L, TimeUnit.SECONDS);
        checkPoint.trigger("allow_topology_" + stateReceivedTopologyId + "_on_" + this.address(0));
        checkPoint.trigger("allow_topology_" + stateReceivedTopologyId + "_on_" + this.address(1));
        checkPoint.trigger("allow_topology_" + stateReceivedTopologyId + "_on_" + this.address(2));
        TestingUtil.waitForNoRebalance(new Cache[]{cache0, cache1, cache2});
        log.tracef("Unblocking the put command on node " + this.address(1), new Object[0]);
        beforeCache1Barrier.await(10L, TimeUnit.SECONDS);
        beforeCache1Barrier.await(10L, TimeUnit.SECONDS);
        CacheTopology postReceiveStateTopology = ltm0.getCacheTopology(cacheName);
        if (postReceiveStateTopology.getCurrentCH().locateOwnersForSegment(keySegment).contains(this.address(1))) {
            beforeCache1Barrier.await(10L, TimeUnit.SECONDS);
            beforeCache1Barrier.await(10L, TimeUnit.SECONDS);
        }
        afterCache2Barrier.await(10L, TimeUnit.SECONDS);
        afterCache2Barrier.await(10L, TimeUnit.SECONDS);
        Object result = future.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals((Object)op.getReturnValueWithRetry(), (Object)result);
        log.tracef("Write operation is done", new Object[0]);
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)cache0.get((Object)"testkey"));
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)cache1.get((Object)"testkey"));
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)cache2.get((Object)"testkey"));
        AssertJUnit.assertFalse((boolean)cache0.getAdvancedCache().getLockManager().isLocked((Object)"testkey"));
        AssertJUnit.assertFalse((boolean)cache1.getAdvancedCache().getLockManager().isLocked((Object)"testkey"));
        AssertJUnit.assertFalse((boolean)cache2.getAdvancedCache().getLockManager().isLocked((Object)"testkey"));
    }

    private void addBlockingLocalTopologyManager(EmbeddedCacheManager manager, CheckPoint checkPoint, Integer ... blockedTopologyIds) {
        LocalTopologyManager component = TestingUtil.extractGlobalComponent((CacheContainer)manager, LocalTopologyManager.class);
        LocalTopologyManager spyLtm = (LocalTopologyManager)Mockito.spy((Object)component);
        ((LocalTopologyManager)Mockito.doAnswer(invocation -> {
            CacheTopology topology = (CacheTopology)invocation.getArguments()[1];
            if (Arrays.asList(blockedTopologyIds).contains(topology.getTopologyId())) {
                checkPoint.trigger("pre_topology_" + topology.getTopologyId() + "_on_" + manager.getAddress());
                checkPoint.awaitStrict("allow_topology_" + topology.getTopologyId() + "_on_" + manager.getAddress(), 10L, TimeUnit.SECONDS);
            }
            return invocation.callRealMethod();
        }).when((Object)spyLtm)).handleTopologyUpdate((String)ArgumentMatchers.eq((Object)TestingUtil.getDefaultCacheName(manager)), (CacheTopology)ArgumentMatchers.any(CacheTopology.class), (AvailabilityMode)ArgumentMatchers.any(AvailabilityMode.class), ArgumentMatchers.anyInt(), (Address)ArgumentMatchers.any(Address.class));
        TestingUtil.replaceComponent((CacheContainer)manager, LocalTopologyManager.class, spyLtm, true);
    }

    static class StateTransferLatchInterceptor
    extends DDAsyncInterceptor {
        private final CountDownLatch latch;

        private StateTransferLatchInterceptor(CountDownLatch latch) {
            this.latch = latch;
        }

        public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
            return this.invokeNextAndFinally(ctx, (VisitableCommand)command, (rCtx, rCommand, rv, throwable) -> {
                if (rCommand.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                    this.latch.countDown();
                }
            });
        }
    }

    @ProtoName(value="BackupOwnerCustomConsistentHashFactory")
    public static class CustomConsistentHashFactory
    extends BaseControlledConsistentHashFactory.Default {
        CustomConsistentHashFactory() {
            super(1);
        }

        @Override
        protected int[][] assignOwners(int numSegments, List<Address> members) {
            switch (members.size()) {
                case 1: {
                    return new int[][]{{0}};
                }
                case 2: {
                    return new int[][]{{0, 1}};
                }
            }
            return new int[][]{{members.size() - 1, 0}};
        }
    }
}

