/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.marshall.core.MarshallableFunctions;
import org.infinispan.reactive.publisher.PublisherReducers;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.PublisherHandler;
import org.infinispan.reactive.publisher.impl.SegmentCompletionPublisher;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.InCacheMode;
import org.infinispan.util.function.SerializableFunction;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="reactive.publisher.impl.SimpleClusterPublisherManagerTest")
@InCacheMode(value={CacheMode.REPL_SYNC, CacheMode.DIST_SYNC, CacheMode.SCATTERED_SYNC})
public class SimpleClusterPublisherManagerTest
extends MultipleCacheManagersTest {
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder builder = SimpleClusterPublisherManagerTest.getDefaultClusteredCacheConfig(this.cacheMode, false);
        builder.clustering().hash().numSegments(10);
        this.createCluster(builder, 4);
        this.waitForClusterToForm();
    }

    private Map<Integer, String> insert(Cache<Integer, String> cache) {
        int amount = 24;
        HashMap<Integer, String> values = new HashMap<Integer, String>(amount);
        HashMap keysBySegment = log.isTraceEnabled() ? new HashMap() : null;
        KeyPartitioner kp = TestingUtil.extractComponent(cache, KeyPartitioner.class);
        IntStream.range(0, amount).forEach(i -> {
            values.put(i, "value-" + i);
            if (keysBySegment != null) {
                int segment = kp.getSegment((Object)i);
                IntSet keys = keysBySegment.computeIfAbsent(segment, IntSets::mutableEmptySet);
                keys.set(i);
            }
        });
        if (keysBySegment != null) {
            log.tracef("Keys by segment are: " + keysBySegment, new Object[0]);
        }
        cache.putAll(values);
        return values;
    }

    private ClusterPublisherManager<Integer, String> cpm(Cache<Integer, String> cache) {
        return TestingUtil.extractComponent(cache, ClusterPublisherManager.class);
    }

    @DataProvider(name="GuaranteeParallelEntry")
    public Object[][] collectionAndVersionsProvider() {
        return (Object[][])Arrays.stream(DeliveryGuarantee.values()).flatMap(dg -> Stream.of(Boolean.TRUE, Boolean.FALSE).flatMap(parallel -> Stream.of(Boolean.TRUE, Boolean.FALSE).map(entry -> new Object[]{dg, parallel, entry}))).toArray(x$0 -> new Object[x$0][]);
    }

    @Test(dataProvider="GuaranteeParallelEntry")
    public void testCount(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry) {
        Cache cache = this.cache(0);
        int insertAmount = this.insert(cache).size();
        ClusterPublisherManager<Integer, String> cpm = this.cpm(cache);
        CompletionStage stageCount = isEntry ? cpm.entryReduction(parallel, null, null, null, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(parallel, null, null, null, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add());
        Long actualCount = (Long)stageCount.toCompletableFuture().join();
        AssertJUnit.assertEquals((int)insertAmount, (int)actualCount.intValue());
    }

    @Test(dataProvider="GuaranteeParallelEntry")
    public void testCountSegments(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry) {
        Cache cache = this.cache(0);
        int insertAmount = this.insert(cache).size();
        IntSet targetSegments = IntSets.mutableEmptySet();
        for (int i = 2; i <= 8; ++i) {
            targetSegments.set(i);
        }
        ClusterPublisherManager<Integer, String> cpm = this.cpm(cache);
        CompletionStage stageCount = isEntry ? cpm.entryReduction(parallel, targetSegments, null, null, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(parallel, targetSegments, null, null, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add());
        Long actualCount = (Long)stageCount.toCompletableFuture().join();
        int expected = SimpleClusterPublisherManagerTest.findHowManyInSegments(insertAmount, targetSegments, TestingUtil.extractComponent(cache, KeyPartitioner.class));
        AssertJUnit.assertEquals((int)expected, (int)actualCount.intValue());
    }

    @Test(dataProvider="GuaranteeParallelEntry")
    public void testCountSpecificKeys(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry) {
        Cache cache = this.cache(0);
        int insertAmount = this.insert(cache).size();
        HashSet<Integer> keysToInclude = new HashSet<Integer>();
        for (int i = 0; i < insertAmount; i += 2) {
            keysToInclude.add(i);
        }
        keysToInclude.add(insertAmount + 1);
        ClusterPublisherManager<Integer, String> cpm = this.cpm(cache);
        CompletionStage stageCount = isEntry ? cpm.entryReduction(parallel, null, keysToInclude, null, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(parallel, null, keysToInclude, null, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add());
        Long actualCount = (Long)stageCount.toCompletableFuture().join();
        int expected = insertAmount / 2;
        AssertJUnit.assertEquals((int)expected, (int)actualCount.intValue());
    }

    @Test(dataProvider="GuaranteeParallelEntry")
    public void testCountWithContext(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry) {
        Cache cache = this.cache(0);
        int insertAmount = this.insert(cache).size();
        NonTxInvocationContext ctx = new NonTxInvocationContext(null);
        ctx.putLookedUpEntry((Object)0, (CacheEntry)NullCacheEntry.getInstance());
        ctx.putLookedUpEntry((Object)(insertAmount - 2), (CacheEntry)Mockito.when((Object)((CacheEntry)Mockito.mock(CacheEntry.class)).isRemoved()).thenReturn((Object)true).getMock());
        ctx.putLookedUpEntry((Object)(insertAmount + 1), (CacheEntry)new ImmortalCacheEntry((Object)(insertAmount + 1), (Object)(insertAmount + 1)));
        ClusterPublisherManager<Integer, String> cpm = this.cpm(cache);
        CompletionStage stageCount = isEntry ? cpm.entryReduction(parallel, null, null, (InvocationContext)ctx, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(parallel, null, null, (InvocationContext)ctx, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add());
        Long actualCount = (Long)stageCount.toCompletableFuture().join();
        int expected = insertAmount - 1;
        AssertJUnit.assertEquals((int)expected, (int)actualCount.intValue());
    }

    @Test(dataProvider="GuaranteeParallelEntry")
    public void testCountWithContextSegments(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry) {
        Cache cache = this.cache(0);
        int insertAmount = this.insert(cache).size();
        IntSet targetSegments = IntSets.mutableEmptySet();
        for (int i = 2; i <= 8; ++i) {
            targetSegments.set(i);
        }
        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(cache, KeyPartitioner.class);
        int expected = SimpleClusterPublisherManagerTest.findHowManyInSegments(insertAmount, targetSegments, keyPartitioner);
        AtomicInteger contextChange = new AtomicInteger();
        NonTxInvocationContext ctx = new NonTxInvocationContext(null);
        ctx.putLookedUpEntry((Object)0, (CacheEntry)NullCacheEntry.getInstance());
        ctx.putLookedUpEntry((Object)(insertAmount - 2), (CacheEntry)Mockito.when((Object)((CacheEntry)Mockito.mock(CacheEntry.class)).isRemoved()).thenReturn((Object)true).getMock());
        ctx.putLookedUpEntry((Object)(insertAmount + 1), (CacheEntry)new ImmortalCacheEntry((Object)(insertAmount + 1), (Object)(insertAmount + 1)));
        ctx.forEachEntry((o, e) -> {
            if (targetSegments.contains(keyPartitioner.getSegment(o))) {
                contextChange.addAndGet(e.isRemoved() || e.isNull() ? -1 : 1);
            }
        });
        ClusterPublisherManager<Integer, String> cpm = this.cpm(cache);
        CompletionStage stageCount = isEntry ? cpm.entryReduction(parallel, targetSegments, null, (InvocationContext)ctx, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(parallel, targetSegments, null, (InvocationContext)ctx, false, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add());
        Long actualCount = (Long)stageCount.toCompletableFuture().join();
        AssertJUnit.assertEquals((int)(expected + contextChange.get()), (int)actualCount.intValue());
    }

    static int findHowManyInSegments(int insertAmount, IntSet targetSegments, KeyPartitioner kp) {
        int count = 0;
        for (int i = 0; i < insertAmount; ++i) {
            int segment = kp.getSegment((Object)i);
            if (!targetSegments.contains(segment)) continue;
            ++count;
        }
        return count;
    }

    @AfterMethod
    public void verifyNoDanglingRequests() {
        for (Cache cache : this.caches()) {
            this.eventuallyEquals(0, () -> TestingUtil.extractComponent(cache, PublisherHandler.class).openPublishers());
        }
    }

    @DataProvider(name="GuaranteeEntry")
    public Object[][] guaranteesEntryType() {
        return (Object[][])Arrays.stream(DeliveryGuarantee.values()).flatMap(dg -> Stream.of(Boolean.TRUE, Boolean.FALSE).map(entry -> new Object[]{dg, entry})).toArray(x$0 -> new Object[x$0][]);
    }

    private <I> void performPublisherOperation(DeliveryGuarantee deliveryGuarantee, boolean isEntry, IntSet segments, Set<Integer> keys, InvocationContext context, Map<Integer, String> expectedValues) {
        Consumer<Object> assertConsumer;
        SegmentCompletionPublisher publisher;
        ClusterPublisherManager<Integer, String> cpm = this.cpm(this.cache(0));
        if (isEntry) {
            publisher = cpm.entryPublisher(segments, keys, context, false, deliveryGuarantee, 10, MarshallableFunctions.identity());
            assertConsumer = obj -> {
                Map.Entry entry = (Map.Entry)obj;
                Object value = expectedValues.get(entry.getKey());
                AssertJUnit.assertEquals(value, entry.getValue());
            };
        } else {
            publisher = cpm.keyPublisher(segments, keys, context, false, deliveryGuarantee, 10, MarshallableFunctions.identity());
            assertConsumer = obj -> AssertJUnit.assertTrue((boolean)expectedValues.containsKey(obj));
        }
        int expectedSize = expectedValues.size();
        List results = (List)Flowable.fromPublisher((Publisher)publisher).toList(expectedSize).blockingGet();
        if (expectedSize != results.size()) {
            log.fatal((Object)("SIZE MISMATCH expected: " + expectedValues + " was: " + results));
        }
        AssertJUnit.assertEquals((int)expectedSize, (int)results.size());
        results.forEach(assertConsumer);
    }

    @Test(dataProvider="GuaranteeEntry")
    public void testSimpleIteration(DeliveryGuarantee deliveryGuarantee, boolean isEntry) {
        Cache cache = this.cache(0);
        Map<Integer, String> values = this.insert(cache);
        this.performPublisherOperation(deliveryGuarantee, isEntry, null, null, null, values);
    }

    @Test(dataProvider="GuaranteeEntry")
    public void testIterationSegments(DeliveryGuarantee deliveryGuarantee, boolean isEntry) {
        Cache cache = this.cache(0);
        Map<Integer, String> values = this.insert(cache);
        IntSet targetSegments = IntSets.mutableEmptySet();
        for (int i = 2; i <= 7; ++i) {
            targetSegments.set(i);
        }
        this.removeEntriesNotInSegment(values, TestingUtil.extractComponent(cache, KeyPartitioner.class), targetSegments);
        this.performPublisherOperation(deliveryGuarantee, isEntry, targetSegments, null, null, values);
    }

    @Test(dataProvider="GuaranteeEntry")
    public void testContextIteration(DeliveryGuarantee deliveryGuarantee, boolean isEntry) {
        Cache cache = this.cache(0);
        Map<Integer, String> values = this.insert(cache);
        NonTxInvocationContext ctx = new NonTxInvocationContext(null);
        ctx.putLookedUpEntry((Object)0, (CacheEntry)NullCacheEntry.getInstance());
        values.remove(0);
        ctx.putLookedUpEntry((Object)7, (CacheEntry)Mockito.when((Object)((CacheEntry)Mockito.mock(CacheEntry.class)).isRemoved()).thenReturn((Object)true).getMock());
        values.remove(7);
        ctx.putLookedUpEntry((Object)156, (CacheEntry)new ImmortalCacheEntry((Object)156, (Object)"value-156"));
        values.put(156, "value-156");
        this.performPublisherOperation(deliveryGuarantee, isEntry, null, null, (InvocationContext)ctx, values);
    }

    @Test(dataProvider="GuaranteeEntry")
    public void testSpecificKeyIteration(DeliveryGuarantee deliveryGuarantee, boolean isEntry) {
        Cache cache = this.cache(0);
        Map<Integer, String> values = this.insert(cache);
        HashSet<Integer> keysToUse = new HashSet<Integer>();
        keysToUse.add(1);
        keysToUse.add(4);
        keysToUse.add(7);
        keysToUse.add(123);
        values.entrySet().removeIf(e -> !keysToUse.contains(e.getKey()));
        this.performPublisherOperation(deliveryGuarantee, isEntry, null, keysToUse, null, values);
    }

    @Test(dataProvider="GuaranteeEntry")
    public void testMapIteration(DeliveryGuarantee deliveryGuarantee, boolean isEntry) {
        SegmentCompletionPublisher publisher;
        List mappedValues;
        Cache cache = this.cache(0);
        ClusterPublisherManager<Integer, String> cpm = this.cpm(this.cache(0));
        Map<Integer, String> values = this.insert(cache);
        if (isEntry) {
            mappedValues = values.entrySet().stream().map(Map.Entry::getValue).map(String::valueOf).collect(Collectors.toList());
            publisher = cpm.entryPublisher(null, null, null, false, deliveryGuarantee, 10, (Function)(SerializableFunction & Serializable)entryPublisher -> Flowable.fromPublisher((Publisher)entryPublisher).map(Map.Entry::getValue).map(String::valueOf));
        } else {
            mappedValues = values.keySet().stream().map(String::valueOf).collect(Collectors.toList());
            publisher = cpm.keyPublisher(null, null, null, false, deliveryGuarantee, 10, (Function)(SerializableFunction & Serializable)entryPublisher -> Flowable.fromPublisher((Publisher)entryPublisher).map(String::valueOf));
        }
        this.performFunctionPublisherOperation((Publisher)publisher, mappedValues);
    }

    @Test(dataProvider="GuaranteeEntry")
    public void testEmptySegmentNotification(DeliveryGuarantee deliveryGuarantee, boolean isEntry) throws InterruptedException {
        this.performSegmentPublisherOperation(deliveryGuarantee, isEntry, null, null, null, null);
    }

    private <I, R> void performSegmentPublisherOperation(DeliveryGuarantee deliveryGuarantee, boolean isEntry, IntSet segments, Set<Integer> keys, InvocationContext context, Map<Integer, String> expectedValues) throws InterruptedException {
        ClusterPublisherManager<Integer, String> cpm = this.cpm(this.cache(0));
        SegmentCompletionPublisher publisher = isEntry ? cpm.entryPublisher(segments, keys, context, false, deliveryGuarantee, 10, MarshallableFunctions.identity()) : cpm.keyPublisher(segments, keys, context, false, deliveryGuarantee, 10, MarshallableFunctions.identity());
        IntSet mutableIntSet = IntSets.concurrentSet((int)10);
        TestSubscriber testSubscriber = TestSubscriber.create();
        publisher.subscribe((Subscriber)testSubscriber, arg_0 -> ((IntSet)mutableIntSet).set(arg_0));
        testSubscriber.await(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals((Object)IntSets.immutableRangeSet((int)10), (Object)mutableIntSet);
    }

    private <I, R> void performFunctionPublisherOperation(Publisher<R> publisher, Collection<R> expectedValues) {
        List results;
        int expectedSize = expectedValues.size();
        if (expectedSize != (results = (List)Flowable.fromPublisher(publisher).toList(expectedSize).blockingGet()).size()) {
            log.fatal((Object)("SIZE MISMATCH was: " + results.size()));
        }
        AssertJUnit.assertEquals((int)expectedSize, (int)results.size());
        results.forEach(result -> AssertJUnit.assertTrue((boolean)expectedValues.contains(result)));
    }

    private void removeEntriesNotInSegment(Map<?, ?> map, KeyPartitioner kp, IntSet segments) {
        Iterator<?> keyIter = map.keySet().iterator();
        while (keyIter.hasNext()) {
            Object key = keyIter.next();
            int segment = kp.getSegment(key);
            if (segments.contains(segment)) continue;
            keyIter.remove();
        }
    }
}

