/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.conflict.impl;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.topology.TopologyUpdateCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.conflict.impl.BaseMergePolicyTest;
import org.infinispan.distribution.MagicKey;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.InboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestInternalCacheEntryFactory;
import org.infinispan.topology.CacheTopology;
import org.infinispan.xsite.XSiteReplicateCommand;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Verifies the value of a conflicting key after a partition merge when the key is either left
 * alone, overwritten, or removed while the merge is still in the conflict-resolution phase.
 *
 * <p>Each partition writes its own value for {@code conflictKey} during the split. The merge
 * policy installed by this test always resolves the conflict to {@link #MERGE_RESULT}. While the
 * merge is held back by blocking inbound handlers, the configured {@link MergeAction} is applied
 * and the final value is asserted on all caches.
 */
@Test(groups={"functional"}, testName="conflict.impl.OperationsDuringMergeConflictTest")
public class OperationsDuringMergeConflictTest
extends BaseMergePolicyTest {
    private static final String PARTITION_0_VAL = "A";
    private static final String PARTITION_1_VAL = "B";
    private static final String MERGE_RESULT = "C";
    private static final String PUT_RESULT = "D";
    /** Upper bound, in seconds, a handler thread waits for the test to release a latch. */
    private static final long LATCH_TIMEOUT_SECONDS = 120L;

    private MergeAction mergeAction;

    /**
     * Creates one test instance per {@link MergeAction}.
     *
     * @return the test instances, in the order NONE, PUT, REMOVE
     */
    @Override
    public Object[] factory() {
        return new Object[]{
                new OperationsDuringMergeConflictTest(MergeAction.NONE),
                new OperationsDuringMergeConflictTest(MergeAction.PUT),
                new OperationsDuringMergeConflictTest(MergeAction.REMOVE)
        };
    }

    /** No-argument constructor; leaves {@code mergeAction} unset. */
    public OperationsDuringMergeConflictTest() {
    }

    /**
     * Configures a DIST_SYNC test with partitions {0, 1} and {2, 3}.
     *
     * @param mergeAction the operation to perform on the conflict key during the merge; its
     *                    {@code value} is the value expected after the merge
     */
    public OperationsDuringMergeConflictTest(MergeAction mergeAction) {
        super(CacheMode.DIST_SYNC, null, new int[]{0, 1}, new int[]{2, 3});
        // The policy ignores both candidates and always produces MERGE_RESULT for the conflict key.
        this.mergePolicy = (preferredEntry, otherEntries) -> TestInternalCacheEntryFactory.create(conflictKey, MERGE_RESULT);
        this.description = mergeAction.toString();
        this.mergeAction = mergeAction;
        this.valueAfterMerge = mergeAction.value;
    }

    /** Picks a conflict key built from the first node of each partition. */
    @Override
    protected void beforeSplit() {
        conflictKey = new MagicKey(cache(p0.node(0)), cache(p1.node(0)));
    }

    /** Writes a different value for the conflict key in each partition and verifies both. */
    @Override
    protected void duringSplit(AdvancedCache preferredPartitionCache, AdvancedCache otherCache) {
        cache(p0.node(0)).put(conflictKey, PARTITION_0_VAL);
        cache(p1.node(0)).put(conflictKey, PARTITION_1_VAL);
        assertConflictingValuesPerPartition();
    }

    /**
     * Merges the two partitions while blocking state responses and the READ_OLD_WRITE_ALL topology
     * update, optionally modifies the conflict key during conflict resolution, then releases the
     * blocked handlers and asserts the final value on every cache.
     */
    @Override
    protected void performMerge() {
        CountDownLatch conflictLatch = new CountDownLatch(1);
        CountDownLatch stateTransferLatch = new CountDownLatch(1);
        try {
            IntStream.range(0, numMembersInCluster)
                    .forEach(i -> installBlockingHandlers(i, conflictLatch, stateTransferLatch));

            assertConflictingValuesPerPartition();
            // NOTE(review): the boolean presumably tells merge() not to wait for the rebalance to
            // finish, which is required here because the handlers are still blocked - confirm.
            partition(0).merge(partition(1), false);
            assertConflictingValuesPerPartition();

            if (mergeAction != MergeAction.NONE) {
                applyMergeActionDuringConflictResolution();
            }

            conflictLatch.countDown();
            stateTransferLatch.countDown();
            TestingUtil.waitForNoRebalance(caches());
            assertCacheGetValAllCaches(mergeAction);
        } finally {
            // Always release blocked handler threads, even when an assertion above fails.
            // countDown() on a latch that already reached zero is a no-op.
            conflictLatch.countDown();
            stateTransferLatch.countDown();
        }
    }

    /**
     * Wraps the per-cache and the global inbound handlers of member {@code index} with handlers
     * that block on the given latches.
     */
    private void installBlockingHandlers(int index, CountDownLatch conflictLatch, CountDownLatch stateTransferLatch) {
        TestingUtil.wrapInboundInvocationHandler(cache(index), handler -> new BlockStateResponseCommandHandler((PerCacheInboundInvocationHandler)handler, conflictLatch));
        EmbeddedCacheManager manager = manager(index);
        InboundInvocationHandler globalHandler = TestingUtil.extractGlobalComponent(manager, InboundInvocationHandler.class);
        BlockingInboundInvocationHandler blockingHandler = new BlockingInboundInvocationHandler(globalHandler, stateTransferLatch);
        TestingUtil.replaceComponent(manager, InboundInvocationHandler.class, blockingHandler, true);
    }

    /** Waits until all members are in CONFLICT_RESOLUTION, then applies the PUT or REMOVE. */
    private void applyMergeActionDuringConflictResolution() {
        List<Address> allMembers = caches().stream()
                .map(cache -> cache.getCacheManager().getAddress())
                .collect(Collectors.toList());
        TestingUtil.waitForTopologyPhase(allMembers, CacheTopology.Phase.CONFLICT_RESOLUTION, caches().toArray(new Cache[numMembersInCluster]));
        if (mergeAction == MergeAction.PUT) {
            cache(0).put(conflictKey, mergeAction.value);
        } else {
            cache(0).remove(conflictKey);
        }
    }

    /** Asserts that each partition's nodes still read the value written in that partition. */
    private void assertConflictingValuesPerPartition() {
        assertCacheGet(conflictKey, PARTITION_0_VAL, p0.getNodes());
        assertCacheGet(conflictKey, PARTITION_1_VAL, p1.getNodes());
    }

    /** Asserts that every cache returns the value associated with {@code action}. */
    private void assertCacheGetValAllCaches(MergeAction action) {
        assertCacheGet(conflictKey, action.value, cacheIndexes());
    }

    /**
     * Blocks until {@code latch} reaches zero, failing the test on timeout or interruption.
     *
     * @param latch the latch to wait on
     */
    private static void awaitLatch(CountDownLatch latch) {
        try {
            if (!latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                AssertJUnit.fail("CountDownLatch await timedout");
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread/executor can observe the interruption.
            Thread.currentThread().interrupt();
            AssertJUnit.fail("CountDownLatch Interrupted");
        }
    }

    /** Per-cache handler that holds back {@link StateResponseCommand}s until the latch is released. */
    private static class BlockStateResponseCommandHandler
    extends AbstractDelegatingHandler {
        final CountDownLatch latch;

        BlockStateResponseCommandHandler(PerCacheInboundInvocationHandler delegate, CountDownLatch latch) {
            super(delegate);
            this.latch = latch;
        }

        public void handle(CacheRpcCommand command, Reply reply, DeliverOrder order) {
            if (command instanceof StateResponseCommand) {
                awaitLatch(latch);
            }
            delegate.handle(command, reply, order);
        }
    }

    /**
     * Global handler that holds back {@link TopologyUpdateCommand}s for the READ_OLD_WRITE_ALL
     * phase until the latch is released; everything else is delegated immediately.
     */
    private static class BlockingInboundInvocationHandler
    implements InboundInvocationHandler {
        final InboundInvocationHandler delegate;
        final CountDownLatch latch;

        BlockingInboundInvocationHandler(InboundInvocationHandler delegate, CountDownLatch latch) {
            this.delegate = delegate;
            this.latch = latch;
        }

        public void handleFromCluster(Address origin, ReplicableCommand command, Reply reply, DeliverOrder order) {
            if (command instanceof TopologyUpdateCommand && ((TopologyUpdateCommand)command).getPhase() == CacheTopology.Phase.READ_OLD_WRITE_ALL) {
                awaitLatch(latch);
            }
            delegate.handleFromCluster(origin, command, reply, order);
        }

        public void handleFromRemoteSite(String origin, XSiteReplicateCommand command, Reply reply, DeliverOrder order) {
            delegate.handleFromRemoteSite(origin, command, reply, order);
        }
    }

    /** Operation applied to the conflict key during the merge, with the value expected afterwards. */
    private enum MergeAction {
        /** Overwrite the key; the written value wins over the merge result. */
        PUT(PUT_RESULT),
        /** Remove the key; no value is expected afterwards. */
        REMOVE(null),
        /** Leave the key alone; the merge policy's result is expected. */
        NONE(MERGE_RESULT);

        final String value;

        MergeAction(String value) {
            this.value = value;
        }
    }
}

