/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commons.tx.lookup.TransactionManagerLookup;
import org.infinispan.configuration.cache.BiasAcquisition;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.BaseAsyncInterceptor;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.impl.BiasedEntryWrappingInterceptor;
import org.infinispan.interceptors.impl.CallInterceptor;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.interceptors.impl.InvocationContextInterceptor;
import org.infinispan.interceptors.impl.RetryingEntryWrappingInterceptor;
import org.infinispan.interceptors.impl.VersionedEntryWrappingInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.OperationsDuringStateTransferTest")
@CleanupAfterMethod
public class OperationsDuringStateTransferTest
extends MultipleCacheManagersTest {
    private static final Log log = LogFactory.getLog(OperationsDuringStateTransferTest.class);
    private ConfigurationBuilder cacheConfigBuilder;

    /**
     * Supplies one test instance per cache configuration under test.
     *
     * <p>The instances cover {@code DIST_SYNC} and {@code REPL_SYNC}, each as
     * non-transactional, pessimistic-transactional and optimistic-transactional,
     * followed by non-transactional {@code SCATTERED_SYNC} with bias acquisition
     * {@code NEVER} and {@code ON_WRITE}.
     *
     * @return the configured test instances, in the order described above
     */
    @Override
    public Object[] factory() {
        return new Object[]{
            // Distributed caches
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.DIST_SYNC).transactional(false),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.DIST_SYNC).transactional(true).lockingMode(LockingMode.PESSIMISTIC),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.DIST_SYNC).transactional(true).lockingMode(LockingMode.OPTIMISTIC),
            // Replicated caches
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.REPL_SYNC).transactional(false),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.REPL_SYNC).transactional(true).lockingMode(LockingMode.PESSIMISTIC),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.REPL_SYNC).transactional(true).lockingMode(LockingMode.OPTIMISTIC),
            // Scattered caches (non-transactional only), one per bias acquisition setting
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.SCATTERED_SYNC).transactional(false).biasAcquisition(BiasAcquisition.NEVER),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.SCATTERED_SYNC).transactional(false).biasAcquisition(BiasAcquisition.ON_WRITE)
        };
    }

    /**
     * Builds the cache configuration for the current test parameters and starts the
     * first cluster member with it.
     *
     * <p>The builder is stored in {@code cacheConfigBuilder} so that the test methods can
     * register additional interceptors on it before starting a second member.
     */
    @Override
    protected void createCacheManagers() {
        ConfigurationBuilder builder = getDefaultClusteredCacheConfig(cacheMode, transactional, true);
        cacheConfigBuilder = builder;

        if (transactional) {
            builder.transaction()
                    .transactionMode(TransactionMode.TRANSACTIONAL)
                    .transactionManagerLookup((TransactionManagerLookup) new EmbeddedTransactionManagerLookup());
            builder.transaction().lockingMode(lockingMode);
            // Optimistic locking is run with REPEATABLE_READ isolation.
            if (lockingMode == LockingMode.OPTIMISTIC) {
                builder.locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
            }
        }

        // Bias acquisition is only set by the factory entries that configure it.
        if (biasAcquisition != null) {
            builder.clustering().biasAcquisition(biasAcquisition);
        }

        builder.clustering()
                .hash().numSegments(10)
                .l1().disable()
                .locking().lockAcquisitionTimeout(TestingUtil.shortTimeoutMillis());
        // Do not wait for the initial state transfer when a cache starts.
        builder.clustering()
                .stateTransfer().fetchInMemoryState(true).awaitInitialTransfer(false);

        addClusterEnabledCacheManager(builder);
        waitForClusterToForm();
    }

    /**
     * Checks that a remove issued on a newly started node, while rebalancing is disabled,
     * returns the previous value and removes the entry on both nodes once rebalancing has
     * been re-enabled and has finished.
     *
     * <p>A {@link RemoveLatchInterceptor} on the new node holds the remove command until the
     * test has verified that the node received the entry through state transfer.
     *
     * @throws Exception if the remove does not reach the interceptor within 10 seconds, or
     *         the forked remove fails or does not complete within 10 seconds
     */
    public void testRemove() throws Exception {
        cache(0).put("myKey", "myValue");

        // The interceptor on the second node signals the first latch when a remove arrives
        // and then waits on the second latch before letting the command continue.
        CountDownLatch removeStartedLatch = new CountDownLatch(1);
        CountDownLatch removeProceedLatch = new CountDownLatch(1);
        cacheConfigBuilder.customInterceptors().addInterceptor().after(ewi())
                .interceptor(new RemoveLatchInterceptor(removeStartedLatch, removeProceedLatch));

        // Keep the new node out of the consistent hash until the remove is in flight.
        ClusterTopologyManager ctm0 = TestingUtil.extractGlobalComponent(manager(0), ClusterTopologyManager.class);
        ctm0.setRebalancingEnabled(false);

        log.info("Adding a new node ..");
        addClusterEnabledCacheManager(cacheConfigBuilder);
        log.info("Added a new node");

        // The new node sees a topology with no pending CH and is not a member yet.
        LocalizedCacheTopology cacheTopology = advancedCache(1).getDistributionManager().getCacheTopology();
        AssertJUnit.assertNull(cacheTopology.getPendingCH());
        AssertJUnit.assertTrue(cacheTopology.getMembers().contains(address(0)));
        AssertJUnit.assertFalse(cacheTopology.getMembers().contains(address(1)));
        AssertJUnit.assertFalse(cacheTopology.getCurrentCH().getMembers().contains(address(1)));

        // No state has arrived on the new node yet.
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        // Start the remove on another thread; it will park inside the latch interceptor.
        Future<Object> removeFuture = fork(() -> {
            try {
                return cache(1).remove("myKey");
            }
            catch (Exception e) {
                // Label the failure with the operation that was actually attempted.
                log.errorf(e, "REMOVE failed: %s", e.getMessage());
                throw e;
            }
        });

        if (!removeStartedLatch.await(10L, TimeUnit.SECONDS)) {
            throw new TimeoutException();
        }

        // The remove is parked and the new node still holds no data.
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        // Let rebalancing run to completion while the remove is still parked.
        ctm0.setRebalancingEnabled(true);
        TestingUtil.waitForNoRebalance(cache(0), cache(1));

        // The entry is now visible through the new node.
        AssertJUnit.assertEquals(1, cache(1).keySet().size());

        // Release the remove and verify its result and its effect on both nodes.
        removeProceedLatch.countDown();
        Object oldVal = removeFuture.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertNotNull(oldVal);
        AssertJUnit.assertEquals("myValue", oldVal);
        AssertJUnit.assertNull(cache(0).get("myKey"));
        AssertJUnit.assertNull(cache(1).get("myKey"));
    }

    /**
     * Selects the entry-wrapping interceptor class that the test's latch interceptors are
     * positioned after.
     *
     * @return for scattered cache modes, {@link RetryingEntryWrappingInterceptor} when bias
     *         acquisition is {@code NEVER} and {@link BiasedEntryWrappingInterceptor} otherwise;
     *         for all other modes, {@link VersionedEntryWrappingInterceptor} when the
     *         configuration of cache 0 is tx-versioned and {@link EntryWrappingInterceptor}
     *         otherwise
     */
    public Class<? extends DDAsyncInterceptor> ewi() {
        if (cacheMode.isScattered()) {
            if (biasAcquisition == BiasAcquisition.NEVER) {
                return RetryingEntryWrappingInterceptor.class;
            }
            return BiasedEntryWrappingInterceptor.class;
        }
        // The running cache's configuration is only consulted for non-scattered modes.
        if (Configurations.isTxVersioned(cache(0).getCacheConfiguration())) {
            return VersionedEntryWrappingInterceptor.class;
        }
        return EntryWrappingInterceptor.class;
    }

    /**
     * Checks that a put issued on a newly started node, while rebalancing is disabled,
     * returns the previous value and leaves the new value on both nodes once rebalancing
     * has been re-enabled and has finished.
     *
     * <p>A {@link PutLatchInterceptor} on the new node holds the put command until the test
     * has verified that the node received the entry through state transfer.
     *
     * @throws Exception if the put does not reach the interceptor within 10 seconds, or the
     *         forked put fails or does not complete within 10 seconds
     */
    public void testPut() throws Exception {
        cache(0).put("myKey", "myValue");

        // Latch pair shared with the interceptor installed on the second node.
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayProceed = new CountDownLatch(1);
        cacheConfigBuilder.customInterceptors()
                .addInterceptor()
                .after(ewi())
                .interceptor(new PutLatchInterceptor(started, mayProceed));

        // Hold back rebalancing so the new node starts without any state.
        ClusterTopologyManager topologyManager =
                TestingUtil.extractGlobalComponent(manager(0), ClusterTopologyManager.class);
        topologyManager.setRebalancingEnabled(false);

        log.info("Adding a new node ..");
        addClusterEnabledCacheManager(cacheConfigBuilder);
        log.info("Added a new node");

        // Topology as seen by the new node: no pending CH, only node 0 is a member.
        LocalizedCacheTopology topology = advancedCache(1).getDistributionManager().getCacheTopology();
        AssertJUnit.assertNull(topology.getPendingCH());
        AssertJUnit.assertTrue(topology.getMembers().contains(address(0)));
        AssertJUnit.assertFalse(topology.getMembers().contains(address(1)));
        AssertJUnit.assertFalse(topology.getCurrentCH().getMembers().contains(address(1)));

        // Nothing stored locally on the new node yet.
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        // Run the put on another thread; it parks inside the latch interceptor.
        Future<Object> putFuture = fork(() -> {
            try {
                return cache(1).put("myKey", "newValue");
            }
            catch (Exception e) {
                log.errorf(e, "PUT failed: %s", e.getMessage());
                throw e;
            }
        });

        boolean putArrived = started.await(10L, TimeUnit.SECONDS);
        if (!putArrived) {
            throw new TimeoutException();
        }

        // Still no local data while the put is parked.
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        // Allow rebalancing and wait for it to finish.
        topologyManager.setRebalancingEnabled(true);
        TestingUtil.waitForNoRebalance(cache(0), cache(1));

        // The entry is now visible through the new node.
        AssertJUnit.assertEquals(1, cache(1).keySet().size());

        // Release the put, then check its return value and the final value on both nodes.
        mayProceed.countDown();
        Object previous = putFuture.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertNotNull(previous);
        AssertJUnit.assertEquals("myValue", previous);
        AssertJUnit.assertEquals("newValue", cache(0).get("myKey"));
        AssertJUnit.assertEquals("newValue", cache(1).get("myKey"));
    }

    /**
     * Checks that a replace issued on a newly started node, while rebalancing is disabled,
     * returns the previous value and leaves the new value on both nodes once rebalancing
     * has been re-enabled and has finished.
     *
     * <p>A {@link ReplaceLatchInterceptor} on the new node holds the replace command until
     * the test has verified that the node received the entry through state transfer.
     *
     * @throws Exception if the replace does not reach the interceptor within 10 seconds, or
     *         the forked replace fails or does not complete within 10 seconds
     */
    public void testReplace() throws Exception {
        cache(0).put("myKey", "myValue");

        // Latch pair shared with the interceptor installed on the second node.
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayProceed = new CountDownLatch(1);
        cacheConfigBuilder.customInterceptors()
                .addInterceptor()
                .after(ewi())
                .interceptor(new ReplaceLatchInterceptor(started, mayProceed));

        // Hold back rebalancing so the new node starts without any state.
        ClusterTopologyManager topologyManager =
                TestingUtil.extractGlobalComponent(manager(0), ClusterTopologyManager.class);
        topologyManager.setRebalancingEnabled(false);

        log.info("Adding a new node ..");
        addClusterEnabledCacheManager(cacheConfigBuilder);
        log.info("Added a new node");

        // Topology as seen by the new node: no pending CH, only node 0 is a member.
        LocalizedCacheTopology topology = advancedCache(1).getDistributionManager().getCacheTopology();
        AssertJUnit.assertNull(topology.getPendingCH());
        AssertJUnit.assertTrue(topology.getMembers().contains(address(0)));
        AssertJUnit.assertFalse(topology.getMembers().contains(address(1)));
        AssertJUnit.assertFalse(topology.getCurrentCH().getMembers().contains(address(1)));

        // Nothing stored locally on the new node yet.
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        // Run the replace on another thread; it parks inside the latch interceptor.
        Future<Object> replaceFuture = fork(() -> {
            try {
                return cache(1).replace("myKey", "newValue");
            }
            catch (Exception e) {
                log.errorf(e, "REPLACE failed: %s", e.getMessage());
                throw e;
            }
        });

        boolean replaceArrived = started.await(10L, TimeUnit.SECONDS);
        if (!replaceArrived) {
            throw new TimeoutException();
        }

        // Still no local data while the replace is parked.
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        // Allow rebalancing and wait for it to finish.
        topologyManager.setRebalancingEnabled(true);
        TestingUtil.waitForNoRebalance(cache(0), cache(1));

        // The entry is now visible through the new node.
        AssertJUnit.assertEquals(1, cache(1).keySet().size());

        // Release the replace, then check its return value and the final value on both nodes.
        mayProceed.countDown();
        Object previous = replaceFuture.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertNotNull(previous);
        AssertJUnit.assertEquals("myValue", previous);
        AssertJUnit.assertEquals("newValue", cache(0).get("myKey"));
        AssertJUnit.assertEquals("newValue", cache(1).get("myKey"));
    }

    /**
     * Checks that a get issued on a newly started node before any state has been applied
     * there still returns the value stored on the first node.
     *
     * <p>Two interceptors are installed on the new node: a
     * {@link StateTransferLatchInterceptor} that holds state-transfer puts, and a
     * {@link GetLatchInterceptor} that holds the get until state transfer has finished.
     *
     * @throws Exception if either latch is not signalled within 10 seconds, or the forked
     *         get fails or does not complete within 10 seconds
     */
    public void testGet() throws Exception {
        cache(0).put("myKey", "myValue");

        // Interceptor placed before InvocationContextInterceptor: parks state-transfer puts.
        CountDownLatch stateMayProceed = new CountDownLatch(1);
        CountDownLatch stateStarted = new CountDownLatch(1);
        cacheConfigBuilder.customInterceptors()
                .addInterceptor()
                .before(InvocationContextInterceptor.class)
                .interceptor(new StateTransferLatchInterceptor(stateStarted, stateMayProceed));

        // Interceptor placed before CallInterceptor: parks the get.
        CountDownLatch getStarted = new CountDownLatch(1);
        CountDownLatch getMayProceed = new CountDownLatch(1);
        cacheConfigBuilder.customInterceptors()
                .addInterceptor()
                .before(CallInterceptor.class)
                .interceptor(new GetLatchInterceptor(getStarted, getMayProceed));

        log.info("Adding a new node ..");
        addClusterEnabledCacheManager(cacheConfigBuilder);
        log.info("Added a new node");

        // The new node's data container starts out empty.
        AssertJUnit.assertEquals(0, cache(1).getAdvancedCache().getDataContainer().size());

        // Wait for the first state-transfer put to reach the interceptor.
        boolean stateArrived = stateStarted.await(10L, TimeUnit.SECONDS);
        if (!stateArrived) {
            throw new TimeoutException();
        }

        // The state-transfer put is parked, so the container is still empty.
        AssertJUnit.assertEquals(0, cache(1).getAdvancedCache().getDataContainer().size());

        // Issue the get on another thread and wait until it is parked as well.
        Future<Object> getFuture = fork(() -> cache(1).get("myKey"));
        boolean getArrived = getStarted.await(10L, TimeUnit.SECONDS);
        if (!getArrived) {
            throw new TimeoutException();
        }

        // Let state transfer complete while the get remains parked.
        stateMayProceed.countDown();
        TestingUtil.waitForNoRebalance(cache(0), cache(1));
        AssertJUnit.assertEquals(1, cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().size());

        // Release the get and check the value it observed.
        getMayProceed.countDown();
        Object value = getFuture.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals("myValue", value);
    }

    /**
     * Interceptor that parks a {@link GetKeyValueCommand} until the test releases it.
     *
     * <p>A get is only parked while the "started" latch has not yet been counted down;
     * once it reaches zero, later gets pass straight through.
     */
    static class GetLatchInterceptor
    extends BaseAsyncInterceptor {
        /** Counted down when a get reaches this interceptor. */
        private final CountDownLatch started;
        /** Awaited (up to 10 seconds) before the parked get is passed on. */
        private final CountDownLatch proceed;

        /**
         * @param getKeyStartedLatch latch counted down when a get arrives
         * @param getKeyProceedLatch latch the get waits on before continuing
         */
        public GetLatchInterceptor(CountDownLatch getKeyStartedLatch, CountDownLatch getKeyProceedLatch) {
            this.started = getKeyStartedLatch;
            this.proceed = getKeyProceedLatch;
        }

        /**
         * Passes every command on, first parking a get if the started latch is still open.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
            // The latch count is only inspected for get commands.
            if (!(cmd instanceof GetKeyValueCommand) || started.getCount() == 0L) {
                return invokeNext(ctx, cmd);
            }
            started.countDown();
            boolean released = proceed.await(10L, TimeUnit.SECONDS);
            if (!released) {
                throw new TimeoutException();
            }
            return invokeNext(ctx, cmd);
        }
    }

    /**
     * Interceptor that parks every {@link PutKeyValueCommand} carrying the
     * {@code PUT_FOR_STATE_TRANSFER} flag until the test releases it.
     */
    static class StateTransferLatchInterceptor
    extends BaseAsyncInterceptor {
        /** Counted down when a state-transfer put reaches this interceptor. */
        private final CountDownLatch started;
        /** Awaited (up to 10 seconds) before the parked put is passed on. */
        private final CountDownLatch proceed;

        /**
         * @param applyStateStartedLatch latch counted down when a state-transfer put arrives
         * @param applyStateProceedLatch latch the put waits on before continuing
         */
        public StateTransferLatchInterceptor(CountDownLatch applyStateStartedLatch, CountDownLatch applyStateProceedLatch) {
            this.started = applyStateStartedLatch;
            this.proceed = applyStateProceedLatch;
        }

        /**
         * Passes every command on, first parking it if it is a state-transfer put.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
            // The flag is only inspected for put commands.
            if (!(cmd instanceof PutKeyValueCommand)
                    || !((PutKeyValueCommand) cmd).hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                return invokeNext(ctx, cmd);
            }
            started.countDown();
            boolean released = proceed.await(10L, TimeUnit.SECONDS);
            if (!released) {
                throw new TimeoutException();
            }
            return invokeNext(ctx, cmd);
        }
    }

    /**
     * Interceptor that parks every {@link ReplaceCommand} until the test releases it.
     */
    static class ReplaceLatchInterceptor
    extends BaseAsyncInterceptor {
        /** Counted down when a replace reaches this interceptor. */
        private final CountDownLatch started;
        /** Awaited (up to 10 seconds) before the parked replace is passed on. */
        private final CountDownLatch proceed;

        /**
         * @param replaceStartedLatch latch counted down when a replace arrives
         * @param replaceProceedLatch latch the replace waits on before continuing
         */
        public ReplaceLatchInterceptor(CountDownLatch replaceStartedLatch, CountDownLatch replaceProceedLatch) {
            this.started = replaceStartedLatch;
            this.proceed = replaceProceedLatch;
        }

        /**
         * Passes every command on, first parking it if it is a replace.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
            if (!(cmd instanceof ReplaceCommand)) {
                return invokeNext(ctx, cmd);
            }
            started.countDown();
            boolean released = proceed.await(10L, TimeUnit.SECONDS);
            if (!released) {
                throw new TimeoutException();
            }
            return invokeNext(ctx, cmd);
        }
    }

    /**
     * Interceptor that parks every {@link PutKeyValueCommand} that does not carry the
     * {@code PUT_FOR_STATE_TRANSFER} flag until the test releases it. Puts that do carry
     * the flag pass straight through.
     */
    static class PutLatchInterceptor
    extends BaseAsyncInterceptor {
        /** Counted down when a non-state-transfer put reaches this interceptor. */
        private final CountDownLatch started;
        /** Awaited (up to 10 seconds) before the parked put is passed on. */
        private final CountDownLatch proceed;

        /**
         * @param putStartedLatch latch counted down when a non-state-transfer put arrives
         * @param putProceedLatch latch the put waits on before continuing
         */
        public PutLatchInterceptor(CountDownLatch putStartedLatch, CountDownLatch putProceedLatch) {
            this.started = putStartedLatch;
            this.proceed = putProceedLatch;
        }

        /**
         * Passes every command on, first parking it if it is a put without the
         * state-transfer flag.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
            // The flag is only inspected for put commands.
            if (!(cmd instanceof PutKeyValueCommand)
                    || ((PutKeyValueCommand) cmd).hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                return invokeNext(ctx, cmd);
            }
            started.countDown();
            boolean released = proceed.await(10L, TimeUnit.SECONDS);
            if (!released) {
                throw new TimeoutException();
            }
            return invokeNext(ctx, cmd);
        }
    }

    /**
     * Interceptor that parks every {@link RemoveCommand} until the test releases it.
     */
    static class RemoveLatchInterceptor
    extends BaseAsyncInterceptor {
        /** Counted down when a remove reaches this interceptor. */
        private final CountDownLatch started;
        /** Awaited (up to 10 seconds) before the parked remove is passed on. */
        private final CountDownLatch proceed;

        /**
         * @param removeStartedLatch latch counted down when a remove arrives
         * @param removeProceedLatch latch the remove waits on before continuing
         */
        public RemoveLatchInterceptor(CountDownLatch removeStartedLatch, CountDownLatch removeProceedLatch) {
            this.started = removeStartedLatch;
            this.proceed = removeProceedLatch;
        }

        /**
         * Passes every command on, first parking it if it is a remove.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
            if (!(cmd instanceof RemoveCommand)) {
                return invokeNext(ctx, cmd);
            }
            started.countDown();
            boolean released = proceed.await(10L, TimeUnit.SECONDS);
            if (!released) {
                throw new TimeoutException();
            }
            return invokeNext(ctx, cmd);
        }
    }
}

