/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.Transaction;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.tx.lookup.TransactionManagerLookup;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.impl.CallInterceptor;
import org.infinispan.statetransfer.TransactionSynchronizerInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.concurrent.StateSequencer;
import org.infinispan.test.concurrent.StateSequencerUtil;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.infinispan.transaction.tm.EmbeddedTransaction;
import org.infinispan.transaction.tm.EmbeddedTransactionManager;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.ByteString;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Functional test that drives a prepared transaction through the loss of one of its write owners
 * while a second, manually built {@link CommitCommand} for the same transaction is invoked on
 * another node.
 *
 * <p>What the code sets up and checks:
 * <ul>
 *   <li>four {@code DIST_SYNC} caches, three owners, one segment; the key's write owners are
 *       asserted to be nodes 0, 1 and 2;</li>
 *   <li>a transaction is started and prepared on node 0, then node 1 is killed;</li>
 *   <li>a forked task invokes an extra commit on node 3 ({@code newBackupOwnerCache});</li>
 *   <li>after the real commit, the test asserts how many remote prepares/commits/rollbacks each
 *       node observed and that every remaining cache holds the value.</li>
 * </ul>
 *
 * <p>The relative timing of the two threads is pinned by a {@link StateSequencer}; see
 * {@link #newSequencer()}.
 */
@Test(groups = "functional", testName = "statetransfer.TxReplay2Test")
public class TxReplay2Test extends MultipleCacheManagersTest {
    private static final String VALUE = "value";

    /**
     * Expected transaction status after {@code runPrepare()}. Same numeric value as
     * {@code javax.transaction.Status.STATUS_PREPARED}.
     */
    private static final int STATUS_PREPARED = 2;

    ControlledConsistentHashFactory consistentHashFactory = new ControlledConsistentHashFactory.Default(0, 1, 2);

    /**
     * Runs the scenario described on the class.
     *
     * @throws Exception if the transaction manager, the forked commit or any assertion fails
     */
    public void testReplay() throws Exception {
        final StateSequencer sequencer = newSequencer();

        final Object key = "key";
        AssertJUnit.assertEquals(Arrays.asList(address(0), address(1), address(2)),
                cacheTopology(0).getDistribution(key).writeOwners());

        final Cache<Object, Object> primaryOwnerCache = cache(0);
        final Cache<Object, Object> newBackupOwnerCache = cache(3);
        final CountingInterceptor newBackupCounter = CountingInterceptor.inject(newBackupOwnerCache);
        final CountingInterceptor primaryCounter = CountingInterceptor.inject(primaryOwnerCache);
        final CountingInterceptor oldBackup2Counter = CountingInterceptor.inject(cache(2));

        // Tie sequencer states to commands seen on node 3: a PrepareCommand reaching the
        // CallInterceptor, a CommitCommand reaching the TransactionSynchronizerInterceptor and an
        // inbound TxCompletionNotificationCommand.
        // NOTE(review): matchCount presumably selects the n-th (0-based) matching command - confirm.
        StateSequencerUtil.advanceOnInterceptor(sequencer, newBackupOwnerCache, CallInterceptor.class,
                StateSequencerUtil.matchCommand(PrepareCommand.class).matchCount(0).build())
                .before("tx:before_prepare_replay", "tx:resume_prepare_replay");
        StateSequencerUtil.advanceOnInterceptor(sequencer, newBackupOwnerCache, TransactionSynchronizerInterceptor.class,
                StateSequencerUtil.matchCommand(CommitCommand.class).matchCount(1).build())
                .before("sim:during_extra_commit");
        StateSequencerUtil.advanceOnInboundRpc(sequencer, newBackupOwnerCache,
                StateSequencerUtil.matchCommand(TxCompletionNotificationCommand.class).build())
                .before("tx:mark_tx_completed");

        final EmbeddedTransactionManager transactionManager = (EmbeddedTransactionManager) tm(0);
        transactionManager.begin();
        primaryOwnerCache.put(key, VALUE);

        final EmbeddedTransaction transaction = transactionManager.getTransaction();
        final TransactionTable transactionTable0 = TestingUtil.getTransactionTable(primaryOwnerCache);
        final GlobalTransaction gtx = transactionTable0.getLocalTransaction(transaction).getGlobalTransaction();
        transaction.runPrepare();
        AssertJUnit.assertEquals("Wrong transaction status before killing backup owner.",
                STATUS_PREPARED, transaction.getStatus());

        // Node 1 was asserted above to be one of the key's write owners.
        killMember(1);

        final int currentTopologyId = primaryOwnerCache.getAdvancedCache()
                .getDistributionManager()
                .getCacheTopology()
                .getTopologyId();
        final Future<Object> secondCommitFuture = fork(() -> {
            sequencer.advance("sim:before_extra_commit");
            // Hand-build a second commit for the same global transaction and invoke it on node 3.
            CommitCommand command = new CommitCommand(ByteString.fromString(newBackupOwnerCache.getName()), gtx);
            command.setTopologyId(currentTopologyId);
            command.markTransactionAsRemote(true);
            ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(newBackupOwnerCache);
            try {
                command.invokeAsync(componentRegistry);
            } catch (Throwable throwable) {
                // Surface any failure through the Future returned by fork().
                throw new CacheException(throwable);
            }
            sequencer.advance("sim:after_extra_commit");
            return null;
        });

        checkIfTransactionExists(newBackupOwnerCache);
        AssertJUnit.assertEquals("Wrong transaction status after killing backup owner.",
                STATUS_PREPARED, transaction.getStatus());
        transaction.runCommit(false);

        secondCommitFuture.get(10L, TimeUnit.SECONDS);

        assertNoTransactions();

        assertCommandCounts(newBackupCounter, 2, 2, 0);
        assertCommandCounts(oldBackup2Counter, 2, 1, 0);
        assertCommandCounts(primaryCounter, 0, 0, 0);

        checkKeyInDataContainer(key);
    }

    /**
     * Creates four clustered {@code DIST_SYNC} caches with three owners, a single segment, the
     * controlled consistent hash factory of this test, the embedded transaction manager without
     * synchronization enlistment, recovery disabled and {@code READ_COMMITTED} isolation.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        // NOTE(review): the boolean flag presumably requests a transactional configuration - confirm.
        ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
        builder.transaction()
                .useSynchronization(false)
                .transactionManagerLookup(new EmbeddedTransactionManagerLookup())
                .recovery().disable();
        builder.clustering()
                .hash().numOwners(3).numSegments(1).consistentHashFactory(consistentHashFactory)
                .stateTransfer().fetchInMemoryState(true);
        builder.locking().isolationLevel(IsolationLevel.READ_COMMITTED);
        createClusteredCaches(4, builder);
    }

    /**
     * Builds the sequencer with two logical threads ({@code tx} and {@code sim}) and the three
     * cross-thread ordering constraints used by {@link #testReplay()}:
     * {@code tx:before_prepare_replay} before {@code sim:before_extra_commit},
     * {@code sim:during_extra_commit} before {@code tx:resume_prepare_replay}, and
     * {@code sim:after_extra_commit} before {@code tx:mark_tx_completed}.
     */
    private static StateSequencer newSequencer() {
        StateSequencer sequencer = new StateSequencer();
        sequencer.logicalThread("tx", "tx:before_prepare_replay", "tx:resume_prepare_replay", "tx:mark_tx_completed");
        sequencer.logicalThread("sim", "sim:before_extra_commit", "sim:during_extra_commit", "sim:after_extra_commit");

        sequencer.order("tx:before_prepare_replay", "sim:before_extra_commit");
        sequencer.order("sim:during_extra_commit", "tx:resume_prepare_replay");
        sequencer.order("sim:after_extra_commit", "tx:mark_tx_completed");
        return sequencer;
    }

    /**
     * Asserts the number of remote prepare, commit and rollback commands recorded by
     * {@code counter}, in that order.
     */
    private static void assertCommandCounts(CountingInterceptor counter, int prepares, int commits, int rollbacks) {
        AssertJUnit.assertEquals("Wrong number of prepares!", prepares, counter.numberPrepares.get());
        AssertJUnit.assertEquals("Wrong number of commits!", commits, counter.numberCommits.get());
        AssertJUnit.assertEquals("Wrong number of rollbacks!", rollbacks, counter.numberRollbacks.get());
    }

    /**
     * Asserts that the data container of every cache returned by {@code caches()} holds
     * {@link #VALUE} under {@code key}.
     */
    private void checkKeyInDataContainer(Object key) {
        for (Cache<Object, Object> cache : caches()) {
            DataContainer<Object, Object> container = cache.getAdvancedCache().getDataContainer();
            InternalCacheEntry<Object, Object> entry = container.get(key);
            AssertJUnit.assertNotNull("Cache '" + address(cache) + "' does not contain key!", entry);
            AssertJUnit.assertEquals("Cache '" + address(cache) + "' has wrong value!", VALUE, entry.getValue());
        }
    }

    /** Asserts that the transaction table of {@code cache} holds at least one remote transaction. */
    private void checkIfTransactionExists(Cache<Object, Object> cache) {
        TransactionTable table = TestingUtil.extractComponent(cache, TransactionTable.class);
        AssertJUnit.assertFalse("Expected a remote transaction.", table.getRemoteTransactions().isEmpty());
    }

    /**
     * Interceptor that counts prepare, commit and rollback commands whose invocation context is
     * not origin-local, then passes each command on to the next interceptor unchanged.
     */
    static class CountingInterceptor extends DDAsyncInterceptor {
        private static final Log log = LogFactory.getLog(CountingInterceptor.class);

        // Counters, incremented only for commands that did not originate on this node.
        private final AtomicInteger numberPrepares = new AtomicInteger(0);
        private final AtomicInteger numberCommits = new AtomicInteger(0);
        private final AtomicInteger numberRollbacks = new AtomicInteger(0);

        // The context parameter stays raw in the visit methods so the signatures keep matching
        // the ones they override; the superclass declarations are not visible from this file.
        @Override
        public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
            if (!ctx.isOriginLocal()) {
                log.debugf("Received remote prepare for transaction %s", command.getGlobalTransaction());
                numberPrepares.incrementAndGet();
            }
            return invokeNext(ctx, command);
        }

        @Override
        public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
            if (!ctx.isOriginLocal()) {
                log.debugf("Received remote commit for transaction %s", command.getGlobalTransaction());
                numberCommits.incrementAndGet();
            }
            return invokeNext(ctx, command);
        }

        @Override
        public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
            if (!ctx.isOriginLocal()) {
                log.debugf("Received remote rollback for transaction %s", command.getGlobalTransaction());
                numberRollbacks.incrementAndGet();
            }
            return invokeNext(ctx, command);
        }

        /**
         * Returns the {@code CountingInterceptor} of {@code cache}, installing a new one directly
         * before the {@link CallInterceptor} when the chain does not contain one yet.
         *
         * @param cache cache whose interceptor chain is inspected and possibly extended
         * @return the interceptor already present in the chain, or the newly added one
         */
        public static CountingInterceptor inject(Cache<?, ?> cache) {
            AsyncInterceptorChain chain = cache.getAdvancedCache().getAsyncInterceptorChain();
            if (chain.containsInterceptorType(CountingInterceptor.class)) {
                return (CountingInterceptor) chain.findInterceptorWithClass(CountingInterceptor.class);
            }
            CountingInterceptor interceptor = new CountingInterceptor();
            chain.addInterceptorBefore(interceptor, CallInterceptor.class);
            return interceptor;
        }
    }
}

