/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.scattered.statetransfer;

import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.statetransfer.ScatteredStateConfirmRevokedCommand;
import org.infinispan.commands.statetransfer.ScatteredStateGetKeysCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.statetransfer.StateTransferCancelCommand;
import org.infinispan.commands.topology.RebalanceStartCommand;
import org.infinispan.configuration.cache.BiasAcquisition;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ControlledTransport;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.StateTransferInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TopologyChangeListener;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.transport.DelayedViewJGroupsTransport;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.ControlledRpcManager;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"unstable"}, testName="scattered.statetransfer.CoordinatorStopTest", description="ISPN-9940")
@CleanupAfterMethod
public class CoordinatorStopTest
extends MultipleCacheManagersTest {
    private CountDownLatch viewLatch;
    private ControlledConsistentHashFactory.Scattered chf;

    @Override
    public Object[] factory() {
        return new Object[]{new CoordinatorStopTest().biasAcquisition(BiasAcquisition.NEVER), new CoordinatorStopTest().biasAcquisition(BiasAcquisition.ON_WRITE)};
    }

    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        this.chf = new ControlledConsistentHashFactory.Scattered(new int[]{0, 1, 2});
        cb.clustering().cacheMode(CacheMode.SCATTERED_SYNC).hash().numSegments(3).consistentHashFactory((ConsistentHashFactory)this.chf);
        if (this.biasAcquisition != null) {
            cb.clustering().biasAcquisition(this.biasAcquisition);
        }
        this.addClusterEnabledCacheManager(cb);
        this.viewLatch = new CountDownLatch(1);
        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
        gcb.transport().transport((Transport)new DelayedViewJGroupsTransport(this.viewLatch));
        this.addClusterEnabledCacheManager(gcb, cb);
        gcb.transport().transport((Transport)new DelayedViewJGroupsTransport(this.viewLatch));
        this.addClusterEnabledCacheManager(gcb, cb);
        AssertJUnit.assertTrue((boolean)this.cache(0).getCacheManager().isCoordinator());
        this.cache(1);
        this.cache(2);
        this.waitForClusterToForm();
    }

    public void testCoordinatorLeaves() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
        String cacheName = this.cache(1).getName();
        MagicKey key = new MagicKey(this.cache(0));
        this.cache(1).put((Object)key, (Object)"value");
        int stableTopologyId = this.cache(1).getAdvancedCache().getDistributionManager().getCacheTopology().getTopologyId();
        BlockingLocalTopologyManager ltm2 = BlockingLocalTopologyManager.replaceTopologyManager(this.manager(2), cacheName);
        ControlledTransport transport0 = ControlledTransport.replace(this.cache(0));
        ControlledTransport transport1 = ControlledTransport.replace(this.cache(1));
        transport0.blockBefore(RebalanceStartCommand.class, command -> command.getCacheName().equals(cacheName) && command.getTopologyId() == stableTopologyId + 2);
        transport1.blockBefore(RebalanceStartCommand.class, command -> command.getCacheName().equals(cacheName) && command.getTopologyId() == stableTopologyId + 4);
        ControlledRpcManager rpcManager2 = ControlledRpcManager.replaceRpcManager(this.cache(2));
        rpcManager2.excludeCommands(StateResponseCommand.class, ClusteredGetCommand.class);
        this.chf.setOwnerIndexes(new int[][]{{1}, {0}, {1}});
        log.infof("Stopping coordinator %s, last stable topology is %d", (Object)this.manager(0), (Object)stableTopologyId);
        Future<Void> stopFuture = this.fork(() -> this.manager(0).stop());
        BlockingLocalTopologyManager.BlockedTopology t1 = ltm2.expectTopologyUpdate(CacheTopology.Phase.NO_REBALANCE, stableTopologyId + 1);
        if (t1.getCacheTopology().getTopologyId() == stableTopologyId + 1) {
            AssertJUnit.assertEquals((Object)CacheTopology.Phase.NO_REBALANCE, (Object)t1.getPhase());
        }
        AssertJUnit.assertEquals((int)2, (int)t1.getCacheTopology().getActualMembers().size());
        AssertJUnit.assertEquals(null, (Object)t1.getCacheTopology().getPendingCH());
        this.assertOwners(t1, true, 0, new Address[0]);
        this.assertOwners(t1, true, 1, this.address(1));
        this.assertOwners(t1, true, 2, this.address(2));
        t1.unblock();
        transport0.stopBlocking();
        stopFuture.get(10L, TimeUnit.SECONDS);
        BlockingLocalTopologyManager.BlockedTopology t2 = ltm2.expectTopologyUpdate(CacheTopology.Phase.TRANSITORY, stableTopologyId + 2);
        AssertJUnit.assertEquals((Object)CacheTopology.Phase.TRANSITORY, (Object)t2.getPhase());
        AssertJUnit.assertEquals((int)2, (int)t2.getCacheTopology().getActualMembers().size());
        AssertJUnit.assertNotNull((Object)t2.getCacheTopology().getPendingCH());
        this.assertOwners(t2, false, 0, this.address(2));
        this.assertOwners(t2, false, 1, this.address(1));
        this.assertOwners(t2, false, 2, this.address(2));
        t2.unblock();
        rpcManager2.expectCommand(ScatteredStateConfirmRevokedCommand.class).send().receiveAll();
        this.viewLatch.countDown();
        ControlledRpcManager.BlockedRequest<ScatteredStateGetKeysCommand> keyTransferRequest = rpcManager2.expectCommand(ScatteredStateGetKeysCommand.class);
        BlockingLocalTopologyManager.BlockedTopology t3 = ltm2.expectTopologyUpdate(CacheTopology.Phase.NO_REBALANCE, stableTopologyId + 3);
        AssertJUnit.assertEquals((int)2, (int)t3.getCacheTopology().getActualMembers().size());
        AssertJUnit.assertEquals(null, (Object)t3.getCacheTopology().getPendingCH());
        TopologyChangeListener topologyChangeListener = TopologyChangeListener.install(this.cache(2));
        ltm2.stopBlocking();
        t3.unblock();
        if (t3.getCacheTopology().getCurrentCH().locatePrimaryOwnerForSegment(0) == null) {
            ControlledRpcManager.BlockedRequest<StateTransferCancelCommand> cancelStateTransfer = rpcManager2.expectCommand(StateTransferCancelCommand.class);
            cancelStateTransfer.send();
        }
        topologyChangeListener.await(10L, TimeUnit.SECONDS);
        keyTransferRequest.send().receiveAll();
        CyclicBarrier oteBarrier = new CyclicBarrier(2);
        BlockingInterceptor<GetKeyValueCommand> oteInterceptor = new BlockingInterceptor<GetKeyValueCommand>(oteBarrier, GetKeyValueCommand.class, true, true);
        this.cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptorAfter(oteInterceptor, StateTransferInterceptor.class);
        Future<Object> future = this.fork(() -> this.cache(2).get((Object)key));
        oteBarrier.await(10L, TimeUnit.SECONDS);
        oteInterceptor.suspend(true);
        rpcManager2.stopBlocking();
        transport1.stopBlocking();
        oteBarrier.await(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals((Object)"value", (Object)future.get());
        ((DelayedViewJGroupsTransport)transport1.getDelegate()).assertUnblocked();
        ((DelayedViewJGroupsTransport)this.manager(2).getTransport()).assertUnblocked();
    }

    private void assertOwners(BlockingLocalTopologyManager.BlockedTopology t, boolean current, int segmentId, Address ... address) {
        ConsistentHash ch = current ? t.getCacheTopology().getCurrentCH() : t.getCacheTopology().getPendingCH();
        AssertJUnit.assertEquals((String)("Topology: " + t.getCacheTopology()), Arrays.asList(address), (Object)ch.locateOwnersForSegment(segmentId));
    }
}

