/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.topology;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.topology.CacheStatusRequestCommand;
import org.infinispan.commands.topology.RebalanceStartCommand;
import org.infinispan.commands.topology.RebalanceStatusRequestCommand;
import org.infinispan.commands.topology.TopologyUpdateCommand;
import org.infinispan.commands.topology.TopologyUpdateStableCommand;
import org.infinispan.commons.test.ExceptionRunnable;
import org.infinispan.configuration.ConfigurationManager;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.distribution.TestAddress;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.distribution.ch.impl.ReplicatedConsistentHashFactory;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.impl.BasicComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.manager.TestModuleRepository;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifierImpl;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.MockTransport;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheStatusResponse;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.topology.ClusterTopologyManagerImpl;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.topology.MockLocalTopologyManager;
import org.infinispan.topology.PersistentUUID;
import org.infinispan.topology.PersistentUUIDManager;
import org.infinispan.topology.PersistentUUIDManagerImpl;
import org.infinispan.util.concurrent.CompletionStages;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

/**
 * Unit tests that drive a {@link ClusterTopologyManagerImpl} wired into a
 * {@link GlobalComponentRegistry} together with a {@link MockTransport} and a
 * {@link MockLocalTopologyManager}, and check the commands and topologies that result
 * from joins, view changes and rebalance confirmations.
 */
@Test(groups={"unit"}, testName="topology.ClusterTopologyManagerImplTest")
public class ClusterTopologyManagerImplTest
extends AbstractInfinispanTest {
    private static final String CACHE_NAME = "testCache";
    private static final Address A = new TestAddress(0, "A");
    private static final Address B = new TestAddress(1, "B");

    // Both pools are registered in every registry the tests build and are shut down once per class.
    private ExecutorService executor = Executors.newFixedThreadPool(2, getTestThreadFactory("Executor"));
    private ExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(getTestThreadFactory("Executor"));
    // Declared before the join infos because makeJoinInfo() reads it during field initialization.
    private final ConsistentHashFactory<?> replicatedChf = new ReplicatedConsistentHashFactory();
    private final CacheJoinInfo joinInfoA = makeJoinInfo();
    private final CacheJoinInfo joinInfoB = makeJoinInfo();

    /**
     * Builds a {@code REPL_SYNC} join request that uses the shared replicated consistent hash
     * factory and a freshly generated persistent UUID.
     *
     * @return a new join info instance
     */
    private CacheJoinInfo makeJoinInfo() {
        return new CacheJoinInfo(replicatedChf, 16, 1, 1000L, CacheMode.REPL_SYNC, 1.0f, PersistentUUID.randomUUID(), Optional.empty());
    }

    /**
     * Starts the manager on node A as the only member of view 1, lets A join the cache, then
     * installs view 2 with A and B and checks that B's join is followed by a rebalance from
     * {@code [A]} to {@code [A, B]}.
     *
     * @throws Exception if any step of the scenario fails
     */
    public void testClusterStartupWith2Nodes() throws Exception {
        GlobalConfiguration gc = GlobalConfigurationBuilder.defaultClusteredBuilder().build();
        EmbeddedCacheManager cacheManager = Mockito.mock(EmbeddedCacheManager.class);
        GlobalComponentRegistry gcr = new GlobalComponentRegistry(gc, cacheManager, Collections.emptySet(), TestModuleRepository.defaultModuleRepository(), Mockito.mock(ConfigurationManager.class));
        try {
            // Swap in the test doubles and the shared executors before the manager is wired.
            BasicComponentRegistry gbcr = (BasicComponentRegistry)gcr.getComponent(BasicComponentRegistry.class);
            CacheManagerNotifierImpl managerNotifier = new CacheManagerNotifierImpl();
            gbcr.replaceComponent(CacheManagerNotifier.class.getName(), managerNotifier, false);
            managerNotifier.start();
            MockTransport transport = new MockTransport(A);
            gbcr.replaceComponent(Transport.class.getName(), transport, false);
            PersistentUUIDManagerImpl persistentUUIDManager = new PersistentUUIDManagerImpl();
            gbcr.replaceComponent(PersistentUUIDManager.class.getName(), persistentUUIDManager, false);
            gbcr.replaceComponent("org.infinispan.executors.non-blocking", executor, false);
            gbcr.replaceComponent("org.infinispan.executors.timeout", scheduledExecutor, false);
            MockLocalTopologyManager ltm = new MockLocalTopologyManager(CACHE_NAME);
            gbcr.replaceComponent(LocalTopologyManager.class.getName(), ltm, false);

            // View 1 contains only A; the local topology manager starts without any topology.
            transport.init(1, Collections.singletonList(A));
            ltm.init(null, null, null, null);

            ClusterTopologyManagerImpl ctm = new ClusterTopologyManagerImpl();
            gbcr.replaceComponent(ClusterTopologyManager.class.getName(), ctm, false);
            gcr.rewire();
            ctm.start();

            // Starting the manager is expected to produce a cache status request.
            transport.expectCommand(CacheStatusRequestCommand.class).finish();
            // Brief pause, then make sure the transport recorded no errors.
            Thread.sleep(1L);
            transport.verifyNoErrors();

            // A joins in view 1 and receives topology 1 with itself as the only member.
            CacheStatusResponse joinResponseA = (CacheStatusResponse)CompletionStages.join((CompletionStage)ctm.handleJoin(CACHE_NAME, A, joinInfoA, 1));
            CacheTopology topologyForA = joinResponseA.getCacheTopology();
            AssertJUnit.assertEquals(1, topologyForA.getTopologyId());
            assertCHMembers(topologyForA.getCurrentCH(), A);
            AssertJUnit.assertNull(topologyForA.getPendingCH());

            // Feed the join response to the local manager and check what it installed.
            ltm.handleTopologyUpdate(CACHE_NAME, topologyForA, joinResponseA.getAvailabilityMode(), 1, A);
            ltm.expectTopology(1, Collections.singletonList(A), null, CacheTopology.Phase.NO_REBALANCE);
            transport.expectCommand(TopologyUpdateStableCommand.class, c -> {
                assertCHMembers(c.getCurrentCH(), A);
                AssertJUnit.assertNull(c.getPendingCH());
            }).finish();

            // Install view 2 with A and B.
            transport.updateView(2, Arrays.asList(A, B));
            managerNotifier.notifyViewChange(Arrays.asList(A, B), Collections.singletonList(A), A, 2);
            transport.expectHeartBeatCommand().finish();

            // A join request from B that still carries view id 1 yields no response.
            CacheStatusResponse joinResponseB1 = (CacheStatusResponse)CompletionStages.join((CompletionStage)ctm.handleJoin(CACHE_NAME, B, joinInfoB, 1));
            AssertJUnit.assertNull(joinResponseB1);

            // With view id 2 B is answered with the current topology, which still only has A.
            CacheStatusResponse joinResponseB = (CacheStatusResponse)CompletionStages.join((CompletionStage)ctm.handleJoin(CACHE_NAME, B, joinInfoB, 2));
            CacheTopology topologyForB = joinResponseB.getCacheTopology();
            AssertJUnit.assertEquals(1, topologyForB.getTopologyId());
            assertCHMembers(topologyForB.getCurrentCH(), A);
            AssertJUnit.assertNull(topologyForB.getPendingCH());

            verifyRebalance(transport, ltm, ctm, 2, 1, Collections.singletonList(A), Arrays.asList(A, B));
            transport.verifyNoErrors();
        } finally {
            // Stop the registry even when an assertion fails, so nothing keeps running on the
            // class-wide executors while later tests execute.
            gcr.stop();
        }
    }

    /**
     * Starts the manager on node B while A is the first member of view 2 and a rebalance is in
     * its {@code READ_NEW_WRITE_ALL} phase, removes A in view 3, checks the topologies B
     * broadcasts afterwards, and finally lets A rejoin in view 4 and verifies the new rebalance.
     *
     * @throws Exception if any step of the scenario fails
     */
    public void testCoordinatorLostDuringRebalance() throws Exception {
        GlobalConfiguration gc = GlobalConfigurationBuilder.defaultClusteredBuilder().build();
        EmbeddedCacheManager cacheManager = Mockito.mock(EmbeddedCacheManager.class);
        GlobalComponentRegistry gcr = new GlobalComponentRegistry(gc, cacheManager, Collections.emptySet(), TestModuleRepository.defaultModuleRepository(), Mockito.mock(ConfigurationManager.class));
        try {
            // Swap in the test doubles and the shared executors before the manager is wired.
            BasicComponentRegistry gbcr = (BasicComponentRegistry)gcr.getComponent(BasicComponentRegistry.class);
            CacheManagerNotifierImpl managerNotifier = new CacheManagerNotifierImpl();
            gbcr.replaceComponent(CacheManagerNotifier.class.getName(), managerNotifier, false);
            managerNotifier.start();
            MockTransport transport = new MockTransport(B);
            gbcr.replaceComponent(Transport.class.getName(), transport, false);
            PersistentUUIDManagerImpl persistentUUIDManager = new PersistentUUIDManagerImpl();
            gbcr.replaceComponent(PersistentUUIDManager.class.getName(), persistentUUIDManager, false);
            gbcr.replaceComponent("org.infinispan.executors.non-blocking", executor, false);
            gbcr.replaceComponent("org.infinispan.executors.timeout", scheduledExecutor, false);
            MockLocalTopologyManager ltm = new MockLocalTopologyManager(CACHE_NAME);
            gbcr.replaceComponent(LocalTopologyManager.class.getName(), ltm, false);

            // View 2 lists A before B.
            transport.init(2, Arrays.asList(A, B));

            // The local manager starts mid-rebalance: topology 4 goes from [A] to [A, B] in
            // READ_NEW_WRITE_ALL, while the stable topology is still topology 1 with only A.
            ConsistentHash stableCH = replicatedChf.create(joinInfoA.getNumOwners(), joinInfoA.getNumSegments(), Collections.singletonList(A), null);
            ConsistentHash pendingCH = replicatedChf.create(joinInfoA.getNumOwners(), joinInfoA.getNumSegments(), Arrays.asList(A, B), null);
            CacheTopology initialTopology = new CacheTopology(4, 2, stableCH, pendingCH, CacheTopology.Phase.READ_NEW_WRITE_ALL, Arrays.asList(A, B), Arrays.asList(joinInfoA.getPersistentUUID(), joinInfoB.getPersistentUUID()));
            CacheTopology stableTopology = new CacheTopology(1, 1, stableCH, null, CacheTopology.Phase.NO_REBALANCE, Collections.singletonList(A), Collections.singletonList(joinInfoA.getPersistentUUID()));
            ltm.init(joinInfoA, initialTopology, stableTopology, AvailabilityMode.AVAILABLE);
            persistentUUIDManager.addPersistentAddressMapping(A, joinInfoA.getPersistentUUID());
            persistentUUIDManager.addPersistentAddressMapping(B, joinInfoB.getPersistentUUID());

            ClusterTopologyManagerImpl ctm = new ClusterTopologyManagerImpl();
            gbcr.replaceComponent(ClusterTopologyManager.class.getName(), ctm, false);
            gcr.rewire();

            // Start the manager while, concurrently, the rebalance status request it is
            // expected to send gets answered on behalf of A.
            ExceptionRunnable startManager = ctm::start;
            ExceptionRunnable answerRebalanceStatus = () -> transport.expectCommand(RebalanceStatusRequestCommand.class).singleResponse(A, SuccessfulResponse.create(Boolean.TRUE));
            runConcurrently(startManager, answerRebalanceStatus);
            eventuallyEquals(ClusterTopologyManager.ClusterManagerStatus.REGULAR_MEMBER, ctm::getStatus);

            // A leaves: view 3 contains only B.
            transport.updateView(3, Collections.singletonList(B));
            managerNotifier.notifyViewChange(Collections.singletonList(B), Arrays.asList(A, B), B, 3);
            transport.expectCommand(CacheStatusRequestCommand.class).finish();

            // Topology 5 still has both members but no pending CH; the stable topology is still 1.
            // NOTE(review): unlike every other expectation in this class, the next four
            // expectCommand calls are not followed by finish() - confirm that is intended.
            ltm.expectTopology(5, Arrays.asList(A, B), null, CacheTopology.Phase.NO_REBALANCE);
            transport.expectCommand(TopologyUpdateCommand.class, c -> {
                AssertJUnit.assertEquals(5, c.getTopologyId());
                assertCHMembers(c.getCurrentCH(), A, B);
                AssertJUnit.assertNull(c.getPendingCH());
            });
            transport.expectCommand(TopologyUpdateStableCommand.class, c -> {
                AssertJUnit.assertEquals(1, c.getTopologyId());
                assertCHMembers(c.getCurrentCH(), A);
                AssertJUnit.assertNull(c.getPendingCH());
            });

            // Topology 6 contains only B and also becomes the stable topology.
            ltm.expectTopology(6, Collections.singletonList(B), null, CacheTopology.Phase.NO_REBALANCE);
            transport.expectCommand(TopologyUpdateCommand.class, c -> {
                AssertJUnit.assertEquals(6, c.getTopologyId());
                assertCHMembers(c.getCurrentCH(), B);
                AssertJUnit.assertNull(c.getPendingCH());
            });
            transport.expectCommand(TopologyUpdateStableCommand.class, c -> {
                AssertJUnit.assertEquals(6, c.getTopologyId());
                assertCHMembers(c.getCurrentCH(), B);
                AssertJUnit.assertNull(c.getPendingCH());
            });

            // Brief pause, then make sure the transport recorded no errors.
            Thread.sleep(1L);
            transport.verifyNoErrors();

            // A comes back in view 4 and joins again, which should start a rebalance [B] -> [B, A].
            transport.updateView(4, Arrays.asList(B, A));
            managerNotifier.notifyViewChange(Arrays.asList(B, A), Collections.singletonList(B), A, 4);
            transport.expectHeartBeatCommand().finish();
            ctm.handleJoin(CACHE_NAME, A, joinInfoA, 4);

            verifyRebalance(transport, ltm, ctm, 7, 4, Collections.singletonList(B), Arrays.asList(B, A));
            transport.verifyNoErrors();
        } finally {
            // Stop the registry even when an assertion fails, so nothing keeps running on the
            // class-wide executors while later tests execute.
            gcr.stop();
        }
    }

    /**
     * Walks a rebalance through its four topology updates, confirming each intermediate phase
     * on behalf of A and B and checking the command sent for every step.
     *
     * @param transport           mock transport on which the outgoing commands are expected
     * @param ltm                 mock local topology manager on which the topologies are expected
     * @param ctm                 manager under test that receives the phase confirmations
     * @param rebalanceTopologyId topology id of the rebalance start; later phases use +1, +2 and +3
     * @param rebalanceViewId     view id passed along with every phase confirmation
     * @param initialMembers      expected members of the current CH while the rebalance runs
     * @param finalMembers        expected members of the pending CH, and of the current CH at the end
     * @throws Exception if an expectation fails or a confirmation throws
     */
    private void verifyRebalance(MockTransport transport, MockLocalTopologyManager ltm, ClusterTopologyManagerImpl ctm, int rebalanceTopologyId, int rebalanceViewId, List<Address> initialMembers, List<Address> finalMembers) throws Exception {
        final int readAllTopologyId = rebalanceTopologyId + 1;
        final int readNewTopologyId = rebalanceTopologyId + 2;
        final int finalTopologyId = rebalanceTopologyId + 3;

        // Phase 1: rebalance start, READ_OLD_WRITE_ALL.
        ltm.expectTopology(rebalanceTopologyId, initialMembers, finalMembers, CacheTopology.Phase.READ_OLD_WRITE_ALL);
        transport.expectCommand(RebalanceStartCommand.class, c -> {
            AssertJUnit.assertEquals(rebalanceTopologyId, c.getTopologyId());
            AssertJUnit.assertEquals(CacheTopology.Phase.READ_OLD_WRITE_ALL, c.getPhase());
            AssertJUnit.assertEquals(initialMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertEquals(finalMembers, c.getPendingCH().getMembers());
        }).finish();
        confirmPhaseFromBothNodes(ctm, rebalanceTopologyId, rebalanceViewId);

        // Phase 2: READ_ALL_WRITE_ALL.
        ltm.expectTopology(readAllTopologyId, initialMembers, finalMembers, CacheTopology.Phase.READ_ALL_WRITE_ALL);
        transport.expectCommand(TopologyUpdateCommand.class, c -> {
            AssertJUnit.assertEquals(readAllTopologyId, c.getTopologyId());
            AssertJUnit.assertEquals(CacheTopology.Phase.READ_ALL_WRITE_ALL, c.getPhase());
            AssertJUnit.assertEquals(initialMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertEquals(finalMembers, c.getPendingCH().getMembers());
        }).finish();
        confirmPhaseFromBothNodes(ctm, readAllTopologyId, rebalanceViewId);

        // Phase 3: READ_NEW_WRITE_ALL.
        ltm.expectTopology(readNewTopologyId, initialMembers, finalMembers, CacheTopology.Phase.READ_NEW_WRITE_ALL);
        transport.expectCommand(TopologyUpdateCommand.class, c -> {
            AssertJUnit.assertEquals(readNewTopologyId, c.getTopologyId());
            AssertJUnit.assertEquals(CacheTopology.Phase.READ_NEW_WRITE_ALL, c.getPhase());
            AssertJUnit.assertEquals(initialMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertEquals(finalMembers, c.getPendingCH().getMembers());
        }).finish();
        confirmPhaseFromBothNodes(ctm, readNewTopologyId, rebalanceViewId);

        // Final step: NO_REBALANCE with only the final members, followed by the stable topology update.
        ltm.expectTopology(finalTopologyId, finalMembers, null, CacheTopology.Phase.NO_REBALANCE);
        transport.expectCommand(TopologyUpdateCommand.class, c -> {
            AssertJUnit.assertEquals(finalTopologyId, c.getTopologyId());
            AssertJUnit.assertEquals(CacheTopology.Phase.NO_REBALANCE, c.getPhase());
            AssertJUnit.assertEquals(finalMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertNull(c.getPendingCH());
        }).finish();
        transport.expectCommand(TopologyUpdateStableCommand.class, c -> {
            AssertJUnit.assertEquals(finalTopologyId, c.getTopologyId());
            AssertJUnit.assertEquals(finalMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertNull(c.getPendingCH());
        }).finish();
    }

    /**
     * Confirms the rebalance phase with the given topology id to the manager, first as A and then as B.
     *
     * @param ctm        manager under test
     * @param topologyId topology id being confirmed
     * @param viewId     view id sent with the confirmation
     * @throws Exception if the manager rejects a confirmation
     */
    private void confirmPhaseFromBothNodes(ClusterTopologyManagerImpl ctm, int topologyId, int viewId) throws Exception {
        ctm.handleRebalancePhaseConfirm(CACHE_NAME, A, topologyId, null, viewId);
        ctm.handleRebalancePhaseConfirm(CACHE_NAME, B, topologyId, null, viewId);
    }

    /**
     * Asserts that the consistent hash lists exactly the given members, in the given order.
     *
     * @param ch      consistent hash to inspect
     * @param members expected members
     */
    private void assertCHMembers(ConsistentHash ch, Address ... members) {
        AssertJUnit.assertEquals(Arrays.asList(members), ch.getMembers());
    }

    /**
     * Shuts down both class-wide executors and asserts that each terminates within ten seconds.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @AfterClass(alwaysRun=true)
    public void shutdownExecutors() throws InterruptedException {
        // Interrupt both pools before asserting on either one; otherwise a failed assertion or an
        // interrupt while waiting for the first pool would leave the second one running.
        executor.shutdownNow();
        scheduledExecutor.shutdownNow();
        AssertJUnit.assertTrue(executor.awaitTermination(10L, TimeUnit.SECONDS));
        AssertJUnit.assertTrue(scheduledExecutor.awaitTermination(10L, TimeUnit.SECONDS));
    }
}

