/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.partitionhandling;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.BaseStream;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.Closeables;
import org.infinispan.commons.util.IntSet;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.MagicKey;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.cluster.ClusterCacheNotifier;
import org.infinispan.partitionhandling.AvailabilityException;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.partitionhandling.BasePartitionHandlingTest;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.SegmentCompletionPublisher;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.Mocks;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="partitionhandling.StreamDistPartitionHandlingTest")
public class StreamDistPartitionHandlingTest
extends BasePartitionHandlingTest {
    @Test(expectedExceptions={AvailabilityException.class})
    public void testRetrievalWhenPartitionIsDegraded() {
        Cache cache0 = this.cache(0);
        cache0.put((Object)new MagicKey(this.cache(1), this.cache(2)), (Object)"not-local");
        cache0.put((Object)new MagicKey(this.cache(0), this.cache(1)), (Object)"local");
        this.splitCluster({0, 1}, {2, 3});
        this.partition(0).assertDegradedMode();
        try (CloseableIterator iterator = Closeables.iterator((BaseStream)this.cache(0).entrySet().stream());){
            iterator.next();
        }
    }

    public void testRetrievalWhenPartitionIsDegradedButLocal() {
        Cache cache0 = this.cache(0);
        cache0.put((Object)new MagicKey(this.cache(1), this.cache(2)), (Object)"not-local");
        cache0.put((Object)new MagicKey(this.cache(0), this.cache(1)), (Object)"local");
        this.splitCluster({0, 1}, {2, 3});
        this.partition(0).assertDegradedMode();
        try (CloseableIterator iterator = Closeables.iterator((BaseStream)cache0.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).entrySet().stream());){
            AssertJUnit.assertEquals((String)"local", (String)((String)((Map.Entry)iterator.next()).getValue()));
            AssertJUnit.assertFalse((boolean)iterator.hasNext());
        }
    }

    @Test(enabled=false)
    public void testUsingIterableButPartitionOccursBeforeGettingIterator() throws InterruptedException {
    }

    public void testUsingIteratorButPartitionOccursBeforeRetrievingRemoteValues() throws Exception {
        Cache cache0 = this.cache(0);
        cache0.put((Object)new MagicKey(this.cache(1), this.cache(2)), (Object)"not-local");
        cache0.put((Object)new MagicKey(this.cache(0), this.cache(1)), (Object)"local");
        CheckPoint iteratorCP = new CheckPoint();
        iteratorCP.triggerForever("before_release");
        StreamDistPartitionHandlingTest.registerBlockingRpcManagerOnInitialPublisherCommand(iteratorCP, cache0, this.testExecutor());
        try (CloseableIterator iterator = Closeables.iterator((BaseStream)cache0.entrySet().stream());){
            CheckPoint partitionCP = new CheckPoint();
            StreamDistPartitionHandlingTest.registerBlockingCacheNotifierOnDegradedMode(partitionCP, cache0);
            partitionCP.triggerForever("before_release");
            partitionCP.triggerForever("after_release");
            this.splitCluster({0, 1}, {2, 3});
            partitionCP.awaitStrict("after_invocation", 10L, TimeUnit.SECONDS);
            iteratorCP.triggerForever("after_release");
            try {
                while (iterator.hasNext()) {
                    iterator.next();
                }
                AssertJUnit.fail((String)"Expected AvailabilityException");
            }
            catch (AvailabilityException availabilityException) {
                // empty catch block
            }
        }
    }

    public void testUsingIteratorButPartitionOccursAfterRetrievingRemoteValues() throws InterruptedException, TimeoutException, ExecutionException {
        Cache cache0 = this.cache(0);
        cache0.put((Object)new MagicKey(this.cache(1), this.cache(2)), (Object)"not-local");
        cache0.put((Object)new MagicKey(this.cache(0), this.cache(1)), (Object)"local");
        CheckPoint iteratorCP = new CheckPoint();
        StreamDistPartitionHandlingTest.registerBlockingPublisher(iteratorCP, cache0);
        iteratorCP.triggerForever("before_release");
        iteratorCP.triggerForever("after_release");
        try (CloseableIterator iterator = Closeables.iterator((BaseStream)cache0.entrySet().stream());){
            AssertJUnit.assertTrue((boolean)iterator.hasNext());
            iteratorCP.awaitStrict("after_invocation", 10L, TimeUnit.SECONDS);
            CheckPoint partitionCP = new CheckPoint();
            StreamDistPartitionHandlingTest.registerBlockingCacheNotifierOnDegradedMode(partitionCP, cache0);
            this.splitCluster({0, 1}, {2, 3});
            partitionCP.triggerForever("before_release");
            partitionCP.triggerForever("after_release");
            while (iterator.hasNext()) {
                iterator.next();
            }
        }
    }

    private static <K, V> void registerBlockingCacheNotifierOnDegradedMode(CheckPoint checkPoint, Cache<K, V> cache) {
        Mocks.blockingMock(checkPoint, CacheNotifier.class, cache, (stub, m) -> ((CacheNotifier)stub.when(m)).notifyPartitionStatusChanged((AvailabilityMode)Mockito.eq((Object)AvailabilityMode.DEGRADED_MODE), Mockito.eq((boolean)false)), ClusterCacheNotifier.class);
    }

    private static void registerBlockingPublisher(CheckPoint checkPoint, Cache<?, ?> cache) {
        ClusterPublisherManager spy = Mocks.replaceComponentWithSpy(cache, ClusterPublisherManager.class);
        ((ClusterPublisherManager)Mockito.doAnswer(invocation -> {
            SegmentCompletionPublisher result = (SegmentCompletionPublisher)invocation.callRealMethod();
            return Mocks.blockingPublisher(result, checkPoint);
        }).when((Object)spy)).entryPublisher((IntSet)Mockito.any(), (Set)Mockito.any(), (InvocationContext)Mockito.any(), ArgumentMatchers.anyBoolean(), (DeliveryGuarantee)Mockito.any(), ArgumentMatchers.anyInt(), (Function)Mockito.any());
    }

    private static void registerBlockingRpcManagerOnInitialPublisherCommand(CheckPoint checkPoint, Cache<?, ?> cache, Executor completionExecutor) {
        RpcManager realManager = TestingUtil.extractComponent(cache, RpcManager.class);
        RpcManager spy = (RpcManager)Mockito.spy((Object)realManager);
        ((RpcManager)Mockito.doAnswer(invocation -> Mocks.blockingCompletableFuture(() -> {
            try {
                return (CompletableFuture)invocation.callRealMethod();
            }
            catch (Throwable throwable) {
                throw new AssertionError((Object)throwable);
            }
        }, checkPoint, completionExecutor).call()).when((Object)spy)).invokeCommand((Address)Mockito.any(Address.class), (ReplicableCommand)Mockito.any(InitialPublisherCommand.class), (ResponseCollector)Mockito.any(), (RpcOptions)Mockito.any());
        TestingUtil.replaceComponent(cache, RpcManager.class, spy, true);
    }
}

