package reactor.core.scheduler;

import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Cancellation;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.OpenHashSet;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/core/scheduler/ElasticScheduler.class */
public final class ElasticScheduler implements Scheduler {
    /** Time-to-live constant, in seconds (not referenced elsewhere in this class). */
    static final int DEFAULT_TTL_SECONDS = 60;

    /** Supplies the numeric suffix of evictor thread names. */
    static final AtomicLong COUNTER = new AtomicLong();

    /** Creates the daemon threads, named "elastic-evictor-N", used by the evictor executor. */
    static final ThreadFactory EVICTOR_FACTORY = runnable -> {
        Thread thread = new Thread(runnable, "elastic-evictor-" + COUNTER.incrementAndGet());
        thread.setDaemon(true);
        return thread;
    };

    /**
     * Sentinel executor returned by {@code pick()} once this scheduler is shut down; the static
     * initializer of this class calls {@code shutdownNow()} on it.
     */
    static final ExecutorService SHUTDOWN = Executors.newSingleThreadExecutor();

    /** Thread factory used for every worker executor created by {@code pick()}. */
    final ThreadFactory factory;

    /** How long, in seconds, a released executor may sit in {@link #cache} before eviction. */
    final int ttlSeconds;

    /** Released executors available for reuse, each paired with its expiry timestamp. */
    final Queue<ExecutorServiceExpiry> cache = new ConcurrentLinkedQueue<>();

    /** Every executor created by {@code pick()}; drained and terminated by {@code shutdown()}. */
    final Queue<ExecutorService> all = new ConcurrentLinkedQueue<>();

    /** Single-threaded scheduler that periodically runs {@code eviction()}. */
    final ScheduledExecutorService evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);

    /** Set to {@code true} by {@code shutdown()}; never reset. */
    volatile boolean shutdown;

    /* loaded from: input_file:reactor/core/scheduler/ElasticScheduler$CachedWorker.class */
    /**
     * {@link Scheduler.Worker} that runs all of its tasks on the single {@link ExecutorService}
     * it was constructed with and hands that executor back to the parent scheduler via
     * {@code release} when it is shut down.
     */
    static final class CachedWorker implements Scheduler.Worker {
        final ExecutorService executor;
        final ElasticScheduler parent;

        /** Set once by {@link #shutdown()}; checked before and inside every synchronized section. */
        volatile boolean shutdown;

        /**
         * Tasks that were submitted and not yet removed. Accessed only while holding
         * {@code this}; set to {@code null} by {@link #shutdown()}.
         */
        OpenHashSet<CachedTask> tasks = new OpenHashSet<>();

        /**
         * A single scheduled unit of work. The inherited {@link AtomicReference} value holds the
         * {@link Future} returned by the executor, or one of the sentinels {@link #CANCELLED} /
         * {@link #FINISHED}.
         */
        public static final class CachedTask extends AtomicReference<Future<?>> implements Runnable, Cancellation {
            private static final long serialVersionUID = 6799295393954430738L;
            final Runnable run;
            final CachedWorker parent;
            volatile boolean cancelled;

            /** Sentinel stored once the task has been cancelled. */
            static final FutureTask<Object> CANCELLED = new FutureTask<>(() -> {
            }, null);

            /** Sentinel stored once {@link #run()} has completed. */
            static final FutureTask<Object> FINISHED = new FutureTask<>(() -> {
            }, null);

            /**
             * @param runnable the user action to execute
             * @param cachedWorker the worker that tracks this task
             */
            public CachedTask(Runnable runnable, CachedWorker cachedWorker) {
                this.run = runnable;
                this.parent = cachedWorker;
            }

            /**
             * Runs the wrapped action unless the worker is shut down or this task was cancelled.
             * Any throwable is routed to {@code Schedulers.handleError}; afterwards the task is
             * marked finished and removed from the worker.
             */
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (!this.parent.shutdown && !this.cancelled) {
                        this.run.run();
                    }
                } catch (Throwable th) {
                    Schedulers.handleError(th);
                } finally {
                    lazySet(FINISHED);
                    this.parent.remove(this);
                }
            }

            /**
             * Cancels this task and stops the worker from tracking it.
             */
            @Override // reactor.core.Cancellation
            public void dispose() {
                this.cancelled = true;
                cancelFuture();
                // A task cancelled before it starts never reaches run()'s finally block, so it
                // must be removed here or it would stay in the worker's task set until shutdown.
                this.parent.remove(this);
            }

            /**
             * Records the future returned by the executor. If a value is already present and it
             * is not {@link #FINISHED}, the supplied future is cancelled instead.
             *
             * @param future the future obtained from submitting this task
             */
            void setFuture(Future<?> future) {
                if (!compareAndSet(null, future) && get() != FINISHED) {
                    future.cancel(true);
                }
            }

            /**
             * Swaps in {@link #CANCELLED} and cancels (with interruption) the real future that
             * was stored, if any. Does nothing when the task is already cancelled or finished.
             */
            void cancelFuture() {
                Future<?> current = get();
                if (current == CANCELLED || current == FINISHED) {
                    return;
                }
                Future<?> previous = getAndSet(CANCELLED);
                if (previous != null && previous != CANCELLED && previous != FINISHED) {
                    previous.cancel(true);
                }
            }
        }

        /**
         * @param executorService the executor all tasks of this worker are submitted to
         * @param elasticScheduler the scheduler the executor is released to on shutdown
         */
        public CachedWorker(ExecutorService executorService, ElasticScheduler elasticScheduler) {
            this.executor = executorService;
            this.parent = elasticScheduler;
        }

        /**
         * Submits the action to this worker's executor.
         *
         * @param runnable the action to run
         * @return the task handle, or {@code Scheduler.REJECTED} if the worker is shut down or
         *         the executor refused the submission
         */
        @Override // reactor.core.scheduler.Scheduler.Worker
        public Cancellation schedule(Runnable runnable) {
            if (this.shutdown) {
                return Scheduler.REJECTED;
            }
            CachedTask task = new CachedTask(runnable, this);
            synchronized (this) {
                if (this.shutdown) {
                    return Scheduler.REJECTED;
                }
                // Register before submitting: the task's own remove() call blocks on this
                // monitor, so it cannot run ahead of the add.
                this.tasks.add(task);
                try {
                    task.setFuture(this.executor.submit(task));
                } catch (RejectedExecutionException rejected) {
                    // The executor never accepted the task, so run() will not clean it up.
                    this.tasks.remove(task);
                    return Scheduler.REJECTED;
                }
            }
            return task;
        }

        /**
         * Marks the worker as shut down, cancels the futures of all tracked tasks and releases
         * the executor to the parent scheduler. Subsequent calls are no-ops.
         */
        @Override // reactor.core.scheduler.Scheduler.Worker
        public void shutdown() {
            if (this.shutdown) {
                return;
            }
            synchronized (this) {
                if (this.shutdown) {
                    return;
                }
                this.shutdown = true;
                OpenHashSet<CachedTask> pending = this.tasks;
                this.tasks = null;
                if (!pending.isEmpty()) {
                    // keys() is iterated as Objects and null entries are skipped.
                    for (Object key : pending.keys()) {
                        if (key != null) {
                            ((CachedTask) key).cancelFuture();
                        }
                    }
                }
                this.parent.release(this.executor);
            }
        }

        /**
         * Stops tracking the given task; does nothing once the worker is shut down (the task set
         * is {@code null} by then).
         *
         * @param cachedTask the task to forget
         */
        void remove(CachedTask cachedTask) {
            if (this.shutdown) {
                return;
            }
            synchronized (this) {
                if (this.shutdown) {
                    return;
                }
                this.tasks.remove(cachedTask);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/core/scheduler/ElasticScheduler$ExecutorServiceExpiry.class */
    /**
     * Immutable pairing of an executor with a timestamp in milliseconds. The enclosing scheduler
     * compares {@link #expireMillis} against {@code System.currentTimeMillis()} when evicting.
     */
    public static final class ExecutorServiceExpiry {
        /** Timestamp, in milliseconds, after which the entry counts as expired. */
        final long expireMillis;

        /** The executor this entry refers to. */
        final ExecutorService executor;

        /**
         * @param executorService the executor to hold
         * @param j the expiry timestamp in milliseconds
         */
        public ExecutorServiceExpiry(ExecutorService executorService, long j) {
            expireMillis = j;
            executor = executorService;
        }
    }

    /**
     * Creates the scheduler and starts the periodic eviction task, which first runs after
     * {@code ttlSeconds} seconds and then repeats at that same period.
     *
     * @param factory thread factory used for the worker executors created by {@code pick()}
     * @param ttlSeconds seconds a released executor may stay cached; also the eviction period
     * @throws IllegalArgumentException if {@code ttlSeconds} is not positive
     */
    public ElasticScheduler(ThreadFactory factory, int ttlSeconds) {
        // scheduleAtFixedRate rejects a non-positive period anyway, but without any message;
        // fail here with one that names the offending argument.
        if (ttlSeconds <= 0) {
            throw new IllegalArgumentException("ttlSeconds must be positive, was: " + ttlSeconds);
        }
        this.ttlSeconds = ttlSeconds;
        this.factory = factory;
        this.evictor.scheduleAtFixedRate(this::eviction, ttlSeconds, ttlSeconds, TimeUnit.SECONDS);
    }

    /**
     * Not supported by this scheduler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // reactor.core.scheduler.Scheduler
    public void start() {
        throw new UnsupportedOperationException("Restarting not supported yet");
    }

    /**
     * Shuts this scheduler down: flags it as shut down, stops the evictor, empties the cache of
     * idle executors and calls {@code shutdownNow()} on every executor recorded in {@code all}.
     * Returns immediately if the flag was already set.
     */
    @Override // reactor.core.scheduler.Scheduler
    public void shutdown() {
        if (!this.shutdown) {
            this.shutdown = true;
            this.evictor.shutdownNow();
            this.cache.clear();
            // Drain the registry one executor at a time until it is empty.
            for (ExecutorService worker = this.all.poll(); worker != null; worker = this.all.poll()) {
                worker.shutdownNow();
            }
        }
    }

    /**
     * Obtains an executor to run work on: a cached one if available, otherwise a new
     * single-thread executor built with this scheduler's thread factory and recorded in
     * {@code all}.
     *
     * @return a usable executor, or the {@code SHUTDOWN} sentinel if the scheduler is (or
     *         concurrently becomes) shut down
     */
    ExecutorService pick() {
        if (!this.shutdown) {
            ExecutorServiceExpiry cached = this.cache.poll();
            if (cached != null) {
                return cached.executor;
            }
            ExecutorService fresh = Executors.newSingleThreadExecutor(this.factory);
            this.all.offer(fresh);
            // Re-check after registering: shutdown() may have run in between.
            if (this.shutdown) {
                this.all.remove(fresh);
                return SHUTDOWN;
            }
            return fresh;
        }
        return SHUTDOWN;
    }

    /**
     * Runs the action on an executor obtained from {@code pick()} and releases that executor
     * back to this scheduler once the action has finished, whether it completed or threw.
     * Throwables from the action are passed to {@code Schedulers.handleError}.
     *
     * @param runnable the action to run
     * @return a handle whose {@code dispose()} cancels the submitted future with interruption,
     *         or {@code REJECTED} if the executor refused the submission
     */
    @Override // reactor.core.scheduler.Scheduler
    public Cancellation schedule(Runnable runnable) {
        ExecutorService executor = pick();
        try {
            Future<?> future = executor.submit(() -> {
                try {
                    runnable.run();
                } catch (Throwable error) {
                    Schedulers.handleError(error);
                } finally {
                    // finally guarantees exactly one release, even if handleError rethrows.
                    release(executor);
                }
            });
            // NOTE(review): if the future is cancelled before the task starts, the lambda above
            // presumably never runs and the executor is not released — confirm this is intended.
            return () -> {
                future.cancel(true);
            };
        } catch (RejectedExecutionException rejected) {
            return REJECTED;
        }
    }

    /**
     * Creates a worker backed by an executor obtained from {@code pick()}.
     *
     * @return a new {@code CachedWorker} tied to this scheduler
     */
    @Override // reactor.core.scheduler.Scheduler
    public Scheduler.Worker createWorker() {
        ExecutorService executor = pick();
        return new CachedWorker(executor, this);
    }

    /**
     * Returns an executor to the cache with an expiry of "now + ttlSeconds". Does nothing for
     * the {@code SHUTDOWN} sentinel or when this scheduler is already shut down.
     *
     * @param executorService the executor that is no longer in use
     */
    void release(ExecutorService executorService) {
        if (executorService == SHUTDOWN || this.shutdown) {
            return;
        }
        // Convert in long arithmetic: an int multiplication by 1000 overflows for large TTLs.
        long expireAt = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(this.ttlSeconds);
        ExecutorServiceExpiry entry = new ExecutorServiceExpiry(executorService, expireAt);
        this.cache.offer(entry);
        // shutdown() may have cleared the cache just before the offer; if the entry is still
        // there, take it back out and terminate the executor ourselves.
        if (this.shutdown && this.cache.remove(entry)) {
            executorService.shutdownNow();
        }
    }

    /**
     * Periodic task run by the evictor: terminates every cached executor whose expiry timestamp
     * lies before the current time.
     */
    void eviction() {
        long now = System.currentTimeMillis();
        // Iterate over a typed snapshot of the cache taken at this moment.
        ArrayList<ExecutorServiceExpiry> snapshot = new ArrayList<>(this.cache);
        for (ExecutorServiceExpiry entry : snapshot) {
            // Only the caller that actually removes the entry shuts the executor down, so an
            // executor that pick() has taken in the meantime is left alone.
            if (entry.expireMillis < now && this.cache.remove(entry)) {
                entry.executor.shutdownNow();
            }
        }
    }

    static {
        // Terminate the SHUTDOWN sentinel at class initialization, so it is already shut down
        // whenever pick() hands it out.
        SHUTDOWN.shutdownNow();
    }
}
