/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.util.concurrent;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.processors.AsyncProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import org.infinispan.commons.test.BlockHoundHelper;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.Mocks;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.BlockingManagerImpl;
import org.infinispan.util.concurrent.CompletionStages;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"unit"}, testName="util.concurrent.BlockingManagerTest")
public class BlockingManagerTest
extends AbstractInfinispanTest {
    Executor nonBlockingExecutor;
    Executor blockingExecutor;

    public void initializeMocks() {
        this.nonBlockingExecutor = (Executor)Mockito.mock(Executor.class, (MockSettings)Mockito.withSettings().defaultAnswer(Mocks.runWithExecutorAnswer(BlockHoundHelper.ensureNonBlockingExecutor())));
        this.blockingExecutor = (Executor)Mockito.mock(Executor.class, (MockSettings)Mockito.withSettings().defaultAnswer(Mocks.runWithExecutorAnswer(BlockHoundHelper.allowBlockingExecutor())));
    }

    private BlockingManager createBlockingManager(final boolean blockingInvocation) {
        this.initializeMocks();
        BlockingManagerImpl blockingManager = new BlockingManagerImpl(){

            protected boolean isCurrentThreadBlocking() {
                return blockingInvocation;
            }
        };
        blockingManager.nonBlockingExecutor = this.nonBlockingExecutor;
        blockingManager.blockingExecutor = this.blockingExecutor;
        blockingManager.start();
        return blockingManager;
    }

    public void testBlockingPublishToVoidStageInvokedBlockingThread() {
        BlockingManager blockingManager = this.createBlockingManager(true);
        CompletionStage stage = blockingManager.blockingPublisherToVoidStage((Publisher)Flowable.fromArray((Object[])new Object[]{1, 2, 3}).doOnNext(BlockHoundHelper::blockingConsume), null);
        AssertJUnit.assertTrue((boolean)CompletionStages.isCompletedSuccessfully((CompletionStage)stage));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.nonBlockingExecutor, this.blockingExecutor});
    }

    public void testBlockingPublishToVoidStageInvokedNonBlockingThread() {
        BlockingManager blockingManager = this.createBlockingManager(false);
        CompletionStage stage = blockingManager.blockingPublisherToVoidStage((Publisher)Flowable.just((Object)1).doOnNext(BlockHoundHelper::blockingConsume), null);
        AssertJUnit.assertTrue((boolean)CompletionStages.isCompletedSuccessfully((CompletionStage)stage));
        ((Executor)Mockito.verify((Object)this.blockingExecutor)).execute((Runnable)Mockito.any());
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.nonBlockingExecutor});
    }

    public void testBlockingPublishToVoidStageInvokedNonBlockingThreadCompleteAfterSubscribe() {
        BlockingManager blockingManager = this.createBlockingManager(false);
        AsyncProcessor processor = AsyncProcessor.create();
        processor.onNext((Object)1);
        CompletionStage stage = blockingManager.blockingPublisherToVoidStage((Publisher)processor.doOnNext(BlockHoundHelper::blockingConsume), null);
        AssertJUnit.assertFalse((boolean)CompletionStages.isCompletedSuccessfully((CompletionStage)stage));
        processor.onComplete();
        AssertJUnit.assertTrue((boolean)CompletionStages.isCompletedSuccessfully((CompletionStage)stage));
        ((Executor)Mockito.verify((Object)this.blockingExecutor)).execute((Runnable)Mockito.any());
        ((Executor)Mockito.verify((Object)this.nonBlockingExecutor)).execute((Runnable)Mockito.any());
    }

    public void testBlockingPublisherInvokedBlockingThread() {
        BlockingManager blockingManager = this.createBlockingManager(true);
        Publisher publisher = blockingManager.blockingPublisher((Publisher)Flowable.just((Object)1).doOnNext(BlockHoundHelper::blockingConsume));
        TestSubscriber subscriber = TestSubscriber.create();
        publisher.subscribe((Subscriber)subscriber);
        subscriber.assertComplete();
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.nonBlockingExecutor, this.blockingExecutor});
    }

    public void testBlockingPublisherInvokedBlockingThreadCompleteAfterSubscribe() {
        BlockingManager blockingManager = this.createBlockingManager(true);
        AsyncProcessor processor = AsyncProcessor.create();
        processor.onNext((Object)1);
        Publisher publisher = blockingManager.blockingPublisher((Publisher)processor.doOnNext(BlockHoundHelper::blockingConsume));
        TestSubscriber subscriber = TestSubscriber.create();
        publisher.subscribe((Subscriber)subscriber);
        subscriber.assertNotComplete();
        processor.onComplete();
        subscriber.assertComplete();
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.nonBlockingExecutor, this.blockingExecutor});
    }

    public void testBlockingPublisherInvokedNonBlockingThread() {
        BlockingManager blockingManager = this.createBlockingManager(false);
        Publisher publisher = blockingManager.blockingPublisher((Publisher)Flowable.just((Object)1).doOnNext(BlockHoundHelper::blockingConsume));
        TestSubscriber subscriber = TestSubscriber.create();
        Flowable.fromPublisher((Publisher)publisher).doOnNext(ignore -> AssertJUnit.assertTrue((boolean)BlockHoundHelper.currentThreadRequiresNonBlocking())).subscribe((FlowableSubscriber)subscriber);
        subscriber.assertComplete();
        ((Executor)Mockito.verify((Object)this.blockingExecutor)).execute((Runnable)Mockito.any());
        ((Executor)Mockito.verify((Object)this.nonBlockingExecutor, (VerificationMode)Mockito.times((int)3))).execute((Runnable)Mockito.any());
    }

    public void testBlockingPublisherInvokedNonBlockingThreadCompleteAfterSubscribe() {
        BlockingManager blockingManager = this.createBlockingManager(false);
        UnicastProcessor processor = UnicastProcessor.create();
        processor.onNext((Object)1);
        Publisher publisher = blockingManager.blockingPublisher((Publisher)processor.doOnNext(BlockHoundHelper::blockingConsume));
        TestSubscriber subscriber = TestSubscriber.create();
        Flowable.fromPublisher((Publisher)publisher).doOnNext(ignore -> AssertJUnit.assertTrue((boolean)BlockHoundHelper.currentThreadRequiresNonBlocking())).subscribe((FlowableSubscriber)subscriber);
        subscriber.assertNotComplete();
        processor.onComplete();
        subscriber.assertComplete();
        ((Executor)Mockito.verify((Object)this.blockingExecutor)).execute((Runnable)Mockito.any());
        ((Executor)Mockito.verify((Object)this.nonBlockingExecutor, (VerificationMode)Mockito.times((int)3))).execute((Runnable)Mockito.any());
    }
}

