/*
 * Decompiled with CFR 0.152.
 */
package ru.yoomoney.tech.dbqueue.config;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yoomoney.tech.dbqueue.api.QueueConsumer;
import ru.yoomoney.tech.dbqueue.config.QueueShard;
import ru.yoomoney.tech.dbqueue.config.QueueShardId;
import ru.yoomoney.tech.dbqueue.config.QueueThreadFactory;
import ru.yoomoney.tech.dbqueue.config.TaskLifecycleListener;
import ru.yoomoney.tech.dbqueue.config.ThreadLifecycleListener;
import ru.yoomoney.tech.dbqueue.internal.processing.MillisTimeProvider;
import ru.yoomoney.tech.dbqueue.internal.processing.QueueLoop;
import ru.yoomoney.tech.dbqueue.internal.processing.QueueTaskPoller;
import ru.yoomoney.tech.dbqueue.internal.runner.QueueRunner;
import ru.yoomoney.tech.dbqueue.settings.QueueId;

class QueueExecutionPool {
    private static final Logger log = LoggerFactory.getLogger(QueueExecutionPool.class);
    @Nonnull
    private final QueueConsumer<?> queueConsumer;
    @Nonnull
    private final QueueShard<?> queueShard;
    @Nonnull
    private final QueueTaskPoller queueTaskPoller;
    @Nonnull
    private final ExecutorService executor;
    @Nonnull
    private final QueueRunner queueRunner;
    @Nonnull
    private final Supplier<QueueLoop> queueLoopFactory;
    @Nonnull
    private final List<QueueWorker> queueWorkers = new ArrayList<QueueWorker>();
    private boolean started;

    QueueExecutionPool(@Nonnull QueueConsumer<?> queueConsumer, @Nonnull QueueShard<?> queueShard, @Nonnull TaskLifecycleListener taskLifecycleListener, @Nonnull ThreadLifecycleListener threadLifecycleListener) {
        this(queueConsumer, queueShard, new QueueTaskPoller(threadLifecycleListener, new MillisTimeProvider.SystemMillisTimeProvider()), new ThreadPoolExecutor((int)queueConsumer.getQueueConfig().getSettings().getProcessingSettings().getThreadCount(), Integer.MAX_VALUE, 1L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new QueueThreadFactory(queueConsumer.getQueueConfig().getLocation(), queueShard.getShardId())), QueueRunner.Factory.create(queueConsumer, queueShard, taskLifecycleListener), QueueLoop.WakeupQueueLoop::new);
    }

    QueueExecutionPool(@Nonnull QueueConsumer<?> queueConsumer, @Nonnull QueueShard<?> queueShard, @Nonnull QueueTaskPoller queueTaskPoller, @Nonnull ExecutorService executor, @Nonnull QueueRunner queueRunner, @Nonnull Supplier<QueueLoop> queueLoopFactory) {
        this.queueConsumer = Objects.requireNonNull(queueConsumer);
        this.queueShard = Objects.requireNonNull(queueShard);
        this.queueTaskPoller = Objects.requireNonNull(queueTaskPoller);
        this.executor = Objects.requireNonNull(executor);
        this.queueRunner = Objects.requireNonNull(queueRunner);
        this.queueLoopFactory = Objects.requireNonNull(queueLoopFactory);
        queueConsumer.getQueueConfig().getSettings().getProcessingSettings().registerObserver((oldValue, newValue) -> this.resizePool(newValue.getThreadCount()));
    }

    private QueueId getQueueId() {
        return this.queueConsumer.getQueueConfig().getLocation().getQueueId();
    }

    QueueShardId getQueueShardId() {
        return this.queueShard.getShardId();
    }

    void start() {
        if (!this.started && !this.isShutdown()) {
            int threadCount = this.queueConsumer.getQueueConfig().getSettings().getProcessingSettings().getThreadCount();
            log.info("starting queue: queueId={}, shardId={}, threadCount={}", new Object[]{this.getQueueId(), this.queueShard.getShardId(), threadCount});
            for (int i = 0; i < threadCount; ++i) {
                this.startThread(true);
            }
            this.setupExecutor(threadCount);
            this.started = true;
        } else {
            log.info("execution pool is already started or underlying executor is closed");
        }
    }

    void resizePool(int newThreadCount) {
        int oldThreadCount = this.queueWorkers.size();
        if (newThreadCount == oldThreadCount) {
            return;
        }
        log.info("resizing queue execution pool: queueId={}, shardId={}, oldThreadCount={}, newThreadCount={}", new Object[]{this.queueConsumer.getQueueConfig().getLocation().getQueueId(), this.queueShard.getShardId(), oldThreadCount, newThreadCount});
        if (newThreadCount > oldThreadCount) {
            for (int i = oldThreadCount; i < newThreadCount; ++i) {
                this.startThread(!this.isPaused());
            }
        } else {
            for (int i = oldThreadCount; i > newThreadCount; --i) {
                this.disposeThread();
            }
        }
        this.setupExecutor(newThreadCount);
    }

    private void setupExecutor(int newThreadCount) {
        if (this.executor instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)this.executor;
            threadPoolExecutor.setCorePoolSize(newThreadCount);
            threadPoolExecutor.allowCoreThreadTimeOut(true);
            threadPoolExecutor.purge();
        }
    }

    private void startThread(boolean startProcessing) {
        QueueLoop queueLoop = this.queueLoopFactory.get();
        Future<?> future = this.executor.submit(() -> this.queueTaskPoller.start(queueLoop, this.queueShard.getShardId(), this.queueConsumer, this.queueRunner));
        if (startProcessing) {
            queueLoop.unpause();
        }
        this.queueWorkers.add(new QueueWorker(future, queueLoop));
    }

    private void disposeThread() {
        QueueWorker queueWorker = this.queueWorkers.get(this.queueWorkers.size() - 1);
        queueWorker.getFuture().cancel(true);
        this.queueWorkers.remove(this.queueWorkers.size() - 1);
    }

    void shutdown() {
        if (this.started && !this.isShutdown()) {
            log.info("shutting down queue: queueId={}, shardId={}", (Object)this.getQueueId(), (Object)this.queueShard.getShardId());
            this.resizePool(0);
            this.executor.shutdownNow();
            this.started = false;
        } else {
            log.info("execution pool is already stopped or underlying executor is closed");
        }
    }

    void pause() {
        log.info("pausing queue: queueId={}, shardId={}", (Object)this.getQueueId(), (Object)this.queueShard.getShardId());
        this.queueWorkers.forEach(queueWorker -> queueWorker.getLoop().pause());
    }

    void unpause() {
        log.info("unpausing queue: queueId={}, shardId={}", (Object)this.getQueueId(), (Object)this.queueShard.getShardId());
        this.queueWorkers.forEach(queueWorker -> queueWorker.getLoop().unpause());
    }

    boolean isPaused() {
        return this.queueWorkers.stream().allMatch(queueWorker -> queueWorker.getLoop().isPaused());
    }

    boolean isShutdown() {
        return this.executor.isShutdown();
    }

    boolean isTerminated() {
        return this.executor.isTerminated();
    }

    boolean awaitTermination(@Nonnull Duration timeout) {
        Objects.requireNonNull(timeout, "timeout");
        log.info("awaiting queue termination: queueId={}, shardId={}, timeout={}", new Object[]{this.getQueueId(), this.queueShard.getShardId(), timeout});
        try {
            return this.executor.awaitTermination(timeout.getSeconds(), TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void wakeup() {
        this.queueWorkers.forEach(queueWorker -> queueWorker.getLoop().doContinue());
    }

    private static class QueueWorker {
        @Nonnull
        private final Future<?> future;
        @Nonnull
        private final QueueLoop loop;

        private QueueWorker(@Nonnull Future<?> future, @Nonnull QueueLoop loop) {
            this.future = Objects.requireNonNull(future);
            this.loop = Objects.requireNonNull(loop);
        }

        @Nonnull
        public Future<?> getFuture() {
            return this.future;
        }

        @Nonnull
        public QueueLoop getLoop() {
            return this.loop;
        }
    }
}

