/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.commands;

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.entries.ImmortalCacheValue;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.ControlledRpcManager;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="commands.GetAllCacheNotFoundResponseTest")
public class GetAllCacheNotFoundResponseTest
extends MultipleCacheManagersTest {
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder cb = GetAllCacheNotFoundResponseTest.getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC);
        ControlledConsistentHashFactory.Default chf = new ControlledConsistentHashFactory.Default(new int[][]{{0, 1}, {0, 2}, {2, 3}});
        cb.clustering().hash().numOwners(2).numSegments(3).consistentHashFactory((ConsistentHashFactory)chf);
        this.createClusteredCaches(5, cb);
    }

    public void test() throws InterruptedException, ExecutionException, TimeoutException {
        ControlledRpcManager crm4 = ControlledRpcManager.replaceRpcManager(this.cache(4));
        crm4.excludeCommands(StateResponseCommand.class);
        MagicKey key1 = new MagicKey(this.cache(0), this.cache(1));
        MagicKey key2 = new MagicKey(this.cache(0), this.cache(2));
        MagicKey key3 = new MagicKey(this.cache(2), this.cache(3));
        LinkedHashSet<MagicKey> keys = new LinkedHashSet<MagicKey>(Arrays.asList(key1, key2, key3));
        Future<Map> future = this.fork(() -> this.cache(4).getAdvancedCache().getAll(keys));
        log.debugf("Expect first get all", new Object[0]);
        ControlledRpcManager.BlockedRequests<ClusteredGetAllCommand> round1 = crm4.expectCommands(ClusteredGetAllCommand.class, this.address(0), this.address(2));
        round1.skipSendAndReceive(this.address(0), (Response)CacheNotFoundResponse.INSTANCE);
        round1.skipSendAndReceiveAsync(this.address(2), (Response)UnsureResponse.INSTANCE);
        log.debugf("Expect 1st retry", new Object[0]);
        ControlledRpcManager.BlockedRequests<ClusteredGetAllCommand> round2 = crm4.expectCommands(ClusteredGetAllCommand.class, this.address(1), this.address(2), this.address(3));
        round2.skipSendAndReceive(this.address(1), (Response)CacheNotFoundResponse.INSTANCE);
        round2.skipSendAndReceive(this.address(2), (Response)SuccessfulResponse.create((Object)new InternalCacheValue[]{new ImmortalCacheValue((Object)"value2")}));
        round2.skipSendAndReceiveAsync(this.address(3), (Response)SuccessfulResponse.create((Object)new InternalCacheValue[]{new ImmortalCacheValue((Object)"value3")}));
        crm4.expectNoCommand(10L, TimeUnit.MILLISECONDS);
        log.debugf("Increment topology and expect 2nd retry", new Object[0]);
        Future<Void> topologyUpdateFuture = this.simulateTopologyUpdate(this.cache(4));
        ControlledRpcManager.BlockedRequests<ClusteredGetAllCommand> round3 = crm4.expectCommands(ClusteredGetAllCommand.class, this.address(0));
        round3.skipSendAndReceive(this.address(0), (Response)SuccessfulResponse.create((Object)new InternalCacheValue[]{null}));
        log.debugf("Expect final result", new Object[0]);
        topologyUpdateFuture.get(10L, TimeUnit.SECONDS);
        Map values = future.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(null, values.get(key1));
        AssertJUnit.assertEquals((Object)"value2", values.get(key2));
        AssertJUnit.assertEquals((Object)"value3", values.get(key3));
    }

    private Future<Void> simulateTopologyUpdate(Cache<Object, Object> cache) {
        StateTransferLock stl4 = TestingUtil.extractComponent(cache, StateTransferLock.class);
        DistributionManager dm4 = cache.getAdvancedCache().getDistributionManager();
        LocalizedCacheTopology cacheTopology = dm4.getCacheTopology();
        int newTopologyId = cacheTopology.getTopologyId() + 1;
        CacheTopology newTopology = new CacheTopology(newTopologyId, cacheTopology.getRebalanceId(), cacheTopology.getCurrentCH(), cacheTopology.getPendingCH(), cacheTopology.getUnionCH(), cacheTopology.getPhase(), cacheTopology.getActualMembers(), cacheTopology.getMembersPersistentUUIDs());
        dm4.setCacheTopology(newTopology);
        return this.fork(() -> stl4.notifyTransactionDataReceived(newTopologyId));
    }
}

