/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.BaseCustomAsyncInterceptor;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.util.ReplicatedControlledConsistentHashFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.ReplCommandForwardingTest")
@CleanupAfterMethod
public class ReplCommandForwardingTest
extends MultipleCacheManagersTest {
    private static final String CACHE_NAME = "testCache";

    @Override
    protected void createCacheManagers() {
    }

    private ConfigurationBuilder buildConfig(Class<?> commandToBlock) {
        ConfigurationBuilder configurationBuilder = ReplCommandForwardingTest.getDefaultClusteredCacheConfig(CacheMode.REPL_ASYNC, false);
        configurationBuilder.clustering().remoteTimeout(15000L);
        configurationBuilder.clustering().hash().numSegments(1).consistentHashFactory((ConsistentHashFactory)new ReplicatedControlledConsistentHashFactory(0, new int[0]));
        configurationBuilder.clustering().stateTransfer().fetchInMemoryState(true);
        configurationBuilder.customInterceptors().addInterceptor().after(EntryWrappingInterceptor.class).interceptor((AsyncInterceptor)new DelayInterceptor(commandToBlock));
        return configurationBuilder;
    }

    public void testForwardToJoinerNonTransactional() throws Exception {
        EmbeddedCacheManager cm1 = this.addClusterEnabledCacheManager();
        Cache c1 = cm1.createCache(CACHE_NAME, this.buildConfig(PutKeyValueCommand.class).build());
        DelayInterceptor di1 = TestingUtil.findInterceptor(c1, DelayInterceptor.class);
        int initialTopologyId = c1.getAdvancedCache().getDistributionManager().getCacheTopology().getTopologyId();
        EmbeddedCacheManager cm2 = this.addClusterEnabledCacheManager();
        Cache c2 = cm2.createCache(CACHE_NAME, this.buildConfig(PutKeyValueCommand.class).build());
        DelayInterceptor di2 = TestingUtil.findInterceptor(c2, DelayInterceptor.class);
        this.waitForStateTransfer(initialTopologyId + 4, c1, c2);
        EmbeddedCacheManager cm3 = this.addClusterEnabledCacheManager();
        cm3.createCache("differentCache", ReplCommandForwardingTest.getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC).build());
        Future<Object> f = this.fork(() -> {
            log.tracef("Initiating a put command on %s", (Object)c1);
            c1.put((Object)"k", (Object)"v");
            return null;
        });
        di1.waitUntilBlocked(1);
        di2.waitUntilBlocked(1);
        Cache c3 = cm3.createCache(CACHE_NAME, this.buildConfig(PutKeyValueCommand.class).build());
        DelayInterceptor di3 = TestingUtil.findInterceptor(c3, DelayInterceptor.class);
        this.waitForStateTransfer(initialTopologyId + 8, c1, c2, c3);
        di2.unblock(1);
        di1.unblock(1);
        Thread.sleep(2000L);
        AssertJUnit.assertEquals((String)("The command shouldn't have been forwarded to " + c3), (int)0, (int)di3.getCounter());
        log.tracef("Waiting for the put command to finish on %s", (Object)c1);
        Object retval = f.get(10L, TimeUnit.SECONDS);
        log.tracef("Put command finished on %s", (Object)c1);
        AssertJUnit.assertNull((Object)retval);
        AssertJUnit.assertEquals((int)1, (int)di1.getCounter());
        AssertJUnit.assertEquals((int)1, (int)di2.getCounter());
        AssertJUnit.assertEquals((int)0, (int)di3.getCounter());
    }

    private void waitForStateTransfer(int expectedTopologyId, Cache ... caches) {
        TestingUtil.waitForNoRebalance(caches);
        for (Cache c : caches) {
            LocalizedCacheTopology cacheTopology = c.getAdvancedCache().getDistributionManager().getCacheTopology();
            AssertJUnit.assertEquals((String)String.format("Wrong topology on cache %s, expected %d and got %s", c, expectedTopologyId, cacheTopology), (int)cacheTopology.getTopologyId(), (int)expectedTopologyId);
        }
    }

    class DelayInterceptor
    extends BaseCustomAsyncInterceptor {
        private final AtomicInteger counter = new AtomicInteger(0);
        private final CheckPoint checkPoint = new CheckPoint();
        private final Class<?> commandToBlock;

        public DelayInterceptor(Class<?> commandToBlock) {
            this.commandToBlock = commandToBlock;
        }

        public int getCounter() {
            return this.counter.get();
        }

        public void waitUntilBlocked(int count) throws TimeoutException, InterruptedException {
            String event = this.checkPoint.peek(5L, TimeUnit.SECONDS, "blocked_" + count + "_on_" + this.cache);
            AssertJUnit.assertEquals((String)("blocked_" + count + "_on_" + this.cache), (String)event);
        }

        public void unblock(int count) throws InterruptedException, TimeoutException, BrokenBarrierException {
            log.tracef("Unblocking command on cache %s", (Object)this.cache);
            this.checkPoint.awaitStrict("blocked_" + count + "_on_" + this.cache, 5L, TimeUnit.SECONDS);
            this.checkPoint.trigger("resume_" + count + "_on_" + this.cache);
        }

        public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
            return this.invokeNextThenAccept(ctx, (VisitableCommand)command, (rCtx, rCommand, rv) -> {
                if (!ctx.isInTxScope() && !command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                    this.doBlock(ctx, (ReplicableCommand)command);
                }
            });
        }

        public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
            return this.invokeNextThenAccept((InvocationContext)ctx, (VisitableCommand)command, (rCtx, rCommand, rv) -> {
                if (!ctx.getCacheTransaction().isFromStateTransfer()) {
                    this.doBlock((InvocationContext)ctx, (ReplicableCommand)command);
                }
            });
        }

        public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
            return this.invokeNextThenAccept((InvocationContext)ctx, (VisitableCommand)command, (rCtx, rCommand, rv) -> {
                if (!ctx.getCacheTransaction().isFromStateTransfer()) {
                    this.doBlock((InvocationContext)ctx, (ReplicableCommand)command);
                }
            });
        }

        public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
            return this.invokeNextThenAccept((InvocationContext)ctx, (VisitableCommand)command, (rCtx, rCommand, rv) -> {
                if (!ctx.getCacheTransaction().isFromStateTransfer()) {
                    this.doBlock((InvocationContext)ctx, (ReplicableCommand)command);
                }
            });
        }

        private void doBlock(InvocationContext ctx, ReplicableCommand command) throws InterruptedException, TimeoutException {
            if (this.commandToBlock != command.getClass()) {
                return;
            }
            log.tracef("Delaying command %s originating from %s", (Object)command, (Object)ctx.getOrigin());
            Integer myCount = this.counter.incrementAndGet();
            this.checkPoint.trigger("blocked_" + myCount + "_on_" + this.cache);
            this.checkPoint.awaitStrict("resume_" + myCount + "_on_" + this.cache, 15L, TimeUnit.SECONDS);
            log.tracef("Command unblocked: %s", (Object)command);
        }

        public String toString() {
            return "DelayInterceptor{counter=" + this.counter + "}";
        }
    }
}

