/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.stress;

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.StressTest;
import org.infinispan.commons.executors.BlockingThreadPoolExecutorFactory;
import org.infinispan.commons.executors.ThreadPoolExecutorFactory;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.InCacheMode;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.util.function.SerializablePredicate;
import org.infinispan.util.function.SerializableSupplier;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"stress"}, testName="stream.stress.DistributedStreamRehashStressTest", timeOut=900000L)
@InCacheMode(value={CacheMode.DIST_SYNC, CacheMode.REPL_SYNC})
public class DistributedStreamRehashStressTest
extends StressTest {
    protected final String CACHE_NAME = "testCache";
    protected static final int CACHE_COUNT = 5;
    protected static final int THREAD_MULTIPLIER = 5;
    protected static final long CACHE_ENTRY_COUNT = 250000L;

    @Override
    protected void createCacheManagers() throws Throwable {
        this.builderUsed = new ConfigurationBuilder();
        this.builderUsed.clustering().cacheMode(this.cacheMode);
        this.builderUsed.clustering().hash().numOwners(3);
        this.builderUsed.clustering().stateTransfer().chunkSize(25000);
        this.builderUsed.clustering().remoteTimeout(12000000L);
        this.builderUsed.clustering().stateTransfer().timeout(240L, TimeUnit.SECONDS);
        this.createClusteredCaches(5, "testCache", this.builderUsed);
    }

    @Override
    protected EmbeddedCacheManager addClusterEnabledCacheManager(TransportFlags flags) {
        GlobalConfigurationBuilder gcb = GlobalConfigurationBuilder.defaultClusteredBuilder();
        TestCacheManagerFactory.amendGlobalConfiguration(gcb, flags);
        BlockingThreadPoolExecutorFactory executorFactory = new BlockingThreadPoolExecutorFactory(25, 25, 10000, 30000L);
        gcb.transport().transportThreadPool().threadPoolFactory((ThreadPoolExecutorFactory)executorFactory);
        gcb.transport().remoteCommandThreadPool().threadPoolFactory((ThreadPoolExecutorFactory)executorFactory);
        EmbeddedCacheManager cm = TestCacheManagerFactory.newDefaultCacheManager(true, gcb, new ConfigurationBuilder());
        this.cacheManagers.add(cm);
        return cm;
    }

    public void testStressNodesLeavingWhileMultipleCollectors() throws Throwable {
        this.testStressNodesLeavingWhilePerformingCallable((cache, masterValues, iteration) -> {
            SerializablePredicate & Serializable predicate = (SerializablePredicate & Serializable)e -> ((Integer)e.getKey() & 1) == 1;
            Map results = (Map)cache.entrySet().stream().filter((SerializablePredicate)predicate).collect((SerializableSupplier & Serializable)() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            Map<Integer, Integer> filteredMasterValues = masterValues.entrySet().stream().filter(predicate).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            KeyPartitioner keyPartitioner = TestingUtil.extractComponent(cache, KeyPartitioner.class);
            this.findMismatchedSegments(keyPartitioner, filteredMasterValues, results, iteration);
            Assert.assertEquals((long)125000L, (long)results.size());
        });
    }

    public void testStressNodesLeavingWhileMultipleCount() throws Throwable {
        this.testStressNodesLeavingWhilePerformingCallable((cache, masterValues, iteration) -> {
            long size = cache.entrySet().stream().count();
            Assert.assertEquals((long)250000L, (long)size, (String)("We didn't get a matching size! Expected 250000 but was " + size));
        });
    }

    public void testStressNodesLeavingWhileMultipleIterators() throws Throwable {
        this.testStressNodesLeavingWhilePerformingCallable((cache, masterValues, iteration) -> {
            HashMap<Integer, Integer> seenValues = new HashMap<Integer, Integer>();
            for (Map.Entry entry : cache.entrySet().stream().distributedBatchSize(50000)) {
                if (seenValues.containsKey(entry.getKey())) {
                    log.tracef("Seen values were: %s", seenValues);
                    throw new IllegalArgumentException(Thread.currentThread() + "-Found duplicate value: " + entry.getKey() + " on iteration " + iteration);
                }
                if (!((Integer)masterValues.get(entry.getKey())).equals(entry.getValue())) {
                    log.tracef("Seen values were: %s", seenValues);
                    throw new IllegalArgumentException(Thread.currentThread() + "-Found incorrect value: " + entry.getKey() + " with value " + entry.getValue() + " on iteration " + iteration);
                }
                seenValues.put((Integer)entry.getKey(), (Integer)entry.getValue());
            }
            if (seenValues.size() != masterValues.size()) {
                KeyPartitioner keyPartitioner = TestingUtil.extractComponent(cache, KeyPartitioner.class);
                this.findMismatchedSegments(keyPartitioner, masterValues, seenValues, iteration);
            }
        });
    }

    public void testStressNodesLeavingWhileMultipleIteratorsLocalSegments() throws Throwable {
        this.testStressNodesLeavingWhilePerformingCallable((cache, masterValues, iteration) -> {
            HashMap<Integer, Integer> seenValues = new HashMap<Integer, Integer>();
            KeyPartitioner keyPartitioner = TestingUtil.extractComponent(cache, KeyPartitioner.class);
            AdvancedCache advancedCache = cache.getAdvancedCache();
            LocalizedCacheTopology cacheTopology = advancedCache.getDistributionManager().getCacheTopology();
            Set targetSegments = cacheTopology.getWriteConsistentHash().getSegmentsForOwner(cacheTopology.getLocalAddress());
            masterValues = masterValues.entrySet().stream().filter(e -> targetSegments.contains(keyPartitioner.getSegment(e.getKey()))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            for (Map.Entry entry : cache.entrySet().stream().distributedBatchSize(50000).filterKeySegments(targetSegments)) {
                if (seenValues.containsKey(entry.getKey())) {
                    log.tracef("Seen values were: %s", seenValues);
                    throw new IllegalArgumentException(Thread.currentThread() + "-Found duplicate value: " + entry.getKey() + " on iteration " + iteration);
                }
                if (!masterValues.get(entry.getKey()).equals(entry.getValue())) {
                    log.tracef("Seen values were: %s", seenValues);
                    throw new IllegalArgumentException(Thread.currentThread() + "-Found incorrect value: " + entry.getKey() + " with value " + entry.getValue() + " on iteration " + iteration);
                }
                seenValues.put((Integer)entry.getKey(), (Integer)entry.getValue());
            }
            if (seenValues.size() != masterValues.size()) {
                this.findMismatchedSegments(keyPartitioner, masterValues, seenValues, iteration);
            }
        });
    }

    private void findMismatchedSegments(KeyPartitioner keyPartitioner, Map<Integer, Integer> masterValues, Map<Integer, Integer> seenValues, int iteration) {
        Map target = this.generateEntriesPerSegment(keyPartitioner, masterValues.entrySet());
        Map actual = this.generateEntriesPerSegment(keyPartitioner, seenValues.entrySet());
        for (Map.Entry entry : target.entrySet()) {
            Set entrySet = entry.getValue();
            Set actualEntries = actual.get(entry.getKey());
            if (actualEntries != null) {
                entrySet.removeAll(actualEntries);
            }
            if (entrySet.isEmpty()) continue;
            throw new IllegalArgumentException(Thread.currentThread() + "-Found incorrect amount " + (actualEntries != null ? actualEntries.size() : 0) + " of entries, expected " + entrySet.size() + " for segment " + entry.getKey() + " missing entries " + entrySet + " on iteration " + iteration);
        }
    }

    void testStressNodesLeavingWhilePerformingCallable(PerformOperation operation) throws Throwable {
        HashMap<Integer, Integer> masterValues = new HashMap<Integer, Integer>();
        int i = 0;
        while ((long)i < 250000L) {
            masterValues.put(i, i);
            ++i;
        }
        this.cache(0, "testCache").putAll(masterValues);
        System.out.println("Done with inserts!");
        List<Future<Void>> futures = this.forkWorkerThreads("testCache", 5, 5, new Object[25], (cache, args, iteration) -> operation.perform((Cache<Integer, Integer>)cache, masterValues, iteration));
        futures.add(this.forkRestartingThread(5));
        this.waitAndFinish(futures, 1, TimeUnit.MINUTES);
    }

    private <K, V> Map<Integer, Set<Map.Entry<K, V>>> generateEntriesPerSegment(KeyPartitioner keyPartitioner, Iterable<Map.Entry<K, V>> entries) {
        HashMap<Integer, Set<Map.Entry<Integer, Set>>> returnMap = new HashMap<Integer, Set<Map.Entry<Integer, Set>>>();
        for (Map.Entry<K, V> value : entries) {
            int segment = keyPartitioner.getSegment(value.getKey());
            Set set = returnMap.computeIfAbsent(segment, k -> new HashSet());
            set.add(new ImmortalCacheEntry(value.getKey(), value.getValue()));
        }
        return returnMap;
    }

    static interface PerformOperation {
        public void perform(Cache<Integer, Integer> var1, Map<Integer, Integer> var2, int var3);
    }
}

