/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.affinity.impl;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.affinity.impl.KeyAffinityServiceImpl;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.manager.CacheContainer;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestingUtil;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;

/**
 * Shared fixture and assertion helpers for key-affinity service tests.
 *
 * <p>The class owns a single-threaded executor (handed to the key generator by
 * subclasses) and a reference to the {@link KeyAffinityServiceImpl} under test.
 * Subclasses are expected to assign {@link #keyAffinityService}; every helper
 * below reads it.
 */
public abstract class BaseKeyAffinityServiceTest
extends BaseDistFunctionalTest<Object, String> {
    /**
     * Number of keys each per-address queue is expected to hold when full.
     * NOTE(review): presumably matches the buffer size subclasses pass when
     * creating the service - confirm against the subclasses.
     */
    private static final int QUEUE_CAPACITY = 100;

    /**
     * Minimum queue size accepted by {@link #assertCorrectCapacity(List)}.
     * NOTE(review): looks like a refill threshold applied to the capacity - confirm.
     */
    private static final int MIN_ACCEPTABLE_QUEUE_SIZE = 50;

    /** Upper bound on how long {@link #assertCorrectCapacity(List)} waits per address. */
    private static final long MAX_CAPACITY_WAIT_MILLIS = 300000L;

    /** Pause between two queue-size polls in {@link #assertCorrectCapacity(List)}. */
    private static final long CAPACITY_POLL_MILLIS = 100L;

    /** How long the executor is given to terminate after {@code shutdown()}. */
    private static final long EXECUTOR_TERMINATION_MILLIS = 100L;

    /** Grace period granted to the key generator thread before it must be inactive. */
    private static final long GENERATOR_SETTLE_MILLIS = 200L;

    protected ThreadFactory threadFactory = this.getTestThreadFactory("KeyGeneratorThread");
    protected ExecutorService executor = Executors.newSingleThreadExecutor(this.threadFactory);
    protected KeyAffinityServiceImpl<Object> keyAffinityService;

    /**
     * Stops the service under test (if any) and then shuts the executor down.
     *
     * <p>The executor is shut down even when stopping the service throws, so a
     * failing {@code stop()} does not leave the executor thread running.
     *
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    @AfterClass(alwaysRun=true)
    public void stopExecutorService() throws InterruptedException {
        try {
            if (keyAffinityService != null) {
                keyAffinityService.stop();
            }
        } finally {
            shutdownExecutor();
        }
    }

    /**
     * Shuts the executor down and fails the test if it does not terminate within
     * {@link #EXECUTOR_TERMINATION_MILLIS}.
     */
    private void shutdownExecutor() throws InterruptedException {
        if (executor == null) {
            return;
        }
        boolean terminatedGracefully = false;
        try {
            executor.shutdown();
            terminatedGracefully = executor.awaitTermination(EXECUTOR_TERMINATION_MILLIS, TimeUnit.MILLISECONDS);
        } finally {
            // Always force the shutdown, including when the wait above was interrupted.
            executor.shutdownNow();
        }
        if (!terminatedGracefully) {
            AssertJUnit.fail("KeyGenerator Executor not terminated in expected time");
        }
    }

    /**
     * Asserts that {@code addr} is one of the write owners of key {@code o},
     * according to the cache topology seen by the first cache.
     *
     * @param o    the key to look up
     * @param addr the address expected among the key's write owners
     */
    protected void assertMapsToAddress(Object o, Address addr) {
        Cache<?, ?> cache = (Cache<?, ?>)this.caches.get(0);
        LocalizedCacheTopology cacheTopology = cache.getAdvancedCache().getDistributionManager().getCacheTopology();
        List<Address> addresses = cacheTopology.getDistribution(o).writeOwners();
        // Build the (potentially long) message only when the check actually fails.
        if (!addresses.contains(addr)) {
            AssertJUnit.fail("Expected key " + o + " to map to address " + addr + ". List of addresses is" + addresses);
        }
    }

    /**
     * Returns the transport members as seen through the first cache's cache manager.
     */
    protected List<Address> topology() {
        Cache<?, ?> cache = (Cache<?, ?>)this.caches.get(0);
        CacheContainer container = cache.getCacheManager();
        return topology(container);
    }

    /**
     * Returns the transport members reported by the test cache obtained from {@code cm}.
     *
     * @param cm the container to obtain the cache named {@code cacheName} from
     */
    protected List<Address> topology(CacheContainer cm) {
        return cm.getCache(this.cacheName).getAdvancedCache().getRpcManager().getTransport().getMembers();
    }

    /**
     * Runs {@link #assertEventualFullCapacity(List)} against the current {@link #topology()}.
     */
    protected void assertEventualFullCapacity() throws InterruptedException {
        assertEventualFullCapacity(topology());
    }

    /**
     * Runs {@link #assertCorrectCapacity(List)} against the current {@link #topology()}.
     */
    protected void assertCorrectCapacity() throws InterruptedException {
        assertCorrectCapacity(topology());
    }

    /**
     * Waits until the service reports full capacity for every given address, then
     * checks that the key generator thread is no longer active.
     *
     * @param addresses the addresses whose queues must each reach {@link #QUEUE_CAPACITY}
     * @throws InterruptedException if interrupted during the settle pause
     */
    protected void assertEventualFullCapacity(List<Address> addresses) throws InterruptedException {
        int expectedTotal = QUEUE_CAPACITY * addresses.size();
        this.eventuallyEquals(expectedTotal, () -> this.keyAffinityService.getMaxNumberOfKeys());
        Map<Address, BlockingQueue<Object>> queuesByAddress = keyAffinityService.getAddress2KeysMapping();
        for (Address addr : addresses) {
            BlockingQueue<Object> queue = queueFor(queuesByAddress, addr);
            this.eventuallyEquals(QUEUE_CAPACITY, queue::size);
        }
        this.eventuallyEquals(expectedTotal, () -> this.keyAffinityService.existingKeyCount.get());
        // Give the key generator a moment before requiring it to be inactive.
        Thread.sleep(GENERATOR_SETTLE_MILLIS);
        AssertJUnit.assertFalse(keyAffinityService.isKeyGeneratorThreadActive());
    }

    /**
     * Polls each address' queue until it holds at least
     * {@link #MIN_ACCEPTABLE_QUEUE_SIZE} keys, failing if that does not happen
     * within {@link #MAX_CAPACITY_WAIT_MILLIS} for an address.
     *
     * @param addresses the addresses whose queues are checked
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    protected void assertCorrectCapacity(List<Address> addresses) throws InterruptedException {
        Map<Address, BlockingQueue<Object>> queuesByAddress = keyAffinityService.getAddress2KeysMapping();
        long maxWaitNanos = TimeUnit.MILLISECONDS.toNanos(MAX_CAPACITY_WAIT_MILLIS);
        for (Address addr : addresses) {
            BlockingQueue<Object> queue = queueFor(queuesByAddress, addr);
            // nanoTime is monotonic, so the deadline is immune to wall-clock adjustments.
            long deadline = System.nanoTime() + maxWaitNanos;
            while (queue.size() < MIN_ACCEPTABLE_QUEUE_SIZE && System.nanoTime() - deadline < 0L) {
                Thread.sleep(CAPACITY_POLL_MILLIS);
            }
            // A real assertion rather than the assert keyword: it must not depend on -ea.
            int obtained = queue.size();
            AssertJUnit.assertTrue("Obtained " + obtained, obtained >= MIN_ACCEPTABLE_QUEUE_SIZE);
        }
    }

    /**
     * Runs {@link #assertKeyAffinityCorrectness(Collection)} against the current {@link #topology()}.
     */
    protected void assertKeyAffinityCorrectness() {
        assertKeyAffinityCorrectness(topology());
    }

    /**
     * Asserts that every given address has a full queue and that each queued key
     * maps to that address.
     *
     * @param addressList the addresses to verify
     */
    protected void assertKeyAffinityCorrectness(Collection<Address> addressList) {
        Map<Address, BlockingQueue<Object>> queuesByAddress = keyAffinityService.getAddress2KeysMapping();
        for (Address addr : addressList) {
            BlockingQueue<Object> queue = queueFor(queuesByAddress, addr);
            AssertJUnit.assertEquals(QUEUE_CAPACITY, queue.size());
            for (Object key : queue) {
                assertMapsToAddress(key, addr);
            }
        }
    }

    /**
     * Blocks until all caches have received the view and no rebalance is pending,
     * then asserts that the topology has one member per cache.
     */
    protected void waitForClusterToResize() {
        TestingUtil.blockUntilViewsReceived(10000, false, this.caches);
        TestingUtil.waitForNoRebalance(this.caches);
        AssertJUnit.assertEquals(this.caches.size(), topology().size());
    }

    /**
     * Looks up the key queue for {@code addr}, failing with a descriptive message
     * (instead of a later NullPointerException) when the address has no queue.
     */
    private static BlockingQueue<Object> queueFor(Map<Address, BlockingQueue<Object>> queuesByAddress, Address addr) {
        BlockingQueue<Object> queue = queuesByAddress.get(addr);
        AssertJUnit.assertNotNull("No key queue registered for address " + addr, queue);
        return queue;
    }
}

