/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.entries.ImmortalCacheValue;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.remoting.RemoteException;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.infinispan.statetransfer.StateTransferInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.TimeoutException;
import org.jgroups.JChannel;
import org.jgroups.View;
import org.jgroups.protocols.pbcast.GMS;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="distribution.RemoteGetFailureTest")
@CleanupAfterMethod
public class RemoteGetFailureTest
extends MultipleCacheManagersTest {
    private Object key;

    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder builder = RemoteGetFailureTest.getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC);
        builder.clustering().stateTransfer().timeout(10L, TimeUnit.SECONDS);
        builder.clustering().remoteTimeout(5L, TimeUnit.SECONDS);
        this.createClusteredCaches(3, TestDataSCI.INSTANCE, builder, new TransportFlags().withFD(true), new String[0]);
        this.waitForClusterToForm();
        this.key = this.getKeyForCache(this.cache(1), this.cache(2));
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void clearContent() throws Throwable {
        for (Cache cache : this.caches()) {
            this.installNewView(cache, this.caches().toArray(new Cache[0]));
        }
        super.clearContent();
    }

    public void testDelayed(Method m) {
        this.initAndCheck(m);
        CountDownLatch release = new CountDownLatch(1);
        this.cache(1).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(null, release), 0);
        long requestStart = System.nanoTime();
        AssertJUnit.assertEquals((Object)m.getName(), (Object)this.cache(0).get(this.key));
        long requestEnd = System.nanoTime();
        long remoteTimeout = this.cache(0).getCacheConfiguration().clustering().remoteTimeout();
        long delay = TimeUnit.NANOSECONDS.toMillis(requestEnd - requestStart);
        AssertJUnit.assertTrue((delay < remoteTimeout ? 1 : 0) != 0);
        release.countDown();
    }

    public void testExceptionFromBothOwners(Method m) {
        this.initAndCheck(m);
        this.cache(1).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new FailingInterceptor(), 0);
        this.cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new FailingInterceptor(), 0);
        Exceptions.expectException(RemoteException.class, CacheException.class, (String)"Injected", () -> this.cache(0).get(this.key));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testExceptionFromOneOwnerOtherTimeout(Method m) {
        this.initAndCheck(m);
        CountDownLatch release = new CountDownLatch(1);
        this.cache(1).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new FailingInterceptor(), 0);
        this.cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(null, release), 0);
        long requestStart = System.nanoTime();
        try {
            Exceptions.expectException(RemoteException.class, CacheException.class, (String)"Injected", () -> this.cache(0).get(this.key));
            long exceptionThrown = System.nanoTime();
            long remoteTimeout = this.cache(0).getCacheConfiguration().clustering().remoteTimeout();
            long delay = TimeUnit.NANOSECONDS.toMillis(exceptionThrown - requestStart);
            AssertJUnit.assertTrue((delay < remoteTimeout ? 1 : 0) != 0);
        }
        finally {
            release.countDown();
        }
    }

    public void testBothOwnersSuspected(Method m) throws ExecutionException, InterruptedException {
        this.initAndCheck(m);
        CountDownLatch arrival = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger thrown = new AtomicInteger();
        AtomicInteger retried = new AtomicInteger();
        this.cache(0).getAdvancedCache().getAsyncInterceptorChain().addInterceptorAfter((AsyncInterceptor)new CheckOTEInterceptor(thrown, retried), StateTransferInterceptor.class);
        this.cache(1).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(arrival, release), 0);
        this.cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(arrival, release), 0);
        Future<Object> future = this.fork(() -> this.cache(0).get(this.key));
        AssertJUnit.assertTrue((boolean)arrival.await(10L, TimeUnit.SECONDS));
        this.installNewView(this.cache(0), this.cache(0));
        AssertJUnit.assertNull((Object)future.get());
        AssertJUnit.assertEquals((int)1, (int)thrown.get());
        AssertJUnit.assertEquals((int)1, (int)retried.get());
        release.countDown();
    }

    public void testOneOwnerSuspected(Method m) throws ExecutionException, InterruptedException {
        this.initAndCheck(m);
        CountDownLatch arrival = new CountDownLatch(2);
        CountDownLatch release1 = new CountDownLatch(1);
        CountDownLatch release2 = new CountDownLatch(1);
        this.cache(1).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(arrival, release1), 0);
        this.cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(arrival, release2), 0);
        Future<Void> future = this.fork(() -> AssertJUnit.assertEquals((Object)this.cache(0).get(this.key), (Object)m.getName()));
        AssertJUnit.assertTrue((boolean)arrival.await(10L, TimeUnit.SECONDS));
        this.installNewView(this.cache(0), this.cache(0), this.cache(1));
        AssertJUnit.assertFalse((boolean)future.isDone());
        release1.countDown();
        future.get();
        release2.countDown();
    }

    public void testOneOwnerSuspectedNoFilter(Method m) throws ExecutionException, InterruptedException {
        this.initAndCheck(m);
        CountDownLatch arrival = new CountDownLatch(2);
        CountDownLatch release1 = new CountDownLatch(1);
        CountDownLatch release2 = new CountDownLatch(1);
        this.cache(1).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(arrival, release1), 0);
        this.cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(arrival, release2), 0);
        Address address1 = this.address(1);
        Address address2 = this.address(2);
        List<Address> owners = Arrays.asList(address1, address2);
        ClusteredGetCommand clusteredGet = new ClusteredGetCommand(this.key, ByteString.fromString((String)this.cache(0).getName()), Integer.valueOf(TestingUtil.getSegmentForKey(this.key, this.cache(1))), 0L);
        int timeout = 15;
        RpcOptions rpcOptions = new RpcOptions(DeliverOrder.NONE, 15L, TimeUnit.SECONDS);
        RpcManager rpcManager = this.cache(0).getAdvancedCache().getRpcManager();
        clusteredGet.setTopologyId(rpcManager.getTopologyId());
        CompletableFuture future = rpcManager.invokeCommand(owners, (ReplicableCommand)clusteredGet, (ResponseCollector)MapResponseCollector.ignoreLeavers(), rpcOptions).toCompletableFuture();
        AssertJUnit.assertTrue((boolean)arrival.await(10L, TimeUnit.SECONDS));
        this.installNewView(this.cache(0), this.cache(0), this.cache(1));
        Thread.sleep(100L);
        AssertJUnit.assertFalse((boolean)future.isDone());
        long requestAllowed = System.nanoTime();
        release1.countDown();
        Map responses = (Map)future.get();
        long requestCompleted = System.nanoTime();
        long requestSeconds = TimeUnit.NANOSECONDS.toSeconds(requestCompleted - requestAllowed);
        AssertJUnit.assertTrue((String)("Request took too long: " + requestSeconds), (requestSeconds < 7L ? 1 : 0) != 0);
        AssertJUnit.assertEquals((Object)SuccessfulResponse.create((Object)new ImmortalCacheValue((Object)m.getName())), responses.get(address1));
        AssertJUnit.assertEquals((Object)CacheNotFoundResponse.INSTANCE, responses.get(address2));
        release2.countDown();
    }

    public void testOneOwnerSuspectedOtherTimeout(Method m) throws ExecutionException, InterruptedException {
        this.initAndCheck(m);
        CountDownLatch arrival = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        this.cache(1).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(arrival, release), 0);
        this.cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptor((AsyncInterceptor)new DelayingInterceptor(arrival, release), 0);
        Future<Void> future = this.fork(() -> {
            long start = System.nanoTime();
            Exceptions.expectException(TimeoutException.class, () -> this.cache(0).get(this.key));
            long end = System.nanoTime();
            long duration = TimeUnit.NANOSECONDS.toMillis(end - start);
            AssertJUnit.assertTrue((String)("Request did not wait for long enough: " + duration), (duration >= this.cache(0).getCacheConfiguration().clustering().remoteTimeout() ? 1 : 0) != 0);
        });
        AssertJUnit.assertTrue((boolean)arrival.await(10L, TimeUnit.SECONDS));
        this.installNewView(this.cache(0), this.cache(0), this.cache(1));
        AssertJUnit.assertFalse((boolean)future.isDone());
        future.get();
        release.countDown();
    }

    private void initAndCheck(Method m) {
        this.cache(0).put(this.key, (Object)m.getName());
        AssertJUnit.assertEquals((Object)m.getName(), (Object)this.cache(1).get(this.key));
        AssertJUnit.assertEquals((Object)m.getName(), (Object)this.cache(2).get(this.key));
    }

    private void installNewView(Cache installing, Cache ... cachesInView) {
        JGroupsTransport transport = (JGroupsTransport)installing.getCacheManager().getTransport();
        JChannel channel = transport.getChannel();
        org.jgroups.Address[] members = (org.jgroups.Address[])Stream.of(cachesInView).map(c -> ((JGroupsAddress)this.address((Cache<?, ?>)c)).getJGroupsAddress()).toArray(org.jgroups.Address[]::new);
        View view = View.create((org.jgroups.Address)members[0], (long)(transport.getViewId() + 1), (org.jgroups.Address[])members);
        ((GMS)channel.getProtocolStack().findProtocol(GMS.class)).installView(view);
    }

    class CheckOTEInterceptor
    extends DDAsyncInterceptor {
        private final AtomicInteger thrown;
        private final AtomicInteger retried;

        public CheckOTEInterceptor(AtomicInteger thrown, AtomicInteger retried) {
            this.thrown = thrown;
            this.retried = retried;
        }

        public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
            if (command.hasAnyFlag(FlagBitSets.COMMAND_RETRY)) {
                this.retried.incrementAndGet();
            }
            return this.invokeNextAndExceptionally(ctx, (VisitableCommand)command, (rCtx, rCommand, t) -> {
                this.thrown.incrementAndGet();
                throw t;
            });
        }
    }

    static class DelayingInterceptor
    extends DDAsyncInterceptor {
        private final CountDownLatch arrival;
        private final CountDownLatch release;

        private DelayingInterceptor(CountDownLatch arrival, CountDownLatch release) {
            this.arrival = arrival;
            this.release = release;
        }

        public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
            if (this.arrival != null) {
                this.arrival.countDown();
            }
            this.release.await(30L, TimeUnit.SECONDS);
            return super.visitGetCacheEntryCommand(ctx, command);
        }
    }

    static class FailingInterceptor
    extends DDAsyncInterceptor {
        FailingInterceptor() {
        }

        public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
            throw new CacheException("Injected");
        }
    }
}

