/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferFunctionalSCIImpl;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.data.DelayedMarshallingPojo;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.transaction.LockingMode;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.StateTransferFunctionalTest")
public class StateTransferFunctionalTest
extends MultipleCacheManagersTest {
    // Keys of the initial data set. Only the A_B_* and A_C_* keys are written and
    // verified by writeInitialData / verifyInitialData in this class.
    public static final String A_B_NAME = "a_b_name";
    public static final String A_C_NAME = "a_c_name";
    // Was "a_d_age", a copy-paste slip that made this key collide with A_D_AGE.
    public static final String A_D_NAME = "a_d_name";
    public static final String A_B_AGE = "a_b_age";
    public static final String A_C_AGE = "a_c_age";
    public static final String A_D_AGE = "a_d_age";

    // Values of the initial data set.
    public static final String JOE = "JOE";
    public static final String BOB = "BOB";
    public static final String JANE = "JANE";
    public static final Integer TWENTY = 20;
    public static final Integer FORTY = 40;

    /** Serialization context initializer handed to every cache manager; set in createCacheManagers(). */
    protected SerializationContextInitializer sci;
    /** Shared cache configuration used for every cache manager; set in createCacheManagers(). */
    protected ConfigurationBuilder configurationBuilder;
    /** Name of the cache the tests operate on. */
    protected final String cacheName;
    /** Incremented at the start of each test method and included in the lifecycle log lines. */
    private volatile int testCount = 0;
    private static final Log log = LogFactory.getLog(StateTransferFunctionalTest.class);

    /** Creates the test using the default cache name {@code "nbst"}. */
    public StateTransferFunctionalTest() {
        this("nbst");
    }

    /**
     * Creates the test for the given cache name and selects per-method cleanup.
     *
     * @param testCacheName name of the cache every test method operates on
     */
    public StateTransferFunctionalTest(String testCacheName) {
        cacheName = testCacheName;
        cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Prepares the serialization context initializer and the shared cache configuration.
     * No cache manager is started here; the individual tests add managers on demand
     * through {@link #createCacheManager(String)}.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        sci = new StateTransferFunctionalSCIImpl();

        // Publish the builder to the field first, then configure it through the local alias.
        ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, true);
        configurationBuilder = builder;

        builder.transaction()
                .lockingMode(LockingMode.PESSIMISTIC)
                .useSynchronization(false)
                .recovery().disable();
        builder.clustering().remoteTimeout(30000L);
        // Small chunk size (20), presumably so state is shipped in several chunks.
        builder.clustering().stateTransfer().chunkSize(20);
        builder.locking().useLockStriping(false);
    }

    /**
     * Adds a new clustered cache manager (merge transport flag enabled) and defines the
     * shared configuration on it under the given name.
     *
     * @param cacheName name under which the configuration is defined
     * @return the newly added cache manager
     */
    protected EmbeddedCacheManager createCacheManager(String cacheName) {
        TransportFlags mergeEnabled = new TransportFlags().withMerge(true);
        EmbeddedCacheManager manager = addClusterEnabledCacheManager(sci, configurationBuilder, mergeEnabled);
        manager.defineConfiguration(cacheName, configurationBuilder.build());
        return manager;
    }

    /**
     * Writes the initial data on a first node, starts a second node and checks that the
     * second node sees the data once no rebalance is in progress.
     */
    public void testInitialStateTransfer(Method m) throws Exception {
        testCount++;
        logTestStart(m);

        EmbeddedCacheManager firstManager = createCacheManager(cacheName);
        Cache<Object, Object> source = firstManager.getCache(cacheName);
        writeInitialData(source);

        EmbeddedCacheManager secondManager = createCacheManager(cacheName);
        Cache<Object, Object> joiner = secondManager.getCache(cacheName);
        TestingUtil.waitForNoRebalance(source, joiner);
        verifyInitialData(joiner);

        logTestEnd(m);
    }

    /**
     * Same as the plain initial state transfer check, followed by defining and starting an
     * extra cache ({@code "otherCache"}) on the first manager only.
     */
    public void testInitialStateTransferCacheNotPresent(Method m) throws Exception {
        testCount++;
        logTestStart(m);

        EmbeddedCacheManager firstManager = createCacheManager(cacheName);
        Cache<Object, Object> source = firstManager.getCache(cacheName);
        writeInitialData(source);

        EmbeddedCacheManager secondManager = createCacheManager(cacheName);
        Cache<Object, Object> joiner = secondManager.getCache(cacheName);
        TestingUtil.waitForNoRebalance(source, joiner);
        verifyInitialData(joiner);

        // The additional cache exists on the first manager only; the second never defines it.
        final String extraCache = "otherCache";
        firstManager.defineConfiguration(extraCache, configurationBuilder.build());
        firstManager.getCache(extraCache);

        logTestEnd(m);
    }

    /**
     * Starts two nodes, then lets two more nodes join at the same time from forked tasks and
     * verifies that both late joiners received the initial data.
     *
     * <p>A {@link DelayTransfer} value is stored under {@code "delay"}, but its delay is never
     * enabled in this test.
     */
    public void testConcurrentStateTransfer(Method m) throws Exception {
        ++testCount;
        logTestStart(m);

        Cache<Object, Object> cache1 = createCacheManager(cacheName).getCache(cacheName);
        writeInitialData(cache1);

        EmbeddedCacheManager cm2 = createCacheManager(cacheName);
        Cache<Object, Object> cache2 = cm2.getCache(cacheName);
        cache1.put("delay", new DelayTransfer());
        TestingUtil.waitForNoRebalance(cache1, cache2);
        verifyInitialData(cache2);

        // Both managers are created up front; only the cache start (the join) runs concurrently.
        EmbeddedCacheManager cm3 = createCacheManager(cacheName);
        EmbeddedCacheManager cm4 = createCacheManager(cacheName);
        Future<Cache<Object, Object>> joinFuture1 = fork(() -> cm3.getCache(cacheName));
        Future<Cache<Object, Object>> joinFuture2 = fork(() -> cm4.getCache(cacheName));
        joinFuture1.get(30L, TimeUnit.SECONDS);
        joinFuture2.get(30L, TimeUnit.SECONDS);

        Cache<Object, Object> cache3 = cm3.getCache(cacheName);
        Cache<Object, Object> cache4 = cm4.getCache(cacheName);
        // A single wait is enough; the previous back-to-back duplicate call was redundant.
        TestingUtil.waitForNoRebalance(cache1, cache2, cache3, cache4);
        verifyInitialData(cache3);
        verifyInitialData(cache4);

        logTestEnd(m);
    }

    /** Runs the third-node-writer scenario with a non-transactional writer. */
    public void testSTWithThirdWritingNonTxCache(Method m) throws Exception {
        testCount++;
        logTestStart(m);
        final boolean transactionalWriter = false;
        thirdWritingCacheTest(transactionalWriter);
        logTestEnd(m);
    }

    /** Runs the third-node-writer scenario with a transactional writer. */
    public void testSTWithThirdWritingTxCache(Method m) throws Exception {
        testCount++;
        logTestStart(m);
        final boolean transactionalWriter = true;
        thirdWritingCacheTest(transactionalWriter);
        logTestEnd(m);
    }

    /** Runs the same-node-writer scenario with a non-transactional writer. */
    public void testSTWithWritingNonTxThread(Method m) throws Exception {
        testCount++;
        logTestStart(m);
        final boolean transactionalWriter = false;
        writingThreadTest(transactionalWriter);
        logTestEnd(m);
    }

    /** Runs the same-node-writer scenario with a transactional writer. */
    public void testSTWithWritingTxThread(Method m) throws Exception {
        testCount++;
        logTestStart(m);
        final boolean transactionalWriter = true;
        writingThreadTest(transactionalWriter);
        logTestEnd(m);
    }

    /**
     * Verifies the initial data on a second node, then stops and restarts that node's cache
     * and verifies the data again.
     */
    public void testInitialStateTransferAfterRestart(Method m) throws Exception {
        testCount++;
        logTestStart(m);

        Cache<Object, Object> source = createCacheManager(cacheName).getCache(cacheName);
        writeInitialData(source);

        EmbeddedCacheManager secondManager = createCacheManager(cacheName);
        Cache<Object, Object> restarted = secondManager.getCache(cacheName);
        TestingUtil.waitForNoRebalance(source, restarted);
        verifyInitialData(restarted);

        // Bounce the cache on the second node and check the data is visible again.
        restarted.stop();
        restarted.start();
        verifyInitialData(restarted);

        logTestEnd(m);
    }

    /** Logs the "start" lifecycle line for the given test method. */
    private void logTestStart(Method m) {
        final String phase = "start";
        logTestLifecycle(m, phase);
    }

    /** Logs the "end" lifecycle line for the given test method. */
    private void logTestEnd(Method m) {
        final String phase = "end";
        logTestLifecycle(m, phase);
    }

    /**
     * Logs, at info level, the test method name, the lifecycle phase and the current test counter.
     *
     * @param m         test method being run
     * @param lifecycle phase label, e.g. "start" or "end"
     */
    private void logTestLifecycle(Method m, String lifecycle) {
        String methodName = m.getName();
        int runNumber = testCount;
        log.infof("%s %s - %s", methodName, lifecycle, runNumber);
    }

    /**
     * Starts two nodes (cache1, cache3), keeps writing through cache3 from a forked task while
     * a further node (cache2) joins, and then checks that cache2 holds the initial data and
     * every key the writer reported as written.
     *
     * @param tx whether the background writer wraps each put in a transaction
     */
    private void thirdWritingCacheTest(boolean tx) throws Exception {
        Cache<Object, Object> cache1 = createCacheManager(cacheName).getCache(cacheName);
        Cache<Object, Object> cache3 = createCacheManager(cacheName).getCache(cacheName);
        TestingUtil.blockUntilViewsReceived(60000L, cache1, cache3);
        writeInitialData(cache1);

        // The delay is switched on only after the value has been stored.
        DelayTransfer value = new DelayTransfer();
        cache1.put("delay", value);
        value.enableDelay();

        WritingTask writingTask = new WritingTask(cache3, tx);
        Future<Integer> future = fork(writingTask);
        Cache<Object, Object> cache2;
        try {
            EmbeddedCacheManager cm2 = createCacheManager(cacheName);
            cache2 = cm2.getCache(cacheName);
            TestingUtil.waitForNoRebalance(cache1, cache2, cache3);
        } finally {
            // Stop the writer even if the join or the rebalance wait fails, so it does not
            // keep writing in the background after the test has already failed.
            writingTask.stop();
        }

        int count = future.get(60L, TimeUnit.SECONDS);
        verifyInitialData(cache2);
        for (int c = 0; c < count; ++c) {
            AssertJUnit.assertEquals(Integer.valueOf(c), cache2.get("test" + c));
        }
    }

    /**
     * Asserts that the given cache returns the expected values for the four keys stored by
     * {@link #writeInitialData(Cache)}.
     *
     * @param c cache to check
     */
    protected void verifyInitialData(Cache<Object, Object> c) {
        Address address = c.getAdvancedCache().getRpcManager().getAddress();
        // Pass the address as a format argument: concatenating it into the format string
        // would make any '%' in it be interpreted as a format specifier.
        log.debugf("Checking values on cache %s", address);
        AssertJUnit.assertEquals("Incorrect value for key " + A_B_NAME, JOE, c.get(A_B_NAME));
        AssertJUnit.assertEquals("Incorrect value for key " + A_B_AGE, TWENTY, c.get(A_B_AGE));
        AssertJUnit.assertEquals("Incorrect value for key " + A_C_NAME, BOB, c.get(A_C_NAME));
        AssertJUnit.assertEquals("Incorrect value for key " + A_C_AGE, FORTY, c.get(A_C_AGE));
    }

    /**
     * Stores the initial data set: name and age entries for the A_B and A_C keys.
     *
     * @param c cache to write to
     */
    protected void writeInitialData(Cache<Object, Object> c) {
        // Same order as before: B name, B age, C name, C age.
        c.put(A_B_NAME, JOE);
        c.put(A_B_AGE, TWENTY);
        c.put(A_C_NAME, BOB);
        c.put(A_C_AGE, FORTY);
    }

    /**
     * Starts one node, keeps writing through it from a forked task while a second node joins,
     * and then checks that both nodes hold the initial data and that the second node holds
     * every key the writer reported as written.
     *
     * @param tx whether the background writer wraps each put in a transaction
     */
    private void writingThreadTest(boolean tx) throws Exception {
        Cache<Object, Object> cache1 = createCacheManager(cacheName).getCache(cacheName);
        AssertJUnit.assertEquals(0, cache1.getAdvancedCache().getDataContainer().size());
        writeInitialData(cache1);

        // The delay is switched on only after the value has been stored.
        DelayTransfer value = new DelayTransfer();
        cache1.put("delay", value);
        value.enableDelay();

        WritingTask writingTask = new WritingTask(cache1, tx);
        Future<Integer> future = fork(writingTask);
        Cache<Object, Object> cache2;
        try {
            verifyInitialData(cache1);
            EmbeddedCacheManager cm2 = createCacheManager(cacheName);
            cache2 = cm2.getCache(cacheName);
            TestingUtil.waitForNoRebalance(cache1, cache2);
        } finally {
            // Stop the writer even if verification, the join or the rebalance wait fails,
            // so it does not keep writing in the background after the test has already failed.
            writingTask.stop();
        }

        int count = future.get(60L, TimeUnit.SECONDS);
        verifyInitialData(cache1);
        verifyInitialData(cache2);
        for (int c = 0; c < count; ++c) {
            AssertJUnit.assertEquals(Integer.valueOf(c), cache2.get("test" + c));
        }
    }

    /**
     * Serialization context initializer declaration covering {@link DelayedMarshallingPojo}
     * and {@link DelayTransfer}.
     * NOTE(review): StateTransferFunctionalSCIImpl, instantiated in createCacheManagers(), is
     * presumably generated from this annotation - confirm against the build setup.
     */
    @AutoProtoSchemaBuilder(
            includeClasses = {DelayedMarshallingPojo.class, DelayTransfer.class},
            schemaFileName = "test.core.StateTransferFunctionalTest.proto",
            schemaFilePath = "proto/generated",
            schemaPackageName = "org.infinispan.test.core.StateTransferFunctionalTest",
            service = false)
    interface StateTransferFunctionalSCI extends SerializationContextInitializer {
    }

    private static class WritingTask
    implements Callable<Integer> {
        private final Cache<Object, Object> cache;
        private final boolean tx;
        private volatile boolean stop;
        private TransactionManager tm;

        WritingTask(Cache<Object, Object> cache, boolean tx) {
            this.cache = cache;
            this.tx = tx;
            if (tx) {
                this.tm = TestingUtil.getTransactionManager(cache);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Integer call() throws Exception {
            int c = 0;
            while (!this.stop) {
                boolean success = false;
                try {
                    if (this.tx) {
                        this.tm.begin();
                    }
                    this.cache.put((Object)("test" + c), (Object)c);
                    if (this.tx) {
                        this.tm.commit();
                    }
                    success = true;
                    ++c;
                    Thread.sleep(1L);
                }
                catch (Exception e) {
                    log.errorf((Throwable)e, "Error writing key test%s", (Object)c);
                    this.stop();
                }
                finally {
                    if (!this.tx || success) continue;
                    try {
                        this.tm.rollback();
                    }
                    catch (SystemException e) {
                        log.error((Object)e);
                    }
                }
            }
            return c;
        }

        public void stop() {
            this.stop = true;
        }
    }

    /**
     * Cache value whose annotated getter can be made slow on demand: once
     * {@link #enableDelay()} has been called, every {@link #isIgnore()} call sleeps first.
     * NOTE(review): presumably the getter is invoked while the value is marshalled, which
     * would slow down state transfer - confirm against the ProtoStream-generated marshaller.
     */
    public static class DelayTransfer {
        // Off by default; flipped by enableDelay().
        volatile boolean doDelay;

        DelayTransfer() {
        }

        /** Turns the delay on for all subsequent {@link #isIgnore()} calls. */
        void enableDelay() {
            doDelay = true;
        }

        /**
         * Proto field 1. Always returns {@code false}; sleeps via
         * {@code TestingUtil.sleepThread(1000L)} beforehand when the delay is enabled.
         */
        @ProtoField(number = 1, defaultValue = "false")
        public boolean isIgnore() {
            boolean delayRequested = doDelay;
            if (delayRequested) {
                TestingUtil.sleepThread(1000L);
            }
            return false;
        }

        /** Setter counterpart of {@link #isIgnore()}; the value is discarded. */
        public void setIgnore(boolean ignore) {
            // Intentionally empty: there is no state to restore.
        }
    }
}

