/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.scattered.statetransfer;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.configuration.cache.BiasAcquisition;
import org.infinispan.distribution.MagicKey;
import org.infinispan.factories.impl.BasicComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.scattered.statetransfer.AbstractStateTransferTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="scattered.statetransfer.PushTransferTest")
public class PushTransferTest
extends AbstractStateTransferTest {
    @Override
    public Object[] factory() {
        return new Object[]{new PushTransferTest().biasAcquisition(BiasAcquisition.NEVER), new PushTransferTest().biasAcquisition(BiasAcquisition.ON_WRITE)};
    }

    public void testNodeJoin() throws Exception {
        List<MagicKey> keys = this.init();
        final EmbeddedCacheManager cm4 = this.addClusterEnabledCacheManager(TestDataSCI.INSTANCE, null, TRANSPORT_FLAGS);
        cm4.defineConfiguration("scattered", this.defaultConfig.build());
        int startTopologyId = this.c1.getAdvancedCache().getDistributionManager().getCacheTopology().getTopologyId();
        BlockingLocalTopologyManager bltm = BlockingLocalTopologyManager.replaceTopologyManager(cm4, "scattered");
        final CountDownLatch statePushedLatch = new CountDownLatch(1);
        final CountDownLatch stateAppliedLatch = new CountDownLatch(1);
        TestingUtil.addCacheStartingHook(cm4, (name, cr) -> {
            final PerCacheInboundInvocationHandler originalHandler = (PerCacheInboundInvocationHandler)cr.getComponent(PerCacheInboundInvocationHandler.class);
            AbstractDelegatingHandler newHandler = new AbstractDelegatingHandler(originalHandler){

                public void handle(CacheRpcCommand command, Reply reply, DeliverOrder order) {
                    if (command instanceof StateResponseCommand) {
                        log.tracef("State received on %s", (Object)cm4.getAddress());
                        statePushedLatch.countDown();
                    }
                    originalHandler.handle(command, response -> {
                        log.tracef("State applied on %s", (Object)cm4.getAddress());
                        stateAppliedLatch.countDown();
                        reply.reply(response);
                    }, order);
                }
            };
            BasicComponentRegistry bcr = (BasicComponentRegistry)cr.getComponent(BasicComponentRegistry.class);
            bcr.replaceComponent(PerCacheInboundInvocationHandler.class.getName(), (Object)newHandler, false);
            cr.rewire();
            cr.cacheComponents();
        });
        Future<Cache> c4Future = this.fork(() -> cm4.getCache("scattered"));
        AssertJUnit.assertTrue((boolean)statePushedLatch.await(10L, TimeUnit.SECONDS));
        AssertJUnit.assertFalse((boolean)stateAppliedLatch.await(100L, TimeUnit.MILLISECONDS));
        bltm.confirmTopologyUpdate(CacheTopology.Phase.TRANSITORY);
        AssertJUnit.assertEquals((long)0L, (long)stateAppliedLatch.getCount());
        bltm.confirmTopologyUpdate(CacheTopology.Phase.NO_REBALANCE);
        Cache c4 = c4Future.get(30L, TimeUnit.SECONDS);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c1, this.c2, this.c3, c4);
        TestingUtil.waitForNoRebalance(this.c1, this.c2, this.c3, c4);
        for (MagicKey key : keys) {
            int copies = Stream.of(this.c1, this.c2, this.c3, c4).mapToInt(c -> c.getAdvancedCache().getDataContainer().containsKey((Object)key) ? 1 : 0).sum();
            AssertJUnit.assertEquals((String)("Key " + key + " has incorrect number of copies"), (int)2, (int)copies);
        }
    }
}

