/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.impl.exception.ShutdownInProgressException;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class TaskletExecutionService {
    private static final IdleStrategy IDLER_COOPERATIVE = new BackoffIdleStrategy(0L, 0L, TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(1L));
    private static final IdleStrategy IDLER_NON_COOPERATIVE = new BackoffIdleStrategy(0L, 0L, TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(5L));
    private final ExecutorService blockingTaskletExecutor = Executors.newCachedThreadPool(new BlockingTaskThreadFactory());
    private final CooperativeWorker[] cooperativeWorkers;
    private final Thread[] cooperativeThreadPool;
    private final String hzInstanceName;
    private final ILogger logger;
    private int cooperativeThreadIndex;
    @Probe
    private final AtomicInteger blockingWorkerCount = new AtomicInteger();
    private final AtomicReference<Boolean> gracefulShutdown = new AtomicReference<Object>(null);
    private final Object lock = new Object();

    public TaskletExecutionService(NodeEngineImpl nodeEngine, int threadCount) {
        this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
        this.cooperativeWorkers = new CooperativeWorker[threadCount];
        this.cooperativeThreadPool = new Thread[threadCount];
        this.logger = nodeEngine.getLoggingService().getLogger(TaskletExecutionService.class);
        nodeEngine.getMetricsRegistry().newProbeBuilder().withTag("module", "jet").scanAndRegister(this);
        Arrays.setAll(this.cooperativeWorkers, i -> new CooperativeWorker());
        Arrays.setAll(this.cooperativeThreadPool, i -> new Thread((Runnable)this.cooperativeWorkers[i], String.format("hz.%s.jet.cooperative.thread-%d", this.hzInstanceName, i)));
        Arrays.stream(this.cooperativeThreadPool).forEach(Thread::start);
        for (int i2 = 0; i2 < this.cooperativeWorkers.length; ++i2) {
            nodeEngine.getMetricsRegistry().newProbeBuilder().withTag("module", "jet").withTag("cooperativeWorker", String.valueOf(i2)).scanAndRegister(this.cooperativeWorkers[i2]);
        }
    }

    CompletableFuture<Void> beginExecute(@Nonnull List<? extends Tasklet> tasklets, @Nonnull CompletableFuture<Void> cancellationFuture, @Nonnull ClassLoader jobClassLoader) {
        if (this.gracefulShutdown.get() != null) {
            throw new ShutdownInProgressException();
        }
        ExecutionTracker executionTracker = new ExecutionTracker(tasklets.size(), cancellationFuture);
        try {
            Map<Boolean, List<Tasklet>> byCooperation = tasklets.stream().collect(Collectors.partitioningBy(Tasklet::isCooperative));
            this.submitCooperativeTasklets(executionTracker, jobClassLoader, byCooperation.get(true));
            this.submitBlockingTasklets(executionTracker, jobClassLoader, byCooperation.get(false));
        }
        catch (Throwable t) {
            executionTracker.future.internalCompleteExceptionally(t);
        }
        return executionTracker.future;
    }

    public void shutdown(boolean graceful) {
        if (this.gracefulShutdown.compareAndSet(null, graceful)) {
            if (graceful) {
                this.blockingTaskletExecutor.shutdown();
            } else {
                this.blockingTaskletExecutor.shutdownNow();
            }
        }
    }

    private void submitBlockingTasklets(ExecutionTracker executionTracker, ClassLoader jobClassLoader, List<Tasklet> tasklets) {
        CountDownLatch startedLatch = new CountDownLatch(tasklets.size());
        executionTracker.blockingFutures = tasklets.stream().map(t -> new BlockingWorker(new TaskletTracker((Tasklet)t, executionTracker, jobClassLoader), startedLatch)).map(this.blockingTaskletExecutor::submit).collect(Collectors.toList());
        Util.uncheckRun(startedLatch::await);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void submitCooperativeTasklets(ExecutionTracker executionTracker, ClassLoader jobClassLoader, List<Tasklet> tasklets) {
        List[] trackersByThread = new List[this.cooperativeWorkers.length];
        Arrays.setAll(trackersByThread, i -> new ArrayList());
        for (Tasklet t : tasklets) {
            t.init();
        }
        Object object = this.lock;
        synchronized (object) {
            for (Tasklet t : tasklets) {
                trackersByThread[this.cooperativeThreadIndex].add(new TaskletTracker(t, executionTracker, jobClassLoader));
                this.cooperativeThreadIndex = (this.cooperativeThreadIndex + 1) % trackersByThread.length;
            }
        }
        for (int i2 = 0; i2 < trackersByThread.length; ++i2) {
            this.cooperativeWorkers[i2].trackers.addAll(trackersByThread[i2]);
        }
        Arrays.stream(this.cooperativeThreadPool).forEach(LockSupport::unpark);
    }

    private String trackersToString() {
        return Arrays.stream(this.cooperativeWorkers).flatMap(w -> ((CooperativeWorker)w).trackers.stream()).map(Object::toString).sorted().collect(Collectors.joining("\n")) + "\n-----------------";
    }

    public void awaitWorkerTermination() {
        assert (this.gracefulShutdown.get() != null) : "Not shut down";
        try {
            while (!this.blockingTaskletExecutor.awaitTermination(1L, TimeUnit.MINUTES)) {
                this.logger.warning("Blocking tasklet executor did not terminate in 1 minute");
            }
            for (Thread t : this.cooperativeThreadPool) {
                t.join();
            }
        }
        catch (InterruptedException e) {
            ExceptionUtil.sneakyThrow(e);
        }
    }

    private final class ExecutionTracker {
        final NonCompletableFuture future = new NonCompletableFuture();
        volatile List<Future> blockingFutures = Collections.emptyList();
        private final AtomicInteger completionLatch;
        private final AtomicReference<Throwable> executionException = new AtomicReference();

        ExecutionTracker(int taskletCount, CompletableFuture<Void> cancellationFuture) {
            this.completionLatch = new AtomicInteger(taskletCount);
            cancellationFuture.whenComplete((BiConsumer)ExceptionUtil.withTryCatch(TaskletExecutionService.this.logger, (r, e) -> {
                if (e == null) {
                    e = new IllegalStateException("cancellationFuture should be completed exceptionally");
                }
                this.exception((Throwable)e);
                this.blockingFutures.forEach(f -> f.cancel(true));
            }));
        }

        void exception(Throwable t) {
            this.executionException.compareAndSet(null, t);
        }

        void taskletDone() {
            if (this.completionLatch.decrementAndGet() == 0) {
                Throwable ex = this.executionException.get();
                if (ex == null) {
                    this.future.internalComplete();
                } else {
                    this.future.internalCompleteExceptionally(ex);
                }
            }
        }

        boolean executionCompletedExceptionally() {
            return this.executionException.get() != null;
        }
    }

    private final class BlockingTaskThreadFactory
    implements ThreadFactory {
        private final AtomicInteger seq = new AtomicInteger();

        private BlockingTaskThreadFactory() {
        }

        @Override
        public Thread newThread(@Nonnull Runnable r) {
            return new Thread(r, String.format("hz.%s.jet.blocking.thread-%d", TaskletExecutionService.this.hzInstanceName, this.seq.getAndIncrement()));
        }
    }

    private static final class TaskletTracker {
        final Tasklet tasklet;
        final ExecutionTracker executionTracker;
        final ClassLoader jobClassLoader;

        TaskletTracker(Tasklet tasklet, ExecutionTracker executionTracker, ClassLoader jobClassLoader) {
            this.tasklet = tasklet;
            this.executionTracker = executionTracker;
            this.jobClassLoader = jobClassLoader;
        }

        public String toString() {
            return "Tracking " + this.tasklet;
        }
    }

    private final class CooperativeWorker
    implements Runnable {
        private static final int COOPERATIVE_LOGGING_THRESHOLD = 5;
        @Probe(name="taskletCount")
        private final List<TaskletTracker> trackers;
        @Probe
        private final AtomicLong iterationCount = new AtomicLong();

        CooperativeWorker() {
            this.trackers = new CopyOnWriteArrayList<TaskletTracker>();
        }

        @Override
        public void run() {
            Boolean gracefulShutdownLocal;
            Thread thread = Thread.currentThread();
            long idleCount = 0L;
            while ((gracefulShutdownLocal = (Boolean)TaskletExecutionService.this.gracefulShutdown.get()) == null || gracefulShutdownLocal.booleanValue() && !this.trackers.isEmpty()) {
                boolean madeProgress = false;
                for (TaskletTracker t2 : this.trackers) {
                    long elapsedMs;
                    long start = 0L;
                    if (TaskletExecutionService.this.logger.isFinestEnabled()) {
                        start = System.nanoTime();
                    }
                    try {
                        thread.setContextClassLoader(t2.jobClassLoader);
                        ProgressState result = t2.tasklet.call();
                        if (result.isDone()) {
                            this.dismissTasklet(t2);
                        } else {
                            madeProgress |= result.isMadeProgress();
                        }
                    }
                    catch (Throwable e) {
                        TaskletExecutionService.this.logger.warning("Exception in " + t2.tasklet, e);
                        t2.executionTracker.exception(new JetException("Exception in " + t2.tasklet + ": " + e, e));
                    }
                    if (t2.executionTracker.executionCompletedExceptionally()) {
                        this.dismissTasklet(t2);
                    }
                    if (!TaskletExecutionService.this.logger.isFinestEnabled() || (elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) <= 5L) continue;
                    TaskletExecutionService.this.logger.finest("Cooperative tasklet call of '" + t2.tasklet + "' took more than " + 5 + " ms: " + elapsedMs + "ms");
                }
                Util.lazyIncrement(this.iterationCount);
                if (madeProgress) {
                    idleCount = 0L;
                    continue;
                }
                IDLER_COOPERATIVE.idle(++idleCount);
            }
            this.trackers.forEach(t -> t.executionTracker.taskletDone());
            this.trackers.clear();
        }

        private void dismissTasklet(TaskletTracker t) {
            LoggingUtil.logFinest(TaskletExecutionService.this.logger, "Tasklet %s is done", t.tasklet);
            t.executionTracker.taskletDone();
            this.trackers.remove(t);
        }
    }

    private final class BlockingWorker
    implements Runnable {
        private final TaskletTracker tracker;
        private final CountDownLatch startedLatch;

        private BlockingWorker(TaskletTracker tracker, CountDownLatch startedLatch) {
            this.tracker = tracker;
            this.startedLatch = startedLatch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ClassLoader clBackup = Thread.currentThread().getContextClassLoader();
            Tasklet t = this.tracker.tasklet;
            String oldName = Thread.currentThread().getName();
            Thread.currentThread().setContextClassLoader(this.tracker.jobClassLoader);
            try {
                ProgressState result;
                Thread.currentThread().setName(oldName.replaceAll(".thread-[0-9]+$", Matcher.quoteReplacement("." + this.tracker.tasklet)));
                assert (!oldName.equals(Thread.currentThread().getName())) : "unexpected thread name pattern: " + oldName;
                TaskletExecutionService.this.blockingWorkerCount.incrementAndGet();
                this.startedLatch.countDown();
                t.init();
                long idleCount = 0L;
                do {
                    if ((result = t.call()).isMadeProgress()) {
                        idleCount = 0L;
                        continue;
                    }
                    IDLER_NON_COOPERATIVE.idle(++idleCount);
                } while (!result.isDone() && !this.tracker.executionTracker.executionCompletedExceptionally() && !Boolean.FALSE.equals(TaskletExecutionService.this.gracefulShutdown.get()));
            }
            catch (Throwable e) {
                TaskletExecutionService.this.logger.warning("Exception in " + t, e);
                this.tracker.executionTracker.exception(new JetException("Exception in " + t + ": " + e, e));
            }
            finally {
                TaskletExecutionService.this.blockingWorkerCount.decrementAndGet();
                Thread.currentThread().setContextClassLoader(clBackup);
                Thread.currentThread().setName(oldName);
                this.tracker.executionTracker.taskletDone();
            }
        }
    }
}

