/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.conflict.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.conflict.ConflictManager;
import org.infinispan.conflict.ConflictManagerFactory;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.MagicKey;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.partitionhandling.BasePartitionHandlingTest;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.test.TestingUtil;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="conflict.resolution.ConflictManagerTest")
public class ConflictManagerTest
extends BasePartitionHandlingTest {
    // Name of the cache that createCacheManagers() defines on all managers and that getCache(int) looks up.
    private static final String CACHE_NAME = "conflict-cache";
    // Presumably the number of owners per key (the tests expect two addresses per version/conflict map) — TODO confirm.
    private static final int NUMBER_OF_OWNERS = 2;
    // Presumably the number of entries the tests write (keys 0..99 are put by the tests) — TODO confirm.
    private static final int NUMBER_OF_CACHE_ENTRIES = 100;
    // Presumably the key step at which introduceCacheConflicts() tampers with an entry — TODO confirm.
    private static final int INCONSISTENT_VALUE_INCREMENT = 10;
    // Presumably the key multiple at which the tampering is a removal instead of an overwrite — TODO confirm.
    private static final int NULL_VALUE_FREQUENCY = 20;

    /**
     * Sets the test up for a synchronous distributed cache that keeps accepting
     * reads and writes while the cluster is split. Both fields are not declared
     * in this class (presumably inherited from the base test).
     */
    public ConflictManagerTest() {
        partitionHandling = PartitionHandling.ALLOW_READ_WRITES;
        cacheMode = CacheMode.DIST_SYNC;
    }

    /**
     * Lets the superclass create the managers, then defines the
     * {@code CACHE_NAME} cache on all of them: DIST_SYNC, the configured
     * partition-handling strategy, no merge policy, and in-memory state
     * fetching enabled.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        super.createCacheManagers();

        ConfigurationBuilder conflictCacheConfig = ConflictManagerTest.getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC);
        conflictCacheConfig.clustering()
                .partitionHandling()
                .whenSplit(partitionHandling)
                // A null merge policy means conflicts are not resolved automatically.
                .mergePolicy(null)
                .stateTransfer()
                .fetchInMemoryState(true);

        defineConfigurationOnAllManagers(CACHE_NAME, conflictCacheConfig);
    }

    /**
     * Checks that {@code getAllVersions} issued while a merge-triggered state
     * transfer is still running does not complete before the transfer is
     * released, and afterwards returns a version map with two entries.
     *
     * @throws Exception if waiting on a latch or a forked task fails
     */
    public void testGetAllVersionsDuringStateTransfer() throws Exception {
        final int key = 1;
        final int value = 1;

        this.createCluster();
        this.getCache(2).put(key, value);
        this.splitCluster();

        RehashListener listener = new RehashListener();
        this.getCache(0).addListener(listener);

        // Hold back the last state chunk so the merge cannot finish on its own.
        CountDownLatch latch = new CountDownLatch(1);
        this.delayStateTransferCompletion(latch);
        Future<Void> mergeFuture = this.fork(() -> this.partition(0).merge(this.partition(1)));

        // Wait until the rehash has actually started before asking for versions.
        AssertJUnit.assertTrue(listener.latch.await(10L, TimeUnit.SECONDS));
        Future<Map<Address, InternalCacheValue<Object>>> versionFuture = this.fork(() -> this.getAllVersions(0, key));
        TestingUtil.assertNotDone(versionFuture);

        // Release the delayed state transfer and let both forked tasks finish.
        latch.countDown();
        mergeFuture.get(30L, TimeUnit.SECONDS);
        Map<Address, InternalCacheValue<Object>> versionMap = versionFuture.get(60L, TimeUnit.SECONDS);

        AssertJUnit.assertNotNull(versionMap);
        AssertJUnit.assertFalse(versionMap.isEmpty());
        AssertJUnit.assertEquals(String.format("Returned versionMap %s", versionMap), 2, versionMap.size());
    }

    /**
     * Defines and starts a second cache ("conflict-cache2") with a remote
     * timeout of 5000, installs handlers that drop every
     * {@code ClusteredGetCommand}, and expects {@code getAllVersions} to fail
     * with a {@code CacheException} whose message matches the given pattern.
     *
     * @throws Throwable propagated from the expectation helper
     */
    public void testGetAllVersionsTimeout() throws Throwable {
        ConfigurationBuilder timeoutConfig = ConflictManagerTest.getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC);
        timeoutConfig.clustering()
                .remoteTimeout(5000L)
                .stateTransfer()
                .fetchInMemoryState(true);

        String secondCacheName = "conflict-cache2";
        defineConfigurationOnAllManagers(secondCacheName, timeoutConfig);
        waitForClusterToForm(secondCacheName);

        // With remote gets dropped, the version lookup can never get its responses.
        dropClusteredGetCommands();
        Exceptions.expectException(
                CacheException.class,
                ".* encountered when attempting '.*.' on cache '.*.'",
                () -> getAllVersions(0, "Test"));
    }

    /**
     * Checks that requesting the conflict stream while a merge-triggered state
     * transfer is in progress fails with an {@code IllegalStateException}.
     *
     * @throws Throwable propagated from the latch wait or the expectation helper
     */
    public void testGetConflictsDuringStateTransfer() throws Throwable {
        this.createCluster();
        this.splitCluster();

        RehashListener listener = new RehashListener();
        this.getCache(0).addListener(listener);

        // Hold back the last state chunk so the state transfer stays in progress.
        CountDownLatch latch = new CountDownLatch(1);
        this.delayStateTransferCompletion(latch);
        this.fork(() -> this.partition(0).merge(this.partition(1), false));

        try {
            // Bounded wait (same bound as the sibling state-transfer test) so a
            // missing rehash event fails the test instead of hanging the suite.
            AssertJUnit.assertTrue("Rehash did not start in time", listener.latch.await(10L, TimeUnit.SECONDS));
            Exceptions.expectException(IllegalStateException.class, ".* Unable to retrieve conflicts as StateTransfer is currently in progress for cache .*", () -> this.getConflicts(0));
        } finally {
            // Always release the delayed state transfer, even if an assertion failed.
            latch.countDown();
        }
    }

    /**
     * Writes keys 0..99 through cache 0, verifies that key 10 has equal
     * versions on every node, then introduces conflicts and verifies that keys
     * 10 and 20 no longer have equal versions.
     */
    public void testAllVersionsOfKeyReturned() {
        waitForClusterToForm(CACHE_NAME);
        for (int entryKey = 0; entryKey < 100; entryKey++) {
            getCache(0).put(entryKey, "v" + entryKey);
        }

        // Before tampering, all versions of the key agree.
        compareCacheValuesForKey(10, true);

        introduceCacheConflicts();

        // After tampering, both probed keys must show diverging versions.
        compareCacheValuesForKey(10, false);
        compareCacheValuesForKey(20, false);
    }

    /**
     * Calls {@code getAllVersions} twice for the same key and checks that the
     * two results are distinct map instances with equal content.
     *
     * @throws Exception declared for the test signature
     */
    public void testConsecutiveInvocationOfAllVersionsForKey() throws Exception {
        waitForClusterToForm(CACHE_NAME);

        int probedKey = 1;
        Map<Address, InternalCacheValue<Object>> firstCall = getAllVersions(0, probedKey);
        Map<Address, InternalCacheValue<Object>> secondCall = getAllVersions(0, probedKey);

        AssertJUnit.assertNotSame(firstCall, secondCall);
        AssertJUnit.assertEquals(firstCall, secondCall);
    }

    /**
     * Verifies that no conflicts are reported for a freshly populated cache and
     * that, after {@code introduceCacheConflicts()}, exactly ten conflicts are
     * reported. Each conflict must involve two addresses; keys that are a
     * multiple of 20 must include a {@code NullCacheEntry}, all others must
     * carry two distinct values, one of them {@code "INCONSISTENT"}.
     */
    public void testConflictsDetected() {
        this.waitForClusterToForm(CACHE_NAME);
        IntStream.range(0, 100).forEach(i -> this.getCache(0).put(i, "v" + i));

        int cacheIndex = this.numMembersInCluster - 1;
        AssertJUnit.assertEquals(0L, this.getConflicts(cacheIndex).count());

        this.introduceCacheConflicts();
        List<Map<Address, CacheEntry<Object, Object>>> conflicts = this.getConflicts(cacheIndex).collect(Collectors.toList());
        AssertJUnit.assertEquals(10, conflicts.size());

        for (Map<Address, CacheEntry<Object, Object>> map : conflicts) {
            AssertJUnit.assertEquals(2, map.keySet().size());

            // Recover the key from any entry that is not the null placeholder.
            Collection<CacheEntry<Object, Object>> mapValues = map.values();
            int key = mapValues.stream()
                    .filter(e -> !(e instanceof NullCacheEntry))
                    .mapToInt(e -> (Integer) e.getKey())
                    .findAny()
                    .orElse(-1);
            AssertJUnit.assertTrue(key > -1);

            if (key % 20 == 0) {
                // These keys were removed on one owner, so one side must be a null entry.
                AssertJUnit.assertTrue(map.values().stream().anyMatch(NullCacheEntry.class::isInstance));
            } else {
                List<Object> icvs = map.values().stream().map(CacheEntry::getValue).distinct().collect(Collectors.toList());
                AssertJUnit.assertEquals(2, icvs.size());
                AssertJUnit.assertTrue("Expected one of the conflicting string values to be 'INCONSISTENT'", icvs.contains("INCONSISTENT"));
            }
        }
    }

    /**
     * Creates one conflict by overwriting a key locally on cache 0 only, then
     * resolves conflicts with a merge policy that always returns the preferred
     * entry and checks that no conflict remains.
     */
    public void testConflictsResolvedWithProvidedMergePolicy() {
        createCluster();

        AdvancedCache<Object, Object> firstCache = getCache(0);
        ConflictManager<Object, Object> conflictManager = ConflictManagerFactory.get(firstCache);
        MagicKey conflictingKey = new MagicKey(this.cache(0), this.cache(1));

        firstCache.put(conflictingKey, 1);
        // The local-only write makes this node's copy diverge from the other copies.
        firstCache.withFlags(Flag.CACHE_MODE_LOCAL).put(conflictingKey, 2);

        long conflictsBefore = getConflicts(0).count();
        AssertJUnit.assertEquals(1L, conflictsBefore);

        conflictManager.resolveConflicts((preferred, others) -> preferred);

        long conflictsAfter = getConflicts(0).count();
        AssertJUnit.assertEquals(0L, conflictsAfter);
    }

    /**
     * Creates one conflict by overwriting a key locally on cache 0 only, then
     * removes every conflicting key from within the conflict stream and checks
     * that the cache ends up empty.
     */
    public void testCacheOperationOnConflictStream() {
        this.createCluster();
        AdvancedCache<Object, Object> cache = this.getCache(0);
        // Typed (not raw) so the stream elements below are maps of cache entries.
        ConflictManager<Object, Object> cm = ConflictManagerFactory.get(cache);
        MagicKey key = new MagicKey(this.cache(0), this.cache(1));
        cache.put(key, 1);
        // The local-only write makes this node's copy diverge from the other copies.
        cache.withFlags(Flag.CACHE_MODE_LOCAL).put(key, 2);
        cm.getConflicts().forEach(map -> {
            // Any entry of the conflict map carries the conflicting key.
            CacheEntry<Object, Object> entry = map.values().iterator().next();
            Object conflictKey = entry.getKey();
            cache.remove(conflictKey);
        });
        AssertJUnit.assertTrue(cache.isEmpty());
    }

    /**
     * Expects the no-argument {@code resolveConflicts()} to throw a
     * {@code CacheException} for the cache used by this test.
     */
    public void testNoEntryMergePolicyConfigured() {
        Exceptions.expectException(
                CacheException.class,
                () -> ConflictManagerFactory.get(getCache(0)).resolveConflicts());
    }

    /**
     * Tampers with every tenth key (0, 10, ..., 90) on its primary owner only,
     * using a local-mode cache: keys that are a multiple of 20 are removed, the
     * remaining ones are overwritten with {@code "INCONSISTENT"}.
     */
    private void introduceCacheConflicts() {
        LocalizedCacheTopology cacheTopology = getCache(0).getDistributionManager().getCacheTopology();
        int key = 0;
        while (key < 100) {
            Address primaryOwner = cacheTopology.getDistribution((Object) key).primary();
            // CACHE_MODE_LOCAL keeps the change on the primary owner.
            AdvancedCache<Object, Object> localOnly = manager(primaryOwner)
                    .getCache(CACHE_NAME)
                    .getAdvancedCache()
                    .withFlags(Flag.CACHE_MODE_LOCAL);
            if (key % 20 != 0) {
                localOnly.put(key, "INCONSISTENT");
            } else {
                localOnly.remove((Object) key);
            }
            key += 10;
        }
    }

    /**
     * Fetches all versions of {@code key} from every cluster member and checks
     * each returned version map.
     *
     * <p>Every map must contain two addresses. Keys that are a multiple of 20
     * may map to a {@code null} value and are expected to yield one non-null
     * value; all other keys must yield two.
     *
     * @param key            the key whose versions are compared
     * @param expectEquality {@code true} to require that all non-null values are
     *                       equal, {@code false} to require that the map holds
     *                       more than one distinct value
     */
    private void compareCacheValuesForKey(int key, boolean expectEquality) {
        List<Map<Address, InternalCacheValue<Object>>> cacheVersions = new ArrayList<>();
        for (int i = 0; i < this.numMembersInCluster; ++i) {
            cacheVersions.add(this.getAllVersions(i, key));
        }

        boolean allowNullValues = key % 20 == 0;
        int expectedValues = allowNullValues ? 1 : 2;
        for (Map<Address, InternalCacheValue<Object>> map : cacheVersions) {
            AssertJUnit.assertEquals(map.toString(), 2, map.keySet().size());
            if (!allowNullValues) {
                AssertJUnit.assertFalse("Version map contains null entries.", map.values().contains(null));
            }

            // Unwrap the stored values, skipping owners that hold no value.
            List<Object> values = map.values().stream()
                    .filter(Objects::nonNull)
                    .map(InternalCacheValue::getValue)
                    .collect(Collectors.toList());
            AssertJUnit.assertEquals(values.toString(), expectedValues, values.size());

            if (expectEquality) {
                AssertJUnit.assertTrue("Inconsistent values returned, they should be the same", values.stream().allMatch(v -> v.equals(values.get(0))));
            } else {
                AssertJUnit.assertTrue("Expected inconsistent values, but all values were equal", map.values().stream().distinct().count() > 1L);
            }
        }
    }

    /**
     * Waits for the {@code CACHE_NAME} cluster to form and for rebalancing to
     * settle, then asserts that the member list read from cache 0 has four
     * entries.
     */
    private void createCluster() {
        waitForClusterToForm(CACHE_NAME);
        // The member list is read before waiting for the rebalance to finish.
        List<Address> clusterMembers = getCache(0).getRpcManager().getMembers();
        TestingUtil.waitForNoRebalance(caches());
        boolean hasFourMembers = clusterMembers.size() == 4;
        AssertJUnit.assertTrue(hasFourMembers);
    }

    private void splitCluster() {
        this.splitCluster({0, 1}, {2, 3});
        TestingUtil.blockUntilViewsChanged(10000L, 2, new Cache[]{this.getCache(0), this.getCache(1), this.getCache(2), this.getCache(3)});
        TestingUtil.waitForNoRebalance(new Cache[]{this.getCache(0), this.getCache(1)});
        TestingUtil.waitForNoRebalance(new Cache[]{this.getCache(2), this.getCache(3)});
    }

    /**
     * Returns the {@code CACHE_NAME} cache of the manager at the given index.
     *
     * @param index index of the cluster member
     * @return the advanced cache for that member
     */
    private AdvancedCache<Object, Object> getCache(int index) {
        AdvancedCache<Object, Object> conflictCache = advancedCache(index, CACHE_NAME);
        return conflictCache;
    }

    /**
     * Returns the conflict stream reported by the conflict manager of the
     * cache at the given index.
     *
     * @param index index of the cluster member
     * @return stream of per-key maps from address to cache entry
     */
    private Stream<Map<Address, CacheEntry<Object, Object>>> getConflicts(int index) {
        ConflictManager<Object, Object> conflictManager = ConflictManagerFactory.get(getCache(index));
        return conflictManager.getConflicts();
    }

    /**
     * Asks the conflict manager of the cache at the given index for all
     * versions of a key.
     *
     * @param index index of the cluster member
     * @param key   the key to look up
     * @return map from address to the value held for the key
     */
    private Map<Address, InternalCacheValue<Object>> getAllVersions(int index, Object key) {
        ConflictManager<Object, Object> conflictManager = ConflictManagerFactory.get(getCache(index));
        return conflictManager.getAllVersions(key);
    }

    /**
     * Wraps the inbound invocation handler of every member's cache in a
     * {@code DropClusteredGetCommandHandler}.
     */
    private void dropClusteredGetCommands() {
        for (int member = 0; member < numMembersInCluster; member++) {
            TestingUtil.wrapInboundInvocationHandler(getCache(member), DropClusteredGetCommandHandler::new);
        }
    }

    /**
     * Wraps the inbound invocation handler of every member's cache in a
     * {@code DelayStateResponseCommandHandler} that shares the given latch.
     *
     * @param latch latch handed to every delaying handler
     */
    private void delayStateTransferCompletion(CountDownLatch latch) {
        for (int member = 0; member < numMembersInCluster; member++) {
            TestingUtil.wrapInboundInvocationHandler(
                    getCache(member),
                    original -> new DelayStateResponseCommandHandler(latch, original));
        }
    }

    /**
     * Cache listener whose latch is counted down when a data-rehashed event
     * with {@code isPre() == true} is received.
     */
    @Listener
    private class RehashListener {
        /** Counted down on a pre-phase data-rehashed event. */
        final CountDownLatch latch = new CountDownLatch(1);

        private RehashListener() {
        }

        /**
         * Counts the latch down for pre-phase events; other events are ignored.
         *
         * @param event the rehash notification
         */
        @DataRehashed
        public void onDataRehashed(DataRehashedEvent event) {
            if (!event.isPre()) {
                return;
            }
            latch.countDown();
        }
    }

    private class DropClusteredGetCommandHandler
    extends AbstractDelegatingHandler {
        DropClusteredGetCommandHandler(PerCacheInboundInvocationHandler delegate) {
            super(delegate);
        }

        public void handle(CacheRpcCommand command, Reply reply, DeliverOrder order) {
            if (!(command instanceof ClusteredGetCommand)) {
                this.delegate.handle(command, reply, order);
            }
        }
    }

    private class DelayStateResponseCommandHandler
    extends AbstractDelegatingHandler {
        final CountDownLatch latch;

        DelayStateResponseCommandHandler(CountDownLatch latch, PerCacheInboundInvocationHandler delegate) {
            super(delegate);
            this.latch = latch;
        }

        public void handle(CacheRpcCommand command, Reply reply, DeliverOrder order) {
            StateResponseCommand stc;
            boolean isLastChunk;
            if (command instanceof StateResponseCommand && (isLastChunk = (stc = (StateResponseCommand)command).getStateChunks().stream().anyMatch(StateChunk::isLastChunk))) {
                try {
                    this.latch.await(60L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
            this.delegate.handle(command, reply, order);
        }
    }
}

