package com.hazelcast.jet.impl.execution;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/TaskletExecutionService.class */
public class TaskletExecutionService {
    // Idle strategy used by cooperative workers when a full pass over their tasklets made no progress.
    // NOTE(review): the last two arguments look like min/max park periods (1 us .. 1 ms) — confirm against BackoffIdleStrategy.
    private static final IdleStrategy IDLER_COOPERATIVE = new BackoffIdleStrategy(0, 0, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MILLISECONDS.toNanos(1));
    // Idle strategy used by blocking workers when a tasklet call made no progress.
    // NOTE(review): same shape as above with what looks like a 5 ms upper bound — confirm.
    private static final IdleStrategy IDLER_NON_COOPERATIVE = new BackoffIdleStrategy(0, 0, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MILLISECONDS.toNanos(5));
    // One worker per cooperative thread; sized in the constructor, populated lazily by ensureThreadsStarted().
    private final CooperativeWorker[] cooperativeWorkers;
    // Threads running the cooperative workers; same length and indexing as cooperativeWorkers.
    private final Thread[] cooperativeThreadPool;
    // Name of the owning Hazelcast instance; embedded into worker thread names.
    private final String hzInstanceName;
    private final ILogger logger;
    // Set by shutdown(); polled by the worker loops and checked by ensureStillRunning().
    private volatile boolean isShutdown;
    // Executor that runs the BlockingWorker instances submitted by submitBlockingTasklets().
    private final ExecutorService blockingTaskletExecutor = Executors.newCachedThreadPool(new BlockingTaskThreadFactory());
    // Round-robin cursor: index of the cooperative worker that receives the next submitted tasklet.
    private final AtomicInteger cooperativeThreadIndex = new AtomicInteger();

    /* loaded from: input_file:com/hazelcast/jet/impl/execution/TaskletExecutionService$BlockingTaskThreadFactory.class */
    /**
     * Thread factory for the blocking-tasklet executor. Every thread it creates
     * is named {@code hz.<instanceName>.jet.blocking.thread-<n>}, where
     * {@code n} is a counter local to this factory that starts at zero.
     */
    private final class BlockingTaskThreadFactory implements ThreadFactory {
        /** Supplies the numeric suffix of the next thread name. */
        private final AtomicInteger seq = new AtomicInteger();

        /**
         * Creates a new, not yet started thread for the given runnable.
         *
         * @param runnable the work the new thread will execute
         * @return a thread named after the owning Hazelcast instance and the next sequence number
         */
        @Override
        public Thread newThread(@Nonnull Runnable runnable) {
            final int threadNumber = seq.getAndIncrement();
            final String threadName = String.format("hz.%s.jet.blocking.thread-%d", hzInstanceName, threadNumber);
            return new Thread(runnable, threadName);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/TaskletExecutionService$BlockingWorker.class */
    /**
     * Runs a single non-cooperative tasklet on a thread of its own: initializes
     * it and then calls it repeatedly until it reports completion, the owning
     * execution has recorded a failure, or the service is shut down.
     */
    public final class BlockingWorker implements Runnable {
        private final TaskletTracker tracker;
        /** Counted down as soon as {@link #run()} enters its main section. */
        private final CountDownLatch startedLatch;

        private BlockingWorker(TaskletTracker taskletTracker, CountDownLatch countDownLatch) {
            this.tracker = taskletTracker;
            this.startedLatch = countDownLatch;
        }

        /**
         * Drives the tracked tasklet to completion.
         *
         * <p>While running, the thread's context class loader is switched to
         * the job class loader and the trailing {@code .thread-NN} part of the
         * thread name is replaced with the tasklet's string form. Both are
         * restored, and {@code taskletDone()} is reported exactly once, in a
         * single {@code finally} block regardless of how the loop ends.
         *
         * <p>Any throwable raised by the tasklet is logged and recorded on the
         * execution tracker instead of being propagated.
         */
        @Override
        public void run() {
            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            final Tasklet tasklet = tracker.tasklet;
            thread.setContextClassLoader(tracker.jobClassLoader);

            // Swap the thread name by replacing the ".thread-NN" suffix with the tasklet's description.
            final String originalName = thread.getName();
            thread.setName(originalName.replaceAll(".thread-[0-9]+$", Matcher.quoteReplacement("." + tasklet)));
            assert !originalName.equals(thread.getName()) : "unexpected thread name pattern: " + originalName;

            try {
                // Signal the submitter (which awaits this latch) that the worker is running.
                startedLatch.countDown();
                tasklet.init();
                long idleCount = 0;
                ProgressState result;
                do {
                    result = tasklet.call();
                    if (result.isMadeProgress()) {
                        idleCount = 0;
                    } else {
                        // Saturate the counter instead of letting it grow without bound.
                        if (idleCount < Integer.MAX_VALUE) {
                            idleCount++;
                        }
                        IDLER_NON_COOPERATIVE.idle(idleCount);
                    }
                } while (!result.isDone()
                        && !tracker.executionTracker.executionCompletedExceptionally()
                        && !isShutdown);
            } catch (Throwable e) {
                logger.warning("Exception in " + tasklet, e);
                tracker.executionTracker.exception(new JetException("Exception in " + tasklet + ": " + e, e));
            } finally {
                // Single cleanup point: guarantees taskletDone() is never reported twice for this tasklet.
                thread.setContextClassLoader(originalClassLoader);
                thread.setName(originalName);
                tracker.executionTracker.taskletDone();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/TaskletExecutionService$CooperativeWorker.class */
    /**
     * Runs many cooperative tasklets on one thread by calling each of them in
     * turn. Workers know their colleagues and rebalance by marking tasklets of
     * a busier colleague as "to be stolen"; the owning worker hands a marked
     * tasklet over the next time it encounters it.
     */
    public final class CooperativeWorker implements Runnable {
        /** Tasklet calls longer than this many milliseconds are reported at FINEST level. */
        private static final int COOPERATIVE_LOGGING_THRESHOLD = 5;
        private final List<TaskletTracker> trackers = new CopyOnWriteArrayList<>();
        private final CooperativeWorker[] colleagues;

        CooperativeWorker(CooperativeWorker[] cooperativeWorkerArr) {
            this.colleagues = cooperativeWorkerArr;
        }

        /**
         * Main loop: repeatedly iterates over this worker's tasklets until the
         * service is shut down. When a complete pass makes no progress the
         * thread's original context class loader is restored and the worker
         * idles with an increasing idle count. On shutdown every tasklet still
         * assigned to this worker is reported as done.
         */
        @Override
        public void run() {
            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            long idleCount = 0;
            while (!isShutdown) {
                boolean madeProgress = false;
                for (TaskletTracker t : trackers) {
                    final long start = logger.isFinestEnabled() ? System.nanoTime() : 0L;
                    final CooperativeWorker thief = t.stealingWorker.get();
                    if (thief != null) {
                        // A colleague claimed this tasklet: hand it over instead of running it.
                        t.stealingWorker.set(null);
                        trackers.remove(t);
                        thief.trackers.add(t);
                        LoggingUtil.logFinest(logger, "Tasklet %s was stolen from this worker", t.tasklet);
                        continue;
                    }
                    // Guards against dismissing (and thus counting down) the same tasklet twice
                    // when it finishes while the execution has already failed.
                    boolean dismissed = false;
                    try {
                        thread.setContextClassLoader(t.jobClassLoader);
                        final ProgressState result = t.tasklet.call();
                        if (result.isDone()) {
                            dismissed = true;
                            dismissTasklet(t);
                        } else {
                            madeProgress |= result.isMadeProgress();
                        }
                    } catch (Throwable e) {
                        logger.warning("Exception in " + t.tasklet, e);
                        t.executionTracker.exception(new JetException("Exception in " + t.tasklet + ": " + e, e));
                    }
                    if (!dismissed && t.executionTracker.executionCompletedExceptionally()) {
                        dismissTasklet(t);
                    }
                    if (logger.isFinestEnabled()) {
                        final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                        if (elapsedMs > COOPERATIVE_LOGGING_THRESHOLD) {
                            logger.finest("Cooperative tasklet call of '" + t.tasklet + "' took more than "
                                    + COOPERATIVE_LOGGING_THRESHOLD + " ms: " + elapsedMs + "ms");
                        }
                    }
                }
                if (madeProgress) {
                    idleCount = 0;
                } else {
                    thread.setContextClassLoader(originalClassLoader);
                    // Saturate the counter instead of letting it grow without bound.
                    if (idleCount < Integer.MAX_VALUE) {
                        idleCount++;
                    }
                    IDLER_COOPERATIVE.idle(idleCount);
                }
            }
            // Shutdown: report every remaining tasklet as done so its execution can complete.
            trackers.forEach(t -> t.executionTracker.taskletDone());
            trackers.clear();
        }

        /**
         * Reports the tasklet as done to its execution, removes it from this
         * worker and then tries to take over work from a busier colleague.
         */
        private void dismissTasklet(TaskletTracker taskletTracker) {
            taskletTracker.executionTracker.taskletDone();
            trackers.remove(taskletTracker);
            stealWork();
        }

        /**
         * Finds the worker with the most tasklets and, if it has at least two
         * more than this worker, marks one of its unclaimed tasklets as
         * claimed by this worker. If every tasklet of that worker is already
         * claimed, the search is repeated.
         */
        private void stealWork() {
            while (true) {
                List<TaskletTracker> busiest = trackers;
                for (CooperativeWorker colleague : colleagues) {
                    if (colleague.trackers.size() > busiest.size()) {
                        busiest = colleague.trackers;
                    }
                }
                // Not worth stealing unless it reduces the imbalance.
                if (busiest.size() < trackers.size() + 2) {
                    return;
                }
                for (TaskletTracker candidate : busiest) {
                    if (candidate.stealingWorker.compareAndSet(null, this)) {
                        return;
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/TaskletExecutionService$ExecutionTracker.class */
    /**
     * Tracks one execution (a batch of tasklets submitted together): counts
     * the tasklets that are still running, remembers the first failure and
     * completes {@link #future} once the last tasklet has reported done.
     */
    public final class ExecutionTracker {
        /**
         * Futures of the blocking workers of this execution. Starts out empty
         * so that a cancellation arriving before the blocking tasklets are
         * submitted does not hit a {@code null}; volatile because it is
         * assigned by the submitting thread and read by whichever thread
         * completes the cancellation future.
         */
        volatile List<Future> blockingFutures = Collections.emptyList();
        /** Number of tasklets that have not reported {@link #taskletDone()} yet. */
        private final AtomicInteger completionLatch;
        final NonCompletableFuture future = new NonCompletableFuture();
        /** First failure recorded for this execution, or {@code null} if none. */
        private final AtomicReference<Throwable> executionException = new AtomicReference<>();

        /**
         * @param i                 number of tasklets in the execution
         * @param completableFuture cancellation future; when it completes with
         *                          a {@link CancellationException} that
         *                          exception is recorded and the blocking
         *                          futures are cancelled with interruption;
         *                          any other completion is recorded as an
         *                          {@link IllegalStateException}
         */
        ExecutionTracker(int i, CompletableFuture<Void> completableFuture) {
            this.completionLatch = new AtomicInteger(i);
            completableFuture.whenComplete(ExceptionUtil.withTryCatch(logger, (ignored, failure) -> {
                if (!(failure instanceof CancellationException)) {
                    exception(new IllegalStateException("cancellationFuture was completed with something other than CancellationException: " + failure, failure));
                } else {
                    exception(failure);
                    this.blockingFutures.forEach(blockingFuture -> {
                        blockingFuture.cancel(true);
                    });
                }
            }));
        }

        /** Records {@code th} as the execution's failure unless one was recorded earlier. */
        void exception(Throwable th) {
            this.executionException.compareAndSet(null, th);
        }

        /**
         * Reports that one tasklet has finished. When the counter reaches
         * zero, {@link #future} is completed: normally if no failure was
         * recorded, exceptionally with the recorded failure otherwise.
         */
        public void taskletDone() {
            if (this.completionLatch.decrementAndGet() == 0) {
                Throwable failure = this.executionException.get();
                if (failure == null) {
                    this.future.internalComplete();
                } else {
                    this.future.internalCompleteExceptionally(failure);
                }
            }
        }

        /** @return {@code true} if a failure has been recorded for this execution */
        boolean executionCompletedExceptionally() {
            return this.executionException.get() != null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/TaskletExecutionService$TaskletTracker.class */
    /**
     * Bundles a tasklet with the execution it belongs to, the class loader of
     * its job and a slot in which a cooperative worker can register its claim
     * to take the tasklet over.
     */
    public static final class TaskletTracker {
        final Tasklet tasklet;
        final ExecutionTracker executionTracker;
        final ClassLoader jobClassLoader;
        /** Worker that has claimed this tasklet, or {@code null} when unclaimed. */
        final AtomicReference<CooperativeWorker> stealingWorker = new AtomicReference<>();

        TaskletTracker(Tasklet trackedTasklet, ExecutionTracker owningExecution, ClassLoader jobLoader) {
            tasklet = trackedTasklet;
            executionTracker = owningExecution;
            jobClassLoader = jobLoader;
        }

        /** @return {@code "Tracking "} followed by the tasklet's string form */
        @Override
        public String toString() {
            return "Tracking " + tasklet;
        }
    }

    public TaskletExecutionService(HazelcastInstance hazelcastInstance, int i) {
        this.hzInstanceName = hazelcastInstance.getName();
        this.cooperativeWorkers = new CooperativeWorker[i];
        this.cooperativeThreadPool = new Thread[i];
        this.logger = hazelcastInstance.getLoggingService().getLogger(TaskletExecutionService.class);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Submits the given tasklets for execution: cooperative ones go to the
     * cooperative workers, the rest to the blocking executor.
     *
     * @param list              tasklets to run
     * @param completableFuture cancellation future handed to the execution tracker
     * @param classLoader       class loader of the job the tasklets belong to
     * @return future of the execution; if submission itself throws, the future
     *         is completed exceptionally with that throwable
     * @throws IllegalStateException if the service has already been shut down
     */
    public CompletableFuture<Void> beginExecute(@Nonnull List<? extends Tasklet> list, @Nonnull CompletableFuture<Void> completableFuture, @Nonnull ClassLoader classLoader) {
        ensureStillRunning();
        final ExecutionTracker tracker = new ExecutionTracker(list.size(), completableFuture);
        try {
            // Split the tasklets by execution mode, keeping their relative order.
            final List<Tasklet> cooperative = new ArrayList<>();
            final List<Tasklet> blocking = new ArrayList<>();
            for (Tasklet tasklet : list) {
                if (tasklet.isCooperative()) {
                    cooperative.add(tasklet);
                } else {
                    blocking.add(tasklet);
                }
            }
            submitCooperativeTasklets(tracker, classLoader, cooperative);
            submitBlockingTasklets(tracker, classLoader, blocking);
        } catch (Throwable failure) {
            tracker.future.internalCompleteExceptionally(failure);
        }
        return tracker.future;
    }

    /**
     * Orders the service to stop: raises the shutdown flag that the worker
     * loops poll and calls {@code shutdownNow()} on the blocking-tasklet
     * executor.
     */
    public void shutdown() {
        isShutdown = true;
        blockingTaskletExecutor.shutdownNow();
    }

    /**
     * Fails fast when the service has been shut down.
     *
     * @throws IllegalStateException if {@code shutdown()} has been called
     */
    private void ensureStillRunning() {
        if (!isShutdown) {
            return;
        }
        throw new IllegalStateException("Execution service was already ordered to shut down");
    }

    /**
     * Submits one {@link BlockingWorker} per tasklet to the blocking executor,
     * stores the resulting futures on the execution tracker and then waits
     * until every worker has counted down the shared "started" latch.
     *
     * @param executionTracker tracker of the execution the tasklets belong to
     * @param classLoader      class loader of the job
     * @param list             non-cooperative tasklets to run
     */
    private void submitBlockingTasklets(ExecutionTracker executionTracker, ClassLoader classLoader, List<Tasklet> list) {
        final CountDownLatch startedLatch = new CountDownLatch(list.size());
        final List<Future> futures = new ArrayList<>(list.size());
        for (Tasklet tasklet : list) {
            final BlockingWorker worker =
                    new BlockingWorker(new TaskletTracker(tasklet, executionTracker, classLoader), startedLatch);
            futures.add(this.blockingTaskletExecutor.submit(worker));
        }
        // Publish the futures only once all workers were submitted, so readers never see a list being filled.
        executionTracker.blockingFutures = futures;
        // Do not return before every worker has actually started running.
        Util.uncheckRun(startedLatch::await);
    }

    /**
     * Initializes the given tasklets on the calling thread and distributes
     * them over the cooperative workers in round-robin order, continuing from
     * where the previous submission stopped. Each worker receives its share in
     * one bulk add, after which all cooperative threads are unparked.
     *
     * @param executionTracker tracker of the execution the tasklets belong to
     * @param classLoader      class loader of the job
     * @param list             cooperative tasklets to run
     */
    private void submitCooperativeTasklets(ExecutionTracker executionTracker, ClassLoader classLoader, List<Tasklet> list) {
        ensureThreadsStarted();
        final int workerCount = cooperativeWorkers.length;
        final List<List<TaskletTracker>> perWorker = new ArrayList<>(workerCount);
        for (int w = 0; w < workerCount; w++) {
            perWorker.add(new ArrayList<>());
        }
        for (Tasklet tasklet : list) {
            tasklet.init();
            // Take the current cursor value and advance it, wrapping around at the worker count.
            final int slot = cooperativeThreadIndex.getAndUpdate(current -> (current + 1) % workerCount);
            perWorker.get(slot).add(new TaskletTracker(tasklet, executionTracker, classLoader));
        }
        for (int w = 0; w < workerCount; w++) {
            cooperativeWorkers[w].trackers.addAll(perWorker.get(w));
        }
        for (Thread workerThread : cooperativeThreadPool) {
            LockSupport.unpark(workerThread);
        }
    }

    /**
     * Lazily creates the cooperative workers and their threads on first use
     * and starts the threads. Later calls return immediately once the first
     * worker slot is filled.
     */
    private synchronized void ensureThreadsStarted() {
        if (cooperativeWorkers[0] != null) {
            return;
        }
        final int workerCount = cooperativeWorkers.length;
        for (int idx = 0; idx < workerCount; idx++) {
            cooperativeWorkers[idx] = new CooperativeWorker(cooperativeWorkers);
        }
        for (int idx = 0; idx < cooperativeThreadPool.length; idx++) {
            final String threadName = String.format("hz.%s.jet.cooperative.thread-%d", hzInstanceName, idx);
            cooperativeThreadPool[idx] = new Thread(cooperativeWorkers[idx], threadName);
        }
        // Start only after every worker exists: each worker holds a reference to the shared worker array.
        for (Thread workerThread : cooperativeThreadPool) {
            workerThread.start();
        }
    }

    /**
     * Builds a diagnostic listing of all tasklet trackers currently held by
     * the cooperative workers.
     *
     * @return the trackers' string forms, sorted and separated by newlines,
     *         followed by a line of dashes
     */
    private String trackersToString() {
        final Stream<String> sortedDescriptions = Arrays.stream(cooperativeWorkers)
                .flatMap(worker -> worker.trackers.stream())
                .map(TaskletTracker::toString)
                .sorted();
        final String listing = sortedDescriptions.collect(Collectors.joining("\n"));
        return listing + "\n-----------------";
    }
}
