/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.tx.lookup.TransactionManagerLookup;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.concurrent.StateSequencer;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.infinispan.util.AbstractDelegatingRpcManager;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.TxReplay3Test")
public class TxReplay3Test
extends MultipleCacheManagersTest {
    private static final Log log = LogFactory.getLog(TxReplay3Test.class);
    private static final String VALUE_1 = "v1";
    private static final String VALUE_2 = "v2";
    private static final String TX1_LOCKED = "tx1:acquired_lock";
    private static final String TX1_UNSURE = "tx1:unsure_response";
    private static final String TX2_PENDING = "tx2:waiting_tx1";
    private static final String MAIN_ADVANCE = "main:advance";
    private static final String JOIN_NEW_NODE = "join:add_new_node";

    public void testReplay() throws Exception {
        MagicKey key = new MagicKey("TxReplay3Test", this.cache(0));
        StateSequencer sequencer = new StateSequencer();
        sequencer.logicalThread("tx1", TX1_LOCKED, TX1_UNSURE);
        sequencer.logicalThread("tx2", TX2_PENDING, new String[0]);
        sequencer.logicalThread("join", JOIN_NEW_NODE, new String[0]);
        sequencer.logicalThread("main", MAIN_ADVANCE, new String[0]);
        sequencer.order(TX1_LOCKED, MAIN_ADVANCE, TX2_PENDING, JOIN_NEW_NODE, TX1_UNSURE);
        TestingUtil.wrapComponent(this.cache(1), RpcManager.class, (wrapOn, current) -> new UnsureResponseRpcManager((RpcManager)current, sequencer), true);
        Handler handler = TestingUtil.wrapInboundInvocationHandler(this.cache(0), current -> new Handler((PerCacheInboundInvocationHandler)current, sequencer));
        handler.setOrigin(this.address(this.cache(2)));
        Future<Void> tx1 = this.fork(() -> {
            this.cache(1).put(key, (Object)VALUE_1);
            return null;
        });
        sequencer.advance(MAIN_ADVANCE);
        Future<Void> tx2 = this.fork(() -> {
            this.cache(2).put(key, (Object)VALUE_2);
            return null;
        });
        sequencer.enter(JOIN_NEW_NODE);
        this.addClusterEnabledCacheManager(TxReplay3Test.config()).getCache();
        this.waitForClusterToForm();
        sequencer.exit(JOIN_NEW_NODE);
        tx1.get(30L, TimeUnit.SECONDS);
        tx2.get(30L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals((Object)VALUE_2, (Object)this.cache(0).get((Object)key));
    }

    @Override
    protected void createCacheManagers() throws Throwable {
        this.createClusteredCaches(3, TestDataSCI.INSTANCE, TxReplay3Test.config());
    }

    private static ConfigurationBuilder config() {
        ConfigurationBuilder builder = TxReplay3Test.getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
        builder.transaction().useSynchronization(false).transactionManagerLookup((TransactionManagerLookup)new EmbeddedTransactionManagerLookup()).recovery().disable();
        builder.locking().lockAcquisitionTimeout(1L, TimeUnit.MINUTES).isolationLevel(IsolationLevel.READ_COMMITTED);
        builder.clustering().remoteTimeout(1L, TimeUnit.MINUTES).hash().numOwners(1).numSegments(1).consistentHashFactory((ConsistentHashFactory)new ControlledConsistentHashFactory.Default(0, new int[0])).stateTransfer().fetchInMemoryState(false);
        return builder;
    }

    private static class Handler
    extends AbstractDelegatingHandler {
        private final StateSequencer sequencer;
        private volatile boolean triggered = false;
        private volatile Address origin;

        public Handler(PerCacheInboundInvocationHandler delegate, StateSequencer sequencer) {
            super(delegate);
            this.sequencer = sequencer;
        }

        public void setOrigin(Address origin) {
            this.origin = origin;
        }

        protected boolean beforeHandle(CacheRpcCommand command, Reply reply, DeliverOrder order) {
            log.debugf("Before invoking %s. expected origin=%s", (Object)command, (Object)this.origin);
            return super.beforeHandle(command, reply, order);
        }

        protected void afterHandle(CacheRpcCommand command, DeliverOrder order, boolean delegated) {
            super.afterHandle(command, order, delegated);
            log.debugf("After invoking %s. expected origin=%s", (Object)command, (Object)this.origin);
            if (!this.triggered && command instanceof PrepareCommand && command.getOrigin().equals(this.origin)) {
                log.debugf("Triggering %s.", (Object)TxReplay3Test.TX2_PENDING);
                this.triggered = true;
                try {
                    this.sequencer.advance(TxReplay3Test.TX2_PENDING);
                }
                catch (InterruptedException | TimeoutException e) {
                    throw new CacheException((Throwable)e);
                }
            }
        }
    }

    private static class UnsureResponseRpcManager
    extends AbstractDelegatingRpcManager {
        private final StateSequencer sequencer;
        private volatile boolean triggered = false;

        public UnsureResponseRpcManager(RpcManager realOne, StateSequencer sequencer) {
            super(realOne);
            this.sequencer = sequencer;
        }

        @Override
        protected <T> CompletionStage<T> performRequest(Collection<Address> targets, ReplicableCommand command, ResponseCollector<T> collector, Function<ResponseCollector<T>, CompletionStage<T>> invoker, RpcOptions rpcOptions) {
            return super.performRequest(targets, command, collector, invoker, rpcOptions).thenApply(result -> {
                log.debugf("After invoke remotely %s. Responses=%s", (Object)command, result);
                if (this.triggered || !(command instanceof PrepareCommand)) {
                    return result;
                }
                log.debugf("Triggering %s and %s", (Object)TxReplay3Test.TX1_LOCKED, (Object)TxReplay3Test.TX1_UNSURE);
                this.triggered = true;
                try {
                    this.sequencer.advance(TxReplay3Test.TX1_LOCKED);
                    this.sequencer.advance(TxReplay3Test.TX1_UNSURE);
                }
                catch (InterruptedException | TimeoutException e) {
                    throw new CacheException((Throwable)e);
                }
                HashMap newResult = new HashMap();
                ((Map)result).forEach((address, response) -> newResult.put(address, UnsureResponse.INSTANCE));
                log.debugf("After invoke remotely %s. New Responses=%s", (Object)command, newResult);
                return newResult;
            });
        }
    }
}

