/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.globalstate.NoOpGlobalConfigurationManager;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateProvider;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.InTransactionMode;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.transaction.TransactionMode;
import org.jgroups.protocols.DISCARD;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.ClusterTopologyManagerTest")
@CleanupAfterMethod
public class ClusterTopologyManagerTest
extends MultipleCacheManagersTest {
    public static final String CACHE_NAME = "testCache";
    private static final String OTHER_CACHE_NAME = "other_cache";
    private ConfigurationBuilder defaultConfig;
    private Cache<?, ?> c1;
    private Cache<?, ?> c2;
    private Cache<?, ?> c3;
    private DISCARD d1;
    private DISCARD d2;
    private DISCARD d3;

    /**
     * Builds the two parameterized instances of this test: a transactional
     * {@code DIST_SYNC} variant and a non-transactional {@code SCATTERED_SYNC} variant.
     *
     * @return the test instances, in that order
     */
    @Override
    public Object[] factory() {
        Object distTransactional =
                new ClusterTopologyManagerTest().cacheMode(CacheMode.DIST_SYNC).transactional(true);
        Object scatteredNonTransactional =
                new ClusterTopologyManagerTest().cacheMode(CacheMode.SCATTERED_SYNC).transactional(false);
        return new Object[]{distTransactional, scatteredNonTransactional};
    }

    /**
     * Creates a three-node cluster running {@link #CACHE_NAME} and caches, for each node,
     * both the cache reference and the DISCARD protocol instance used to isolate it.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        defaultConfig = getDefaultClusteredCacheConfig(cacheMode, transactional);

        // Failure detection and merge are enabled on the transport for every node.
        TransportFlags flags = new TransportFlags().withFD(true).withMerge(true);
        createClusteredCaches(3, defaultConfig, flags, CACHE_NAME);

        c1 = cache(0, CACHE_NAME);
        c2 = cache(1, CACHE_NAME);
        c3 = cache(2, CACHE_NAME);

        d1 = TestingUtil.getDiscardForCache(c1.getCacheManager());
        d2 = TestingUtil.getDiscardForCache(c2.getCacheManager());
        d3 = TestingUtil.getDiscardForCache(c3.getCacheManager());
    }

    /**
     * Hook invoked for each cache manager before it starts; delegates to
     * {@link NoOpGlobalConfigurationManager#amendCacheManager}.
     *
     * @param cm the cache manager about to be started
     */
    @Override
    protected void amendCacheManagerBeforeStart(EmbeddedCacheManager cm) {
        NoOpGlobalConfigurationManager.amendCacheManager(cm);
    }

    /**
     * Isolates the third node from the other two, checks that both partitions settle
     * (no rebalance in progress) within 30 seconds, and then verifies that a fourth node
     * can join the two-node partition for the main cache and for four additional caches
     * that were each started on only two of the original nodes.
     */
    public void testNodeAbruptLeave() {
        final String[] extraCaches = {"cache2", "cache3", "cache4", "cache5"};
        for (String name : extraCaches) {
            defineConfigurationOnAllManagers(name, defaultConfig);
        }

        // Start each extra cache on a different pair of nodes.
        cache(0, "cache2");
        cache(1, "cache2");
        cache(0, "cache3");
        cache(2, "cache3");
        cache(1, "cache4");
        cache(2, "cache4");
        cache(0, "cache5");
        cache(1, "cache5");

        log.debugf("Splitting cluster");
        d3.setDiscardAll(true);
        TestingUtil.installNewView(manager(0), manager(1));
        TestingUtil.installNewView(manager(2));

        long startTime = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(30000L, false, c1, c2);
        TestingUtil.blockUntilViewsReceived(30000L, false, c3);
        TestingUtil.waitForNoRebalance(c1, c2);
        TestingUtil.waitForNoRebalance(c3);
        TestingUtil.waitForNoRebalance(cache(0, "cache2"), cache(1, "cache2"));
        TestingUtil.waitForNoRebalance(cache(0, "cache3"));
        TestingUtil.waitForNoRebalance(cache(1, "cache4"));
        TestingUtil.waitForNoRebalance(cache(0, "cache5"), cache(1, "cache5"));
        long elapsed = System.currentTimeMillis() - startTime;

        log.debugf("Recovery took %s", Util.prettyPrintTime(elapsed));
        assert elapsed < 30000L : "Recovery took too long: " + Util.prettyPrintTime(elapsed);

        // A new node joins the {0, 1} partition on the main cache first.
        TransportFlags flags = new TransportFlags().withFD(true).withMerge(true);
        EmbeddedCacheManager newCm = addClusterEnabledCacheManager(flags);
        newCm.defineConfiguration(CACHE_NAME, defaultConfig.build());
        Cache<?, ?> c4 = cache(3, CACHE_NAME);
        TestingUtil.blockUntilViewsReceived(30000L, true, c1, c2, c4);
        TestingUtil.waitForNoRebalance(c1, c2, c4);

        // Then the new node starts every extra cache as well.
        for (String name : extraCaches) {
            newCm.defineConfiguration(name, defaultConfig.build());
        }
        for (String name : extraCaches) {
            cache(3, name);
        }

        TestingUtil.waitForNoRebalance(cache(0, "cache2"), cache(1, "cache2"), cache(3, "cache2"));
        TestingUtil.waitForNoRebalance(cache(0, "cache3"), cache(3, "cache3"));
        TestingUtil.waitForNoRebalance(cache(1, "cache4"), cache(3, "cache4"));
        TestingUtil.waitForNoRebalance(cache(0, "cache5"), cache(1, "cache5"), cache(3, "cache5"));
    }

    /**
     * Isolates the first node from the other two, checks that both partitions settle
     * within 30 seconds, and then verifies that a fourth node can join the remaining
     * two-node partition.
     */
    public void testClusterRecoveryAfterCoordLeave() {
        log.debugf("Splitting cluster");
        d1.setDiscardAll(true);
        TestingUtil.installNewView(manager(0));
        TestingUtil.installNewView(manager(1), manager(2));

        long startTime = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(30000L, false, c1);
        TestingUtil.blockUntilViewsReceived(30000L, false, c2, c3);
        TestingUtil.waitForNoRebalance(c1);
        TestingUtil.waitForNoRebalance(c2, c3);
        long elapsed = System.currentTimeMillis() - startTime;

        log.debugf("Recovery took %s", Util.prettyPrintTime(elapsed));
        assert elapsed < 30000L : "Recovery took too long: " + Util.prettyPrintTime(elapsed);

        // A new node must be able to join the surviving {1, 2} partition.
        TransportFlags flags = new TransportFlags().withFD(true).withMerge(true);
        addClusterEnabledCacheManager(flags);
        manager(3).defineConfiguration(CACHE_NAME, defaultConfig.build());
        Cache<?, ?> c4 = cache(3, CACHE_NAME);
        TestingUtil.blockUntilViewsReceived(30000L, true, c2, c3, c4);
        TestingUtil.waitForNoRebalance(c2, c3, c4);
    }

    /**
     * Splits the cluster into three single-node partitions, lets them merge again, asserts
     * that the merge finishes within 30 seconds, and finally verifies that a fourth node
     * can join the merged cluster.
     */
    public void testClusterRecoveryAfterThreeWaySplit() {
        final DISCARD[] discards = {d1, d2, d3};
        final List<Cache<?, ?>> originalCaches = Arrays.asList(c1, c2, c3);

        log.debugf("Splitting the cluster in three");
        for (DISCARD discard : discards) {
            discard.setDiscardAll(true);
        }
        for (int i = 0; i < 3; i++) {
            TestingUtil.installNewView(manager(i));
        }
        for (Cache<?, ?> isolated : originalCaches) {
            TestingUtil.blockUntilViewsReceived(30000L, false, isolated);
        }
        for (Cache<?, ?> isolated : originalCaches) {
            TestingUtil.waitForNoRebalance(isolated);
        }

        log.debugf("Merging the cluster partitions");
        for (DISCARD discard : discards) {
            discard.setDiscardAll(false);
        }

        long startTime = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(60000L, c1, c2, c3);
        TestingUtil.waitForNoRebalance(c1, c2, c3);
        long elapsed = System.currentTimeMillis() - startTime;

        log.debugf("Merge took %s", Util.prettyPrintTime(elapsed));
        assert elapsed < 30000L : "Merge took too long: " + Util.prettyPrintTime(elapsed);

        // A new node must be able to join the merged cluster.
        TransportFlags flags = new TransportFlags().withFD(true).withMerge(true);
        addClusterEnabledCacheManager(flags);
        manager(3).defineConfiguration(CACHE_NAME, defaultConfig.build());
        Cache<?, ?> c4 = cache(3, CACHE_NAME);
        TestingUtil.blockUntilViewsReceived(30000L, true, c1, c2, c3, c4);
        TestingUtil.waitForNoRebalance(c1, c2, c3, c4);
    }

    /**
     * Splits the cluster into three single-node partitions, stops the first node while it
     * is isolated, lets the other two merge, asserts that the merge finishes within
     * 30 seconds, and finally verifies that a fourth node can join the merged pair.
     */
    public void testClusterRecoveryAfterSplitAndCoordLeave() {
        final List<Cache<?, ?>> originalCaches = Arrays.asList(c1, c2, c3);

        log.debugf("Splitting the cluster in three");
        for (DISCARD discard : new DISCARD[]{d1, d2, d3}) {
            discard.setDiscardAll(true);
        }
        for (int i = 0; i < 3; i++) {
            TestingUtil.installNewView(manager(i));
        }
        for (Cache<?, ?> isolated : originalCaches) {
            TestingUtil.blockUntilViewsReceived(30000L, false, isolated);
        }
        for (Cache<?, ?> isolated : originalCaches) {
            TestingUtil.waitForNoRebalance(isolated);
        }

        // The first node goes away while still isolated; d1 is never re-enabled.
        manager(0).stop();

        log.debugf("Merging the cluster partitions");
        d2.setDiscardAll(false);
        d3.setDiscardAll(false);

        long startTime = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(30000L, c2, c3);
        TestingUtil.waitForNoRebalance(c2, c3);
        long elapsed = System.currentTimeMillis() - startTime;

        log.debugf("Merge took %s", Util.prettyPrintTime(elapsed));
        assert elapsed < 30000L : "Merge took too long: " + Util.prettyPrintTime(elapsed);

        // A new node must be able to join the merged {1, 2} cluster.
        TransportFlags flags = new TransportFlags().withFD(true).withMerge(true);
        addClusterEnabledCacheManager(flags);
        manager(3).defineConfiguration(CACHE_NAME, defaultConfig.build());
        Cache<?, ?> c4 = cache(3, CACHE_NAME);
        TestingUtil.blockUntilViewsReceived(30000L, true, c2, c3, c4);
        TestingUtil.waitForNoRebalance(c2, c3, c4);
    }

    /**
     * Exercises a merge that happens while a rebalance is blocked.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>Pick the member with the lowest address (natural ordering) as the expected
     *       merge coordinator.</li>
     *   <li>Split the cluster into three single-node partitions.</li>
     *   <li>Re-enable traffic on the expected merge coordinator only, and let a fourth
     *       node join it; the {@code REBALANCE_START} for the two-member topology is
     *       blocked on both nodes via {@link #blockRebalanceStart}.</li>
     *   <li>Re-enable traffic everywhere, wait for the full view and for no rebalance to
     *       be in progress, then release the blocked command.</li>
     *   <li>Assert the merge took less than 30 seconds and that a fifth node can join.</li>
     * </ol>
     *
     * @throws Exception if waiting on a checkpoint or on the forked cache start fails
     */
    public void testClusterRecoveryWithRebalance() throws Exception {
        List<Address> members = new ArrayList<>(manager(0).getMembers());
        Collections.sort(members);
        Address mergeCoordAddress = members.get(0);
        log.debugf("The merge coordinator will be %s", mergeCoordAddress);
        EmbeddedCacheManager mergeCoordManager = manager(mergeCoordAddress);
        int mergeCoordIndex = cacheManagers.indexOf(mergeCoordManager);

        log.debugf("Splitting the cluster in three");
        d1.setDiscardAll(true);
        d2.setDiscardAll(true);
        d3.setDiscardAll(true);
        TestingUtil.installNewView(manager(0));
        TestingUtil.installNewView(manager(1));
        TestingUtil.installNewView(manager(2));
        TestingUtil.blockUntilViewsReceived(30000L, false, c1);
        TestingUtil.blockUntilViewsReceived(30000L, false, c2);
        TestingUtil.blockUntilViewsReceived(30000L, false, c3);
        TestingUtil.waitForNoRebalance(c1);
        TestingUtil.waitForNoRebalance(c2);
        TestingUtil.waitForNoRebalance(c3);

        // Only the expected merge coordinator is reachable again, so the joiner lands there.
        switch (mergeCoordIndex) {
            case 0:
                d1.setDiscardAll(false);
                break;
            case 1:
                d2.setDiscardAll(false);
                break;
            case 2:
                d3.setDiscardAll(false);
                break;
            default:
                break;
        }

        CheckPoint checkpoint = new CheckPoint();
        blockRebalanceStart(mergeCoordManager, checkpoint, 2);

        EmbeddedCacheManager cm4 = addClusterEnabledCacheManager(new TransportFlags().withFD(true).withMerge(true));
        blockRebalanceStart(cm4, checkpoint, 2);
        cm4.defineConfiguration(CACHE_NAME, defaultConfig.build());
        cm4.defineConfiguration(OTHER_CACHE_NAME, defaultConfig.build());
        // Starting an unrelated cache first; only CACHE_NAME rebalances are intercepted.
        cm4.getCache(OTHER_CACHE_NAME);
        TestingUtil.blockUntilViewsReceived(30000L, new CacheContainer[]{manager(mergeCoordIndex), cm4});

        // The cache start is forked because the intercepted rebalance is blocked
        // until the "merge" event is triggered further down.
        Future<Cache<Object, Object>> cacheFuture = fork(() -> cm4.getCache(CACHE_NAME));

        log.debugf("Waiting for the REBALANCE_START command to reach the merge coordinator");
        checkpoint.awaitStrict("rebalance_" + Arrays.asList(mergeCoordAddress, cm4.getAddress()), 10L, TimeUnit.SECONDS);

        log.debugf("Merging the cluster partitions");
        d1.setDiscardAll(false);
        d2.setDiscardAll(false);
        d3.setDiscardAll(false);

        long startTime = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(30000, cacheManagers);
        TestingUtil.waitForNoRebalance(caches(CACHE_NAME));

        log.debugf("Unblocking the REBALANCE_START command on the coordinator");
        checkpoint.triggerForever("merge");

        Cache<Object, Object> c4 = cacheFuture.get(30L, TimeUnit.SECONDS);
        TestingUtil.waitForNoRebalance(c1, c2, c3, c4);
        long endTime = System.currentTimeMillis();
        log.debugf("Merge took %s", Util.prettyPrintTime(endTime - startTime));
        assert (endTime - startTime < 30000L) : "Merge took too long: " + Util.prettyPrintTime(endTime - startTime);

        // A further node must still be able to join the merged cluster.
        EmbeddedCacheManager cm5 = addClusterEnabledCacheManager(new TransportFlags().withFD(true).withMerge(true));
        cm5.defineConfiguration(CACHE_NAME, defaultConfig.build());
        Cache<Object, Object> c5 = cm5.getCache(CACHE_NAME);
        TestingUtil.blockUntilViewsReceived(30000L, true, c1, c2, c3, c4, c5);
        TestingUtil.waitForNoRebalance(c1, c2, c3, c4, c5);
    }

    /**
     * Replaces the {@link LocalTopologyManager} of {@code manager} with a Mockito spy that
     * intercepts {@code handleRebalance} for {@link #CACHE_NAME}.
     *
     * <p>Every intercepted call triggers the checkpoint event {@code "rebalance_" + members}.
     * When the topology has exactly {@code numMembers} members, the real method is only
     * invoked after the checkpoint's {@code "merge"} future completes (30 second timeout
     * passed to {@code checkpoint.future}); otherwise the real method is invoked right away.
     *
     * @param manager    the cache manager whose local topology manager is replaced
     * @param checkpoint the checkpoint used to signal and to gate the rebalance
     * @param numMembers the member count for which the rebalance is held back
     */
    protected void blockRebalanceStart(EmbeddedCacheManager manager, CheckPoint checkpoint, int numMembers) {
        LocalTopologyManager realLtm = TestingUtil.extractGlobalComponent(manager, LocalTopologyManager.class);
        LocalTopologyManager spyLtm = Mockito.spy(realLtm);

        Mockito.doAnswer(invocation -> {
            CacheTopology rebalanceTopology = (CacheTopology) invocation.getArguments()[1];
            List rebalanceMembers = rebalanceTopology.getMembers();
            checkpoint.trigger("rebalance_" + rebalanceMembers);

            // Topologies of any other size go straight through.
            if (rebalanceMembers.size() != numMembers) {
                return invocation.callRealMethod();
            }

            log.debugf("Blocking the REBALANCE_START command with members %s on %s", rebalanceMembers, manager.getAddress());
            return TestingUtil.sequence(
                    checkpoint.future("merge", 30L, TimeUnit.SECONDS, testExecutor()),
                    () -> (CompletionStage) Mocks.callRealMethod(invocation));
        }).when(spyLtm).handleRebalance(
                ArgumentMatchers.eq(CACHE_NAME),
                ArgumentMatchers.any(CacheTopology.class),
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.any(Address.class));

        TestingUtil.replaceComponent(manager, LocalTopologyManager.class, spyLtm, true);
    }

    /**
     * Kills the first node, holds the status request on the second node until the third
     * node has been killed as well, and then checks that the second node never received a
     * topology update or rebalance for {@link #CACHE_NAME} with exactly two members.
     *
     * <p>The check is done through a future that the spied handlers complete exceptionally
     * when they see a two-member topology; the final {@code join()} rethrows in that case.
     *
     * @throws TimeoutException     if an expected checkpoint event does not arrive in time
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void testAbruptLeaveAfterGetStatus() throws TimeoutException, InterruptedException {
        LocalTopologyManager realLtm = TestingUtil.extractGlobalComponent(manager(1), LocalTopologyManager.class);
        CheckPoint checkpoint = new CheckPoint();
        LocalTopologyManager spyLtm = Mockito.spy(realLtm);
        CacheTopology initialTopology = realLtm.getCacheTopology(CACHE_NAME);
        log.debugf("Starting with topology %d", initialTopology.getTopologyId());

        // Signal that the status request arrived, then hold it until node 3 is gone.
        Mockito.doAnswer(invocation -> {
            int requestViewId = (Integer) invocation.getArguments()[0];
            checkpoint.trigger("GET_STATUS_" + requestViewId);
            log.debugf("Blocking the GET_STATUS command on the new coordinator");
            checkpoint.awaitStrict("3 left", 10L, TimeUnit.SECONDS);
            return invocation.callRealMethod();
        }).when(spyLtm).handleStatusRequest(ArgumentMatchers.anyInt());

        CompletableFuture<Void> twoMemberTopologySeen = new CompletableFuture<>();

        // Fail the future if a two-member topology update is delivered.
        Mockito.doAnswer(invocation -> {
            CacheTopology updated = (CacheTopology) invocation.getArguments()[1];
            if (updated.getMembers().size() == 2) {
                log.debugf("Found CH update with 2 mem %s", updated);
                twoMemberTopologySeen.completeExceptionally(new TestException());
            }
            return invocation.callRealMethod();
        }).when(spyLtm).handleTopologyUpdate(
                ArgumentMatchers.eq(CACHE_NAME),
                ArgumentMatchers.any(CacheTopology.class),
                ArgumentMatchers.any(AvailabilityMode.class),
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.any(Address.class));

        // Same check for rebalance commands; the real method is still invoked afterwards.
        Mockito.doAnswer(invocation -> {
            CacheTopology rebalanced = (CacheTopology) invocation.getArguments()[1];
            if (rebalanced.getMembers().size() == 2) {
                log.debugf("Discarding rebalance command %s", rebalanced);
                twoMemberTopologySeen.completeExceptionally(new TestException());
            }
            return invocation.callRealMethod();
        }).when(spyLtm).handleRebalance(
                ArgumentMatchers.eq(CACHE_NAME),
                ArgumentMatchers.any(CacheTopology.class),
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.any(Address.class));

        TestingUtil.replaceComponent(manager(1), LocalTopologyManager.class, spyLtm, true);

        killNode(manager(0), new EmbeddedCacheManager[]{manager(1), manager(2)});

        int currentViewId = manager(1).getTransport().getViewId();
        checkpoint.awaitStrict("GET_STATUS_" + currentViewId, 10L, TimeUnit.SECONDS);

        killNode(manager(2), new EmbeddedCacheManager[]{manager(1)});
        checkpoint.triggerForever("3 left");

        TestingUtil.waitForNoRebalance(c2);

        // complete(null) is a no-op if a handler already failed the future; join() then throws.
        twoMemberTopologySeen.complete(null);
        twoMemberTopologySeen.join();
    }

    /**
     * Simulates an abrupt departure of {@code nodeToKill}.
     *
     * <p>The DISCARD protocol of the node being killed is switched to discard-all, a view
     * containing only that node is installed on it, and the node is stopped. A new view
     * containing {@code nodesToKeep} is then installed and awaited (30 second timeout).
     *
     * <p>The DISCARD instance is looked up from {@code nodeToKill} itself, so the node that
     * is actually being killed is the one isolated, whichever node that is.
     *
     * @param nodeToKill  the cache manager to isolate and stop
     * @param nodesToKeep the cache managers that form the surviving view
     */
    private void killNode(EmbeddedCacheManager nodeToKill, EmbeddedCacheManager[] nodesToKeep) {
        log.debugf("Killing node %s", nodeToKill);
        TestingUtil.getDiscardForCache(nodeToKill).setDiscardAll(true);
        TestingUtil.installNewView(nodeToKill);
        nodeToKill.stop();
        TestingUtil.installNewView(nodesToKeep);
        TestingUtil.blockUntilViewsReceived(30000L, false, (CacheContainer[]) nodesToKeep);
    }

    /**
     * Stops the third node, holds the resulting {@code getTransactionsForSegments} call on
     * the second node until the second node has been stopped too, and asserts that the
     * first node settles (no rebalance in progress) within 30 seconds.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws TimeoutException     if the transactions request is not observed in time
     */
    @InTransactionMode(TransactionMode.TRANSACTIONAL)
    public void testLeaveDuringGetTransactions() throws InterruptedException, TimeoutException {
        CheckPoint checkpoint = new CheckPoint();

        StateProvider realProvider = TestingUtil.extractComponent(c2, StateProvider.class);
        StateProvider spyProvider = Mockito.spy(realProvider);
        Mockito.doAnswer(invocation -> {
            int topologyId = (Integer) invocation.getArguments()[1];
            checkpoint.trigger("GET_TRANSACTIONS");
            log.debugf("Blocking the GET_TRANSACTIONS(%d) command on the %s", topologyId, c2);
            // Hold the request until the test signals that the provider node is leaving.
            checkpoint.awaitStrict("LEAVE", 10L, TimeUnit.SECONDS);
            return invocation.callRealMethod();
        }).when(spyProvider).getTransactionsForSegments(
                ArgumentMatchers.any(Address.class),
                ArgumentMatchers.anyInt(),
                (IntSet) ArgumentMatchers.any());
        TestingUtil.replaceComponent(c2, StateProvider.class, spyProvider, true);

        long startTime = System.currentTimeMillis();
        manager(2).stop();
        checkpoint.awaitStrict("GET_TRANSACTIONS", 10L, TimeUnit.SECONDS);
        manager(1).stop();
        checkpoint.trigger("LEAVE");

        TestingUtil.blockUntilViewsReceived(30000L, false, c1);
        TestingUtil.waitForNoRebalance(c1);
        long elapsed = System.currentTimeMillis() - startTime;

        log.debugf("Recovery took %s", Util.prettyPrintTime(elapsed));
        assert elapsed < 30000L : "Recovery took too long: " + Util.prettyPrintTime(elapsed);
    }
}

