/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.ClientTestUtil;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ClientSmartInvocationServiceImpl;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.client.test.bounce.ClientDriverFactory;
import com.hazelcast.config.Config;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastOverloadException;
import com.hazelcast.core.IMap;
import com.hazelcast.internal.util.ThreadLocalRandomProvider;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.SlowTest;
import com.hazelcast.test.bounce.BounceMemberRule;
import com.hazelcast.test.bounce.DriverFactory;
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * Slow test that fires asynchronous map reads from a client driver obtained from
 * {@link BounceMemberRule} while the rule runs the worker tasks repeatedly, and
 * verifies that the number of entries in the client's internal {@code callIdMap}
 * (sampled periodically by a background thread) does not keep growing.
 *
 * <p>The maximum size sampled during the first fifth of the test duration is
 * used as a baseline; the overall maximum may exceed it by at most 20%.
 */
@RunWith(HazelcastSerialClassRunner.class)
@Category(SlowTest.class)
public class ClientBackpressureBouncingTest
        extends HazelcastTestSupport {

    /** Value configured for {@link ClientProperty#MAX_CONCURRENT_INVOCATIONS} on the client. */
    private static final int MAX_CONCURRENT_INVOCATION_CONFIG = 100;

    /** Number of worker tasks handed to the bounce rule. */
    private static final int WORKER_THREAD_COUNT = 5;

    /** How long the worker tasks are run, and how long the checking thread samples. */
    private static final long TEST_DURATION_SECONDS = 240L;

    /** JUnit timeout for the whole test method. */
    private static final long TEST_TIMEOUT_MILLIS = 600000L;

    /** Sampling thread; stays {@code null} until the test method has started it. */
    private InvocationCheckingThread checkingThread;

    @Rule
    public BounceMemberRule bounceMemberRule = BounceMemberRule.with(new Config()).driverFactory(new ClientDriverFactory() {

        @Override
        protected ClientConfig getClientConfig(HazelcastInstance member) {
            ClientConfig clientConfig = new ClientConfig().setProperty(
                    ClientProperty.MAX_CONCURRENT_INVOCATIONS.getName(),
                    String.valueOf(MAX_CONCURRENT_INVOCATION_CONFIG));
            clientConfig.getNetworkConfig().setRedoOperation(true);
            return clientConfig;
        }
    }).build();

    /**
     * Stops the sampling thread (if it was ever started) and waits for it to exit.
     *
     * <p>The null check matters when the test fails before the thread is created;
     * the explicit shutdown avoids blocking here until the sampling deadline when
     * the test body aborted early.
     *
     * @throws InterruptedException if interrupted while waiting for the thread to finish
     */
    @After
    public void tearDown() throws InterruptedException {
        if (checkingThread != null) {
            checkingThread.shutdown();
            checkingThread.join();
        }
    }

    /**
     * Runs the worker tasks for {@link #TEST_DURATION_SECONDS} seconds through the
     * bounce rule and then asserts on what the sampling thread observed.
     *
     * @throws Exception if starting the sampling thread or running the tasks fails
     */
    @Test(timeout = TEST_TIMEOUT_MILLIS)
    public void testInFlightInvocationCountIsNotGrowing() throws Exception {
        HazelcastInstance driver = bounceMemberRule.getNextTestDriver();
        IMap<Integer, Integer> map = driver.getMap(randomMapName());
        startInvocationCheckingThread(driver);
        Runnable[] tasks = createTasks(map);
        bounceMemberRule.testRepeatedly(tasks, TEST_DURATION_SECONDS);
        System.out.println("Finished bouncing");
        checkingThread.assertInFlightInvocationsWereNotGrowing();
    }

    /**
     * Creates and starts the thread that samples the driver's pending-invocation map.
     *
     * @param driver client instance whose invocation map is sampled
     * @throws Exception if the invocation map cannot be obtained reflectively
     */
    private void startInvocationCheckingThread(HazelcastInstance driver) throws Exception {
        checkingThread = new InvocationCheckingThread(driver);
        checkingThread.start();
    }

    /**
     * Builds one {@link MyRunnable} per worker, all operating on the same map.
     *
     * @param map map the workers read from
     * @return array of {@link #WORKER_THREAD_COUNT} tasks
     */
    private Runnable[] createTasks(IMap<Integer, Integer> map) {
        Runnable[] tasks = new Runnable[WORKER_THREAD_COUNT];
        for (int workerNo = 0; workerNo < WORKER_THREAD_COUNT; workerNo++) {
            tasks[workerNo] = new MyRunnable(map, workerNo);
        }
        return tasks;
    }

    /**
     * Worker task: each {@link #run()} call issues a single asynchronous {@code get}
     * for a random key and registers a counting callback on the returned future.
     */
    private static class MyRunnable
            implements Runnable {
        /** Print a line every this many {@link HazelcastOverloadException}s. */
        private static final long BACKPRESSURE_LOG_INTERVAL = 250000L;
        /** Print a line every this many successful responses. */
        private static final long PROGRESS_LOG_INTERVAL = 10000L;
        /** Print a line every this many failed responses. */
        private static final long FAILURE_LOG_INTERVAL = 100L;

        private final IMap<Integer, Integer> map;
        private final AtomicLong progressCounter = new AtomicLong();
        private final AtomicLong failureCounter = new AtomicLong();
        private final AtomicLong backpressureCounter = new AtomicLong();
        private final ExecutionCallback<Integer> callback = new CountingCallback();
        private final int workerNo;

        public MyRunnable(IMap<Integer, Integer> map, int workerNo) {
            this.map = map;
            this.workerNo = workerNo;
        }

        /**
         * Issues one async read. A {@link HazelcastOverloadException} is not treated
         * as an error here; it is only counted and occasionally reported.
         */
        @Override
        public void run() {
            try {
                int key = ThreadLocalRandomProvider.get().nextInt();
                map.getAsync(key).andThen(callback);
            } catch (HazelcastOverloadException e) {
                long current = backpressureCounter.incrementAndGet();
                if (current % BACKPRESSURE_LOG_INTERVAL == 0) {
                    System.out.println("Worker no. " + workerNo + " backpressured. counter: " + current);
                }
            }
        }

        /** Counts successful and failed responses of the enclosing worker and logs periodically. */
        private class CountingCallback
                implements ExecutionCallback<Integer> {

            @Override
            public void onResponse(Integer response) {
                long position = progressCounter.incrementAndGet();
                if (position % PROGRESS_LOG_INTERVAL == 0) {
                    System.out.println("Worker no. " + workerNo + " at " + position);
                }
            }

            @Override
            public void onFailure(Throwable t) {
                long position = failureCounter.incrementAndGet();
                if (position % FAILURE_LOG_INTERVAL == 0) {
                    System.out.println("Failure Worker no. " + workerNo + " at " + position);
                }
            }
        }
    }

    /**
     * Background thread that samples the size of the client's {@code callIdMap}
     * every {@link #SAMPLING_INTERVAL_MILLIS} ms until its deadline passes or
     * {@link #shutdown()} is called, remembering the maximum seen overall and the
     * maximum seen during the warm-up window (first fifth of the duration).
     */
    private static class InvocationCheckingThread
            extends Thread {
        private static final long SAMPLING_INTERVAL_MILLIS = 100L;
        /** The warm-up window is the test duration divided by this value. */
        private static final long WARM_UP_DIVISOR = 5L;
        /** Overall maximum may be at most this multiple of the warm-up maximum. */
        private static final double TOLERATED_GROWTH_FACTOR = 1.2;

        private final long deadLine;
        private final long warmUpDeadline;
        private final ConcurrentMap<Long, ClientInvocation> callIdMap;

        // Written only by this thread; read by the test thread after join(),
        // which makes the writes visible without further synchronization.
        private int maxInvocationCountObserved;
        private int maxInvocationCountObservedDuringWarmup;

        // Set from another thread to end sampling before the deadline.
        private volatile boolean stopRequested;

        private InvocationCheckingThread(HazelcastInstance client) throws Exception {
            long durationMillis = TEST_DURATION_SECONDS * 1000L;
            // Read the clock once so both deadlines are relative to the same instant.
            long now = System.currentTimeMillis();
            this.warmUpDeadline = now + durationMillis / WARM_UP_DIVISOR;
            this.deadLine = now + durationMillis;
            this.callIdMap = extractCallIdMap(client);
        }

        @Override
        public void run() {
            while (!stopRequested && System.currentTimeMillis() < deadLine) {
                int currentSize = callIdMap.size();
                maxInvocationCountObserved = Math.max(currentSize, maxInvocationCountObserved);
                if (System.currentTimeMillis() < warmUpDeadline) {
                    maxInvocationCountObservedDuringWarmup = Math.max(currentSize, maxInvocationCountObservedDuringWarmup);
                }
                HazelcastTestSupport.sleepAtLeastMillis(SAMPLING_INTERVAL_MILLIS);
            }
        }

        /** Asks the sampling loop to exit at its next iteration; does not wait for it. */
        private void shutdown() {
            stopRequested = true;
        }

        /**
         * Waits for sampling to finish, then asserts that at least one in-flight
         * invocation was seen and that the overall maximum stayed within the
         * tolerated factor of the warm-up maximum.
         *
         * @throws InterruptedException if interrupted while waiting for this thread
         */
        private void assertInFlightInvocationsWereNotGrowing() throws InterruptedException {
            join();
            Assert.assertTrue("No in-flight invocation was ever observed", maxInvocationCountObserved > 0);
            long maximumTolerableInvocationCount = (long) ((double) maxInvocationCountObservedDuringWarmup * TOLERATED_GROWTH_FACTOR);
            Assert.assertTrue("Apparently number of in-flight invocations is growing. Max. number of in-flight invocation during first fifth of test duration: " + maxInvocationCountObservedDuringWarmup + " Max. number of in-flight invocation in total: " + maxInvocationCountObserved,
                    (long) maxInvocationCountObserved <= maximumTolerableInvocationCount);
        }

        /**
         * Reflectively reads the private {@code callIdMap} field declared on the
         * superclass of {@link ClientSmartInvocationServiceImpl}.
         *
         * <p>NOTE(review): the cast below fails with {@link ClassCastException} if the
         * client's invocation service is not a {@link ClientSmartInvocationServiceImpl};
         * presumably the default client configuration guarantees this - confirm.
         *
         * @param client client instance to inspect
         * @return the live map held by the client's invocation service
         * @throws NoSuchFieldException   if the superclass has no {@code callIdMap} field
         * @throws IllegalAccessException if the field cannot be read
         */
        // The field's generic type is erased at runtime, so the cast cannot be checked.
        @SuppressWarnings("unchecked")
        private static ConcurrentMap<Long, ClientInvocation> extractCallIdMap(HazelcastInstance client) throws NoSuchFieldException, IllegalAccessException {
            HazelcastClientInstanceImpl clientImpl = ClientTestUtil.getHazelcastClientInstanceImpl(client);
            ClientSmartInvocationServiceImpl invocationService = (ClientSmartInvocationServiceImpl) clientImpl.getInvocationService();
            Field callIdMapField = ClientSmartInvocationServiceImpl.class.getSuperclass().getDeclaredField("callIdMap");
            callIdMapField.setAccessible(true);
            return (ConcurrentMap<Long, ClientInvocation>) callIdMapField.get(invocationService);
        }
    }
}

