/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution.rehash;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.statetransfer.StateTransferStartCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.MagicKey;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.ControlledRpcManager;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.concurrent.TimeoutException;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="distribution.rehash.NonTxPrimaryOwnerLeavingTest")
@CleanupAfterMethod
public class NonTxPrimaryOwnerLeavingTest
extends MultipleCacheManagersTest {
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder c = new ConfigurationBuilder();
        c.clustering().cacheMode(CacheMode.DIST_SYNC);
        c.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
        this.createCluster(TestDataSCI.INSTANCE, c, 3);
        this.waitForClusterToForm();
    }

    @Test(groups={"unstable"})
    public void testPrimaryOwnerLeavingDuringPut() throws Exception {
        this.doTest(TestWriteOperation.PUT_CREATE, false);
    }

    public void testPrimaryOwnerLeavingDuringPutIfAbsent() throws Exception {
        this.doTest(TestWriteOperation.PUT_IF_ABSENT, false);
    }

    public void testPrimaryOwnerLeaveDuringPutAll() throws Exception {
        this.doTest(TestWriteOperation.PUT_MAP_CREATE, false);
    }

    public void testPrimaryOwnerLeaveDuringPutAll2() throws Exception {
        this.doTest(TestWriteOperation.PUT_MAP_CREATE, true);
    }

    private void doTest(TestWriteOperation operation, boolean blockTopologyOnOriginator) throws Exception {
        AdvancedCache cache0 = this.advancedCache(0);
        AdvancedCache cache1 = this.advancedCache(1);
        AdvancedCache cache2 = this.advancedCache(2);
        TopologyUpdateListener listener0 = new TopologyUpdateListener();
        cache0.addListener((Object)listener0);
        TopologyUpdateListener listener2 = new TopologyUpdateListener();
        cache2.addListener((Object)listener2);
        ControlledRpcManager crm = ControlledRpcManager.replaceRpcManager(cache0);
        crm.excludeCommands(StateTransferStartCommand.class, StateResponseCommand.class);
        MagicKey key = new MagicKey((Cache<?, ?>)cache1);
        Future<Object> future = this.fork(() -> operation.perform((AdvancedCache<Object, Object>)cache0, key));
        ControlledRpcManager.BlockedRequest<? extends VisitableCommand> blockedWrite = crm.expectCommand(operation.getCommandClass());
        cache1.stop();
        if (!blockTopologyOnOriginator) {
            listener0.unblockOnce();
            listener0.waitForTopologyToFinish();
        }
        blockedWrite.send().expectResponse(this.address(1), (Response)CacheNotFoundResponse.INSTANCE).receive();
        if (blockTopologyOnOriginator) {
            crm.expectNoCommand(100L, TimeUnit.MILLISECONDS);
            listener0.unblockOnce();
            listener0.waitForTopologyToFinish();
        }
        listener2.unblockOnce();
        listener2.waitForTopologyToFinish();
        if (!cache0.getDistributionManager().getCacheTopology().getDistribution((Object)key).isPrimary()) {
            crm.expectCommand(operation.getCommandClass()).send().receiveAll();
        }
        Object result = future.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertNull((Object)result);
        log.tracef("Write operation is done", new Object[0]);
        cache0.removeListener((Object)listener0);
        cache2.removeListener((Object)listener2);
        listener0.unblockOnce();
        listener0.unblockOnce();
        crm.stopBlocking();
        AssertJUnit.assertEquals((Object)operation.getValue(), (Object)cache0.get((Object)key));
        AssertJUnit.assertEquals((Object)operation.getValue(), (Object)cache2.get((Object)key));
    }

    @Listener
    public class TopologyUpdateListener {
        private final ReclosableLatch preLatch = new ReclosableLatch();
        private final ReclosableLatch postLatch = new ReclosableLatch();
        private volatile boolean broken = false;

        @TopologyChanged
        public void onTopologyChange(TopologyChangedEvent e) throws InterruptedException {
            if (e.isPre()) {
                log.tracef("Blocking topology %d", e.getNewTopologyId());
                this.broken = !this.preLatch.await(10L, TimeUnit.SECONDS);
                this.preLatch.close();
            } else {
                log.tracef("Signalling topology %d finished installing", e.getNewTopologyId());
                this.postLatch.open();
            }
        }

        void unblockOnce() {
            this.preLatch.open();
            AssertJUnit.assertFalse((boolean)this.broken);
        }

        private void waitForTopologyToFinish() throws InterruptedException {
            if (!this.postLatch.await(10L, TimeUnit.SECONDS)) {
                throw new TimeoutException();
            }
            this.postLatch.close();
        }
    }
}

