package com.hazelcast.jet.impl.execution;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.impl.MetricsRegistryImpl;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/execution/TaskletExecutionServiceTest.class */
public class TaskletExecutionServiceTest extends JetTestSupport {
    private static final int THREAD_COUNT = 4;
    private TaskletExecutionService tes;
    private ExecutorService executor;
    private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
    private final ClassLoader classLoader = getClass().getClassLoader();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/TaskletExecutionServiceTest$MockTasklet.class */
    public static class MockTasklet implements Tasklet {
        boolean isBlocking;
        boolean initFails;
        boolean callFails;
        int callsBeforeDone;
        private boolean willMakeProgress = true;
        private boolean isSleeping;
        private CountDownLatch latch;
        private long initLastsMillis;
        private volatile boolean initDone;

        MockTasklet() {
        }

        public boolean isCooperative() {
            return !this.isBlocking;
        }

        @Nonnull
        public ProgressState call() {
            if (this.callFails) {
                throw new RuntimeException("mock call failure");
            }
            if (this.isSleeping) {
                try {
                    Thread.sleep(500L);
                    return ProgressState.NO_PROGRESS;
                } catch (InterruptedException e) {
                    throw new RuntimeException("Sleeping interrupted, this should not happen because we don't interrupt blocking workers", e);
                }
            }
            if (this.latch != null) {
                try {
                    this.latch.await();
                } catch (InterruptedException e2) {
                    throw ExceptionUtil.sneakyThrow(e2);
                }
            }
            this.willMakeProgress = !this.willMakeProgress;
            int i = this.callsBeforeDone;
            this.callsBeforeDone = i - 1;
            return i == 0 ? ProgressState.DONE : this.willMakeProgress ? ProgressState.MADE_PROGRESS : ProgressState.NO_PROGRESS;
        }

        public void init() {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(this.initLastsMillis));
            this.initDone = true;
            if (this.initFails) {
                throw new RuntimeException("mock init failure");
            }
        }

        MockTasklet blocking() {
            this.isBlocking = true;
            return this;
        }

        MockTasklet sleeping() {
            this.isSleeping = true;
            this.isBlocking = true;
            return this;
        }

        MockTasklet waitOnLatch(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
            return this;
        }

        MockTasklet initFails() {
            this.initFails = true;
            return this;
        }

        MockTasklet initLasts(long j) {
            this.initLastsMillis = j;
            return this;
        }

        MockTasklet callFails() {
            this.callFails = true;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public MockTasklet callsBeforeDone(int i) {
            this.callsBeforeDone = i;
            return this;
        }

        void assertInitDone() {
            Assert.assertTrue(this.initDone);
        }

        void assertDone() {
            Assert.assertEquals("Tasklet wasn't done", -1L, this.callsBeforeDone);
        }

        void assertNotDone() {
            Assert.assertNotEquals("Tasklet was done", -1L, this.callsBeforeDone);
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/execution/TaskletExecutionServiceTest$TaskletAssertingThreadLocal.class */
    private static class TaskletAssertingThreadLocal implements Tasklet {
        private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> {
            return 0;
        });
        private int callCount;

        private TaskletAssertingThreadLocal() {
        }

        @Nonnull
        public ProgressState call() {
            Assert.assertEquals("the ThreadLocal was updated from multiple tasklets", this.callCount, threadLocal.get().intValue());
            threadLocal.set(Integer.valueOf(threadLocal.get().intValue() + 1));
            this.callCount++;
            LockSupport.parkNanos(10000000L);
            return this.callCount > 50 ? ProgressState.DONE : ProgressState.MADE_PROGRESS;
        }
    }

    @Before
    public void before() {
        this.executor = Executors.newCachedThreadPool();
        NodeEngine nodeEngine = (NodeEngine) Mockito.mock(NodeEngine.class);
        HazelcastInstance hazelcastInstance = (HazelcastInstance) Mockito.mock(HazelcastInstance.class);
        Mockito.when(nodeEngine.getHazelcastInstance()).thenReturn(hazelcastInstance);
        Mockito.when(hazelcastInstance.getName()).thenReturn("test-hz-instance");
        ExecutionService executionService = (ExecutionService) Mockito.mock(ExecutionService.class);
        Mockito.when(nodeEngine.getExecutionService()).thenReturn(executionService);
        Mockito.when(executionService.submit((String) ArgumentMatchers.eq("jet:tasklet_initClose"), (Runnable) ArgumentMatchers.any(Runnable.class))).then(invocationOnMock -> {
            return this.executor.submit((Runnable) invocationOnMock.getArgument(1));
        });
        LoggingService loggingService = (LoggingService) Mockito.mock(LoggingService.class);
        Mockito.when(nodeEngine.getLoggingService()).thenReturn(loggingService);
        Mockito.when(loggingService.getLogger(TaskletExecutionService.class)).thenReturn(Logger.getLogger(TaskletExecutionService.class));
        Mockito.when(nodeEngine.getMetricsRegistry()).thenReturn(new MetricsRegistryImpl((ILogger) Mockito.mock(ILogger.class), ProbeLevel.INFO));
        this.tes = new TaskletExecutionService(nodeEngine, 4, new HazelcastProperties(new Properties()));
    }

    @After
    public void after() {
        this.tes.shutdown();
        this.executor.shutdown();
    }

    @Test
    public void when_blockingTask_then_executed() {
        MockTasklet blocking = new MockTasklet().blocking();
        executeAndJoin(Collections.singletonList(blocking));
        blocking.assertDone();
    }

    @Test
    public void when_nonBlockingTask_then_executed() {
        MockTasklet mockTasklet = new MockTasklet();
        executeAndJoin(Collections.singletonList(mockTasklet));
        mockTasklet.assertDone();
    }

    @Test
    public void when_nonBlockingAndInitFails_then_futureFails() {
        List singletonList = Collections.singletonList(new MockTasklet().initFails());
        Assertions.assertThatThrownBy(() -> {
            executeAndJoin(singletonList);
        }).isInstanceOf(CompletionException.class).hasCauseInstanceOf(JetException.class).hasMessageContaining("mock init failure");
    }

    @Test
    public void when_manyNonBlockingAndSomeInitFails_then_allInitAwaited() {
        List<MockTasklet> asList = Arrays.asList(new MockTasklet().initLasts(100L), new MockTasklet().initLasts(200L).initFails(), new MockTasklet().initLasts(400L), new MockTasklet().initLasts(800L));
        try {
            executeAndJoin(asList);
            Assert.fail();
        } catch (CompletionException e) {
        }
        asList.forEach((v0) -> {
            v0.assertInitDone();
        });
    }

    @Test
    public void when_blockingAndInitFails_then_futureFails() {
        List singletonList = Collections.singletonList(new MockTasklet().blocking().initFails());
        Assertions.assertThatThrownBy(() -> {
            executeAndJoin(singletonList);
        }).isInstanceOf(CompletionException.class).hasCauseInstanceOf(JetException.class).hasMessageContaining("mock init failure");
    }

    @Test
    public void when_nonBlockingAndCallFails_then_futureFails() {
        List singletonList = Collections.singletonList(new MockTasklet().callFails());
        Assertions.assertThatThrownBy(() -> {
            executeAndJoin(singletonList);
        }).isInstanceOf(CompletionException.class).hasCauseInstanceOf(JetException.class).hasMessageContaining("mock call failure");
    }

    @Test
    public void when_blockingAndCallFails_then_futureFails() {
        List singletonList = Collections.singletonList(new MockTasklet().blocking().callFails());
        Assertions.assertThatThrownBy(() -> {
            executeAndJoin(singletonList);
        }).isInstanceOf(CompletionException.class).hasCauseInstanceOf(JetException.class).hasMessageContaining("mock call failure");
    }

    @Test
    public void when_manyCallsWithSomeStalling_then_eventuallyDone() {
        List<MockTasklet> asList = Arrays.asList(new MockTasklet().blocking().callsBeforeDone(10), new MockTasklet().callsBeforeDone(10));
        executeAndJoin(asList);
        asList.forEach((v0) -> {
            v0.assertDone();
        });
    }

    @Test
    public void when_workStealing_then_allComplete() {
        List<MockTasklet> list = (List) Stream.generate(() -> {
            return new MockTasklet().callsBeforeDone(1000);
        }).limit(100L).collect(Collectors.toList());
        executeAndJoin(list);
        list.forEach((v0) -> {
            v0.assertDone();
        });
    }

    @Test
    public void when_nonBlockingTaskletIsCancelled_then_completesEarly() {
        List list = (List) Stream.generate(() -> {
            return new MockTasklet().callsBeforeDone(Integer.MAX_VALUE);
        }).limit(100L).collect(Collectors.toList());
        CompletableFuture beginExecute = this.tes.beginExecute(list, this.cancellationFuture, this.classLoader);
        this.cancellationFuture.cancel(true);
        list.forEach((v0) -> {
            v0.assertNotDone();
        });
        Objects.requireNonNull(beginExecute);
        assertThrows(CancellationException.class, beginExecute::get);
    }

    @Test
    public void when_blockingTaskletIsCancelled_then_completeEarly() {
        List list = (List) Stream.generate(() -> {
            return new MockTasklet().blocking().callsBeforeDone(Integer.MAX_VALUE);
        }).limit(100L).collect(Collectors.toList());
        CompletableFuture beginExecute = this.tes.beginExecute(list, this.cancellationFuture, this.classLoader);
        this.cancellationFuture.cancel(true);
        list.forEach((v0) -> {
            v0.assertNotDone();
        });
        Objects.requireNonNull(beginExecute);
        assertThrows(CancellationException.class, beginExecute::get);
    }

    @Test
    public void when_blockingSleepingTaskletIsCancelled_then_completeEarly() {
        List list = (List) Stream.generate(() -> {
            return new MockTasklet().sleeping().callsBeforeDone(Integer.MAX_VALUE);
        }).limit(100L).collect(Collectors.toList());
        CompletableFuture beginExecute = this.tes.beginExecute(list, this.cancellationFuture, this.classLoader);
        this.cancellationFuture.cancel(true);
        list.forEach((v0) -> {
            v0.assertNotDone();
        });
        assertTrueEventually(() -> {
            Assert.assertTrue(beginExecute.isDone());
        }, 10L);
        CompletableFuture<Void> completableFuture = this.cancellationFuture;
        Objects.requireNonNull(completableFuture);
        assertThrows(CancellationException.class, completableFuture::get);
    }

    @Test
    public void when_nonBlockingCancelled_then_doneCallBackFiredAfterActualDone() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CompletableFuture beginExecute = this.tes.beginExecute((List) Stream.generate(() -> {
            return new MockTasklet().waitOnLatch(countDownLatch).callsBeforeDone(Integer.MAX_VALUE);
        }).limit(100L).collect(Collectors.toList()), this.cancellationFuture, this.classLoader);
        this.cancellationFuture.cancel(true);
        Assert.assertFalse("future should not be completed until tasklets are completed.", beginExecute.isDone());
        countDownLatch.countDown();
        assertTrueEventually(() -> {
            Assert.assertTrue("future should be completed eventually", beginExecute.isDone());
        });
        CompletableFuture<Void> completableFuture = this.cancellationFuture;
        Objects.requireNonNull(completableFuture);
        assertThrows(CancellationException.class, completableFuture::get);
    }

    @Test
    public void when_twoNonBlockingTasklets_then_differentWorker() {
        TaskletAssertingThreadLocal taskletAssertingThreadLocal = new TaskletAssertingThreadLocal();
        TaskletAssertingThreadLocal taskletAssertingThreadLocal2 = new TaskletAssertingThreadLocal();
        Assert.assertTrue(taskletAssertingThreadLocal.isCooperative());
        CompletableFuture beginExecute = this.tes.beginExecute(Collections.singletonList(taskletAssertingThreadLocal), new CompletableFuture(), this.classLoader);
        CompletableFuture beginExecute2 = this.tes.beginExecute(Collections.singletonList(taskletAssertingThreadLocal2), new CompletableFuture(), this.classLoader);
        beginExecute.join();
        beginExecute2.join();
    }

    @Test
    public void when_tryCompleteOnReturnedFuture_then_fails() {
        CompletableFuture beginExecute = this.tes.beginExecute(Collections.singletonList(new MockTasklet().callsBeforeDone(Integer.MAX_VALUE)), this.cancellationFuture, this.classLoader);
        assertThrows(UnsupportedOperationException.class, () -> {
            beginExecute.complete(null);
        });
    }

    @Test
    public void when_tryCompleteExceptionallyOnReturnedFuture_then_fails() {
        CompletableFuture beginExecute = this.tes.beginExecute(Collections.singletonList(new MockTasklet().callsBeforeDone(Integer.MAX_VALUE)), this.cancellationFuture, this.classLoader);
        assertThrows(UnsupportedOperationException.class, () -> {
            beginExecute.completeExceptionally(new RuntimeException());
        });
    }

    @Test
    public void when_tryCancelOnReturnedFuture_then_fails() {
        CompletableFuture beginExecute = this.tes.beginExecute(Collections.singletonList(new MockTasklet().callsBeforeDone(Integer.MAX_VALUE)), this.cancellationFuture, this.classLoader);
        assertThrows(UnsupportedOperationException.class, () -> {
            beginExecute.cancel(true);
        });
    }

    @Test
    public void when_cancellationFutureCompleted_then_fails() throws Throwable {
        CompletableFuture beginExecute = this.tes.beginExecute(Collections.singletonList(new MockTasklet().callsBeforeDone(Integer.MAX_VALUE)), this.cancellationFuture, this.classLoader);
        this.cancellationFuture.complete(null);
        assertThrows(IllegalStateException.class, () -> {
            try {
                beginExecute.join();
            } catch (CompletionException e) {
                throw com.hazelcast.jet.impl.util.ExceptionUtil.peel(e);
            }
        });
    }

    private void executeAndJoin(List<MockTasklet> list) {
        this.tes.beginExecute(list, this.cancellationFuture, this.classLoader).join();
    }
}
