/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.executor;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.junit.Test;
import org.neo4j.kernel.impl.transaction.log.ParkStrategy;
import org.neo4j.test.DoubleLatch;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;

/**
 * Tests for {@link DynamicTaskExecutor}: running tasks on several processors, changing the
 * number of processors while tasks are in flight, and what submitters observe after a task
 * has failed.
 */
public class DynamicTaskExecutorTest {
    /** Upper bound, in milliseconds, for waiting on a task to report that it has run. */
    private static final long AWAIT_TIMEOUT_MILLIS = 60000L;

    /**
     * With two processors, a second task is expected to complete while the first one is
     * still being held inside its {@code call()}.
     */
    @Test
    public void shouldExecuteTasksInParallel() throws Exception {
        DynamicTaskExecutor executor = newExecutor(2, 5);
        ControlledTask task1 = new ControlledTask();
        Task task2 = new Task();

        executor.submit(task1);
        task1.latch.awaitStart();
        executor.submit(task2);
        // task1 is still held by its latch here, so task2 must run on another processor.
        awaitExecuted(task2);
        task1.latch.finish();
        awaitExecuted(task1);
        executor.shutdown(true);

        Assert.assertEquals(1L, task1.executed);
        Assert.assertEquals(1L, task2.executed);
    }

    /**
     * Starts with a single processor that is occupied by a held task, then raises the
     * processor count to two and expects the queued task to complete before the held one
     * is released.
     */
    @Test
    public void shouldIncrementNumberOfProcessorsWhenRunning() throws Exception {
        DynamicTaskExecutor executor = newExecutor(1, 5);
        ControlledTask task1 = new ControlledTask();
        Task task2 = new Task();

        executor.submit(task1);
        task1.latch.awaitStart();
        executor.submit(task2);
        executor.setNumberOfProcessors(2);
        awaitExecuted(task2);
        task1.latch.finish();
        awaitExecuted(task1);
        executor.shutdown(true);

        Assert.assertEquals(1L, task1.executed);
        Assert.assertEquals(1L, task2.executed);
    }

    /**
     * Starts with two processors, lowers the count to one while both are busy, and expects
     * that once a third held task has started, a fourth queued task is not executed until
     * the third is released.
     */
    @Test
    public void shouldDecrementNumberOfProcessorsWhenRunning() throws Exception {
        DynamicTaskExecutor executor = newExecutor(2, 5);
        ControlledTask task1 = new ControlledTask();
        ControlledTask task2 = new ControlledTask();
        ControlledTask task3 = new ControlledTask();
        Task task4 = new Task();

        executor.submit(task1);
        executor.submit(task2);
        task1.latch.awaitStart();
        task2.latch.awaitStart();
        executor.submit(task3);
        executor.submit(task4);
        executor.setNumberOfProcessors(1);
        task1.latch.finish();
        task2.latch.finish();
        task3.latch.awaitStart();
        // Give a surplus processor, if one were still active, time to pick up task4.
        // This is a timing-based negative check and cannot be replaced by a latch.
        Thread.sleep(200L);
        Assert.assertEquals(0L, task4.executed);
        task3.latch.finish();
        executor.shutdown(true);

        Assert.assertEquals(1L, task1.executed);
        Assert.assertEquals(1L, task2.executed);
        Assert.assertEquals(1L, task3.executed);
        Assert.assertEquals(1L, task4.executed);
    }

    /**
     * Submits 1000 tasks that each sleep 10 ms to an executor with 30 processors and a
     * queue size of 5, then expects every task to have run exactly once after
     * {@code shutdown(true)} returns.
     */
    @Test
    public void shouldExecuteMultipleTasks() throws Exception {
        DynamicTaskExecutor executor = newExecutor(30, 5);
        ExpensiveTask[] tasks = new ExpensiveTask[1000];
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = new ExpensiveTask(10);
            executor.submit(tasks[i]);
        }
        executor.shutdown(true);

        for (ExpensiveTask task : tasks) {
            Assert.assertEquals(1L, task.executed);
        }
    }

    /**
     * After a submitted task has thrown, a later {@code submit} is expected to fail with an
     * exception whose cause is the exception thrown by the task.
     */
    @Test
    public void shouldShutDownOnTaskFailure() throws Exception {
        DynamicTaskExecutor executor = newExecutor(30, 5);
        IOException exception = new IOException("Test message");
        FailingTask task = new FailingTask(exception);

        executor.submit(task);
        task.latch.await();

        assertExceptionOnSubmit(executor, exception);
    }

    /**
     * Same expectation as {@link #shouldShutDownOnTaskFailure()}, but the failing task is
     * queued behind two held tasks and followed by a third held task.
     */
    @Test
    public void shouldShutDownOnTaskFailureEvenIfOtherTasksArePending() throws Exception {
        DynamicTaskExecutor executor = newExecutor(2, 10);
        IOException exception = new IOException("Test message");
        ControlledTask firstBlockingTask = new ControlledTask();
        ControlledTask secondBlockingTask = new ControlledTask();
        executor.submit(firstBlockingTask);
        executor.submit(secondBlockingTask);
        firstBlockingTask.latch.awaitStart();
        secondBlockingTask.latch.awaitStart();

        FailingTask failingTask = new FailingTask(exception);
        executor.submit(failingTask);
        ControlledTask thirdBlockingTask = new ControlledTask();
        executor.submit(thirdBlockingTask);
        // Only the first held task is released, which frees one processor for the failing task.
        firstBlockingTask.latch.finish();
        failingTask.latch.await();

        assertExceptionOnSubmit(executor, exception);
        // NOTE(review): presumably this call would block if the failure had not already
        // shut the executor down properly - confirm against DynamicTaskExecutor.
        executor.shutdown(false);
    }

    /**
     * Creates an executor with a 1-unit {@link ParkStrategy.Park} and this test class' simple
     * name as the last constructor argument.
     *
     * @param processors value passed as the first constructor argument (processor count).
     * @param maxQueueSize value passed as the second constructor argument.
     * @return a new executor; the caller is responsible for shutting it down.
     */
    private DynamicTaskExecutor newExecutor(int processors, int maxQueueSize) {
        return new DynamicTaskExecutor(processors, maxQueueSize, new ParkStrategy.Park(1),
                this.getClass().getSimpleName());
    }

    /**
     * Waits until {@code task} reports at least one execution.
     *
     * <p>Polls with a short sleep instead of spinning, and fails the test once
     * {@link #AWAIT_TIMEOUT_MILLIS} has elapsed, so a task that is never run produces a test
     * failure rather than hanging the build.
     *
     * @param task the task whose {@code executed} counter is observed.
     * @throws InterruptedException if the waiting thread is interrupted.
     */
    private static void awaitExecuted(Task task) throws InterruptedException {
        long deadline = System.currentTimeMillis() + AWAIT_TIMEOUT_MILLIS;
        while (task.executed == 0) {
            if (System.currentTimeMillis() > deadline) {
                Assert.fail("Task was not executed within " + AWAIT_TIMEOUT_MILLIS + " ms");
            }
            Thread.sleep(1L);
        }
    }

    /**
     * Submits up to five {@link EmptyTask}s, 100 ms apart, until one submission throws, then
     * asserts that a submission did throw and that its cause equals {@code exception}.
     *
     * @param executor the executor expected to reject submissions.
     * @param exception the exception expected as the cause of the rejection.
     */
    private void assertExceptionOnSubmit(TaskExecutor executor, IOException exception) {
        Exception submitException = null;
        for (int attempt = 0; attempt < 5; attempt++) {
            try {
                executor.submit(new EmptyTask());
            } catch (Exception e) {
                submitException = e;
                break;
            }
            // Sleep outside the try above so that an interrupt of the test thread is not
            // mistaken for the executor rejecting the submission.
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        Assert.assertNotNull(submitException);
        Assert.assertEquals(exception, submitException.getCause());
    }

    /**
     * A {@link Task} whose {@code call()} first goes through {@code latch.startAndAwaitFinish()}
     * and only then counts the execution. Presumably this signals the start to the test and
     * blocks until the test calls {@code latch.finish()} - see {@link DoubleLatch}.
     */
    private static class ControlledTask extends Task {
        private final DoubleLatch latch = new DoubleLatch();

        private ControlledTask() {
        }

        @Override
        public Void call() {
            this.latch.startAndAwaitFinish();
            return super.call();
        }
    }

    /** A {@link Task} that sleeps for a fixed number of milliseconds before counting the execution. */
    private static class ExpensiveTask extends Task {
        private final int millis;

        ExpensiveTask(int millis) {
            this.millis = millis;
        }

        @Override
        public Void call() {
            try {
                Thread.sleep(this.millis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the thread running this task can still see it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return super.call();
        }
    }

    /**
     * A task that always throws the exception it was constructed with, counting down
     * {@code latch} on the way out so a test can wait for the failure to have happened.
     */
    private static class FailingTask implements Callable<Void> {
        private final Exception exception;
        private final CountDownLatch latch = new CountDownLatch(1);

        public FailingTask(Exception exception) {
            this.exception = exception;
        }

        @Override
        public Void call() throws Exception {
            try {
                throw this.exception;
            } finally {
                this.latch.countDown();
            }
        }
    }

    /** A task that does nothing; used to probe whether the executor still accepts submissions. */
    private static class EmptyTask implements Callable<Void> {
        private EmptyTask() {
        }

        @Override
        public Void call() throws Exception {
            return null;
        }
    }

    /** A task that counts how many times it has been called. */
    private static class Task implements Callable<Void> {
        // volatile so that the test thread observes the update made by the executing thread.
        // The increment itself is not atomic; every test in this class submits each instance
        // exactly once, so there are no concurrent increments.
        protected volatile int executed;

        private Task() {
        }

        @Override
        public Void call() {
            ++this.executed;
            return null;
        }
    }
}

