/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.commons.test.CommonsTestingUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil;
import org.infinispan.metadata.Metadata;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.support.WaitNonBlockingStore;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.reactivestreams.Publisher;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Base test that writes {@link #NUM_ENTRIES} entries into the first store of the test cache and
 * verifies that {@code publishEntries} hands every entry back exactly once, both when demand is
 * requested through a separate executor and when it is requested on the calling thread.
 *
 * <p>Subclasses supply the store under test through {@link #configurePersistence} and may
 * override the {@code wrap*}/{@code unwrap*}, metadata and lifespan hooks to adapt the inserted
 * data and the expectations to what their store returns.
 */
@Test(groups={"functional"}, testName="persistence.ParallelIterationTest")
public abstract class ParallelIterationTest
extends SingleCacheManagerTest {
    // Not referenced anywhere in this class; kept so the declared constants stay unchanged.
    private static final int NUM_THREADS = 10;
    /** Number of entries written by {@link #insertData()} and expected back from the publisher. */
    private static final int NUM_ENTRIES = 200;
    protected WaitNonBlockingStore<Object, Object> store;
    protected ExecutorService executor;
    protected IntSet allSegments;

    /**
     * Builds the cache manager with the subclass-provided persistence configuration and caches
     * the first store, the test executor and the full segment range for use by the tests.
     *
     * @return the cache manager the tests run against
     * @throws Exception declared by the overridden method
     */
    @Override
    protected EmbeddedCacheManager createCacheManager() throws Exception {
        ConfigurationBuilder cb = getDefaultStandaloneCacheConfig(false);
        configurePersistence(cb);
        GlobalConfigurationBuilder global = new GlobalConfigurationBuilder();
        global.globalState().persistentLocation(CommonsTestingUtil.tmpDirectory(this.getClass()));
        global.serialization().addContextInitializer(getSerializationContextInitializer());
        EmbeddedCacheManager manager = TestCacheManagerFactory.createCacheManager(global, cb);
        store = TestingUtil.getFirstStoreWait(manager.getCache());
        executor = testExecutor();
        allSegments = IntSets.immutableRangeSet(manager.getCache().getCacheConfiguration().clustering().hash().numSegments());
        return manager;
    }

    /**
     * Adds the store under test to the given cache configuration.
     *
     * @param var1 the cache configuration builder to add persistence to
     */
    protected abstract void configurePersistence(ConfigurationBuilder var1);

    /**
     * @return the serialization context initializer registered with the cache manager;
     *         {@link TestDataSCI#INSTANCE} unless overridden
     */
    protected SerializationContextInitializer getSerializationContextInitializer() {
        return TestDataSCI.INSTANCE;
    }

    /** Requests batches through the test executor and expects keys, values and metadata. */
    public void testParallelIterationWithValue() {
        runIterationTest(executor, true);
    }

    /** Requests batches on the calling thread and expects keys, values and metadata. */
    public void testSequentialIterationWithValue() {
        runIterationTest(new WithinThreadExecutor(), true);
    }

    /** Requests batches through the test executor without fetching values. */
    public void testParallelIterationWithoutValue() {
        runIterationTest(executor, false);
    }

    /** Requests batches on the calling thread without fetching values. */
    public void testSequentialIterationWithoutValue() {
        runIterationTest(new WithinThreadExecutor(), false);
    }

    /**
     * Fills the (initially empty) store, subscribes to {@code publishEntries} with zero initial
     * demand, requests the entries in batches and checks what was delivered.
     *
     * @param executor    runs the batch {@code request} calls (all but the final batch)
     * @param fetchValues passed to {@code publishEntries}; also decides whether values are read
     *                    from the emitted entries and which value expectations apply
     */
    private void runIterationTest(Executor executor, boolean fetchValues) {
        ConcurrentHashMap<Integer, Integer> entries = new ConcurrentHashMap<>();
        ConcurrentHashMap<Integer, Metadata> metadata = new ConcurrentHashMap<>();
        AtomicBoolean sameKeyMultipleTimes = new AtomicBoolean();

        long initialSize = store.sizeWait(allSegments);
        Assert.assertEquals(initialSize, 0L);
        insertData();

        Flowable<MarshallableEntry<Object, Object>> flowable =
                Flowable.fromPublisher(store.publishEntries(allSegments, null, fetchValues));
        flowable = flowable.doOnNext(me -> {
            Integer key = unwrapKey(me.getKey());
            // getValue() is only touched when values were actually requested.
            if (fetchValues) {
                Integer existing = entries.put(key, unwrapValue(me.getValue()));
                if (existing != null) {
                    log.warnf("Already a value present for key %s: %s", key, existing);
                    sameKeyMultipleTimes.set(true);
                }
            }
            if (me.getMetadata() != null) {
                log.tracef("For key %d found metadata %s", key, me.getMetadata());
                Metadata prevMetadata = metadata.put(key, me.getMetadata());
                if (prevMetadata != null) {
                    log.warnf("Already a metadata present for key %s: %s", key, prevMetadata);
                    sameKeyMultipleTimes.set(true);
                }
            } else {
                log.tracef("No metadata found for key %d", key);
            }
        });

        // Start with no demand so that every element is pulled by an explicit request below.
        TestSubscriber<MarshallableEntry<Object, Object>> subscriber = TestSubscriber.create(0L);
        flowable.subscribe(subscriber);

        int batchSize = 10;
        // Request everything except the last batch through the supplied executor.
        for (int batch = 0; batch < NUM_ENTRIES / batchSize - 1; ++batch) {
            executor.execute(() -> subscriber.request(batchSize));
        }
        subscriber.awaitCount(NUM_ENTRIES - batchSize);

        // Final request from this thread asks for one element more than remains.
        // NOTE(review): presumably the surplus demand is there to let the publisher terminate - confirm.
        subscriber.request(batchSize + 1);
        subscriber.awaitDone(10L, TimeUnit.SECONDS);
        subscriber.assertNoErrors();

        // TestNG order is (actual, expected).
        int received = subscriber.values().size();
        Assert.assertEquals(received, NUM_ENTRIES);
        Assert.assertFalse(sameKeyMultipleTimes.get());

        for (int i = 0; i < NUM_ENTRIES; ++i) {
            String message = "For key " + i;
            if (fetchValues) {
                // Casts pin the assertEquals(Object, Object, String) overload.
                Assert.assertEquals((Object) entries.get(i), (Object) i, message);
            } else {
                Assert.assertNull(entries.get(i), message);
            }
            if (hasMetadata(fetchValues, i)) {
                Metadata found = metadata.get(i);
                Assert.assertNotNull(found, message);
                Assert.assertEquals(found.lifespan(), lifespan(i), message);
                Assert.assertEquals(found.maxIdle(), maxIdle(i), message);
            } else {
                assertMetadataEmpty(metadata.get(i));
            }
        }
    }

    /**
     * Writes {@link #NUM_ENTRIES} entries to the store, keyed and valued by their index (after
     * wrapping), attaching metadata only to the indices selected by {@link #insertMetadata(int)}.
     */
    private void insertData() {
        for (int i = 0; i < NUM_ENTRIES; ++i) {
            long now = System.currentTimeMillis();
            Metadata metadata = insertMetadata(i) ? TestingUtil.metadata(lifespan(i), maxIdle(i)) : null;
            MarshallableEntry<Object, Object> me = MarshalledEntryUtil.create(wrapKey(i), wrapValue(i, i), metadata, now, now, this.cache);
            store.write(me);
        }
    }

    /**
     * Assertion applied to the metadata collected for a key that is not expected to carry any.
     *
     * @param metadata the metadata seen for the key, or {@code null} if none was seen
     */
    protected void assertMetadataEmpty(Metadata metadata) {
        Assert.assertNull(metadata);
    }

    /**
     * @param i entry index
     * @return whether the entry is written with metadata; true for even indices
     */
    protected boolean insertMetadata(int i) {
        return i % 2 == 0;
    }

    /**
     * @param fetchValues whether values were requested from the publisher (unused here)
     * @param i           entry index
     * @return whether the published entry is expected to carry metadata; by default the same
     *         answer as {@link #insertMetadata(int)}
     */
    protected boolean hasMetadata(boolean fetchValues, int i) {
        return insertMetadata(i);
    }

    /**
     * @param i entry index
     * @return the lifespan used for, and expected from, the entry's metadata
     */
    protected long lifespan(int i) {
        return 1000L * (long)(i + 1000);
    }

    /**
     * @param i entry index
     * @return the max-idle used for, and expected from, the entry's metadata
     */
    protected long maxIdle(int i) {
        return 10000L * (long)(i + 1000);
    }

    /**
     * @param key entry index
     * @return the key object written to the store; the boxed index by default
     */
    protected Object wrapKey(int key) {
        return key;
    }

    /**
     * @param key key object emitted by the store
     * @return the entry index it stands for; a plain cast to {@link Integer} by default
     */
    protected Integer unwrapKey(Object key) {
        return (Integer)key;
    }

    /**
     * @param key   entry index (unused by the default implementation)
     * @param value value to store
     * @return the value object written to the store; the boxed value by default
     */
    protected Object wrapValue(int key, int value) {
        return value;
    }

    /**
     * @param value value object emitted by the store
     * @return the integer it stands for; a plain cast to {@link Integer} by default
     */
    protected Integer unwrapValue(Object value) {
        return (Integer)value;
    }
}

