package ru.yoomoney.tech.dbqueue.config;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yoomoney.tech.dbqueue.api.QueueConsumer;
import ru.yoomoney.tech.dbqueue.internal.processing.MillisTimeProvider;
import ru.yoomoney.tech.dbqueue.internal.processing.QueueLoop;
import ru.yoomoney.tech.dbqueue.internal.processing.QueueTaskPoller;
import ru.yoomoney.tech.dbqueue.internal.runner.QueueRunner;
import ru.yoomoney.tech.dbqueue.settings.QueueId;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:ru/yoomoney/tech/dbqueue/config/QueueExecutionPool.class */
/**
 * Pool of worker threads that poll and process tasks of one queue on one shard.
 *
 * <p>Each worker is a {@link QueueLoop} driven by the shared {@link QueueTaskPoller} on a thread
 * of the underlying {@link ExecutorService}. The pool registers an observer on the consumer's
 * processing settings and resizes itself whenever the observer fires.
 *
 * <p>This class performs no synchronization of its own: {@code started} and the worker list are
 * plain fields. NOTE(review): callers are presumably expected to serialize lifecycle calls and
 * settings updates — confirm against the call sites.
 */
public class QueueExecutionPool {
    private static final Logger log = LoggerFactory.getLogger(QueueExecutionPool.class);

    @Nonnull
    private final QueueConsumer<?> queueConsumer;

    @Nonnull
    private final QueueShard<?> queueShard;

    @Nonnull
    private final QueueTaskPoller queueTaskPoller;

    @Nonnull
    private final ExecutorService executor;

    @Nonnull
    private final QueueRunner queueRunner;

    @Nonnull
    private final Supplier<QueueLoop> queueLoopFactory;

    /** Currently running workers; the most recently started worker is last. */
    @Nonnull
    private final List<QueueWorker> queueWorkers = new ArrayList<>();

    /** Set by {@link #start()} and cleared by {@link #shutdown()}. */
    private boolean started;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ru/yoomoney/tech/dbqueue/config/QueueExecutionPool$QueueWorker.class */
    /**
     * Pairs the future of a submitted polling task with the loop that controls it.
     */
    public static class QueueWorker {

        @Nonnull
        private final Future<?> future;

        @Nonnull
        private final QueueLoop loop;

        private QueueWorker(@Nonnull Future<?> future, @Nonnull QueueLoop queueLoop) {
            this.future = Objects.requireNonNull(future);
            this.loop = Objects.requireNonNull(queueLoop);
        }

        /**
         * @return future of the polling task submitted to the executor
         */
        @Nonnull
        public Future<?> getFuture() {
            return this.future;
        }

        /**
         * @return loop used to pause, unpause and wake up this worker
         */
        @Nonnull
        public QueueLoop getLoop() {
            return this.loop;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a pool backed by its own {@link ThreadPoolExecutor}, a system-clock based poller and
     * {@link QueueLoop.WakeupQueueLoop} loops.
     *
     * @param queueConsumer           consumer whose queue is processed
     * @param queueShard              shard the tasks are taken from
     * @param taskLifecycleListener   listener handed to the queue runner factory
     * @param threadLifecycleListener listener handed to the task poller
     */
    public QueueExecutionPool(@Nonnull QueueConsumer<?> queueConsumer,
                              @Nonnull QueueShard<?> queueShard,
                              @Nonnull TaskLifecycleListener taskLifecycleListener,
                              @Nonnull ThreadLifecycleListener threadLifecycleListener) {
        this(queueConsumer,
                queueShard,
                new QueueTaskPoller(threadLifecycleListener, new MillisTimeProvider.SystemMillisTimeProvider()),
                createExecutor(queueConsumer, queueShard),
                QueueRunner.Factory.create(queueConsumer, queueShard, taskLifecycleListener),
                QueueLoop.WakeupQueueLoop::new);
    }

    /**
     * Creates a pool from explicitly supplied collaborators and subscribes to processing-settings
     * changes so that the pool is resized when the observer fires.
     *
     * @param queueConsumer   consumer whose queue is processed
     * @param queueShard      shard the tasks are taken from
     * @param queueTaskPoller poller that runs each worker loop
     * @param executorService executor the polling tasks are submitted to
     * @param queueRunner     runner passed to the poller
     * @param supplier        factory producing a fresh {@link QueueLoop} per worker
     * @throws NullPointerException if any argument is {@code null}
     */
    QueueExecutionPool(@Nonnull QueueConsumer<?> queueConsumer,
                       @Nonnull QueueShard<?> queueShard,
                       @Nonnull QueueTaskPoller queueTaskPoller,
                       @Nonnull ExecutorService executorService,
                       @Nonnull QueueRunner queueRunner,
                       @Nonnull Supplier<QueueLoop> supplier) {
        this.queueConsumer = Objects.requireNonNull(queueConsumer);
        this.queueShard = Objects.requireNonNull(queueShard);
        this.queueTaskPoller = Objects.requireNonNull(queueTaskPoller);
        this.executor = Objects.requireNonNull(executorService);
        this.queueRunner = Objects.requireNonNull(queueRunner);
        this.queueLoopFactory = Objects.requireNonNull(supplier);
        // Only the second observer argument is used; presumably the callback is (old, new) — confirm.
        // NOTE(review): the observer is registered from the constructor, so it can fire before start().
        queueConsumer.getQueueConfig().getSettings().getProcessingSettings().registerObserver(
                (oldSettings, newSettings) -> resizePool(newSettings.getThreadCount()));
    }

    /**
     * Builds the default executor: core size equal to the configured thread count, unbounded
     * maximum size, 1 ms keep-alive and an unbounded work queue.
     */
    private static ExecutorService createExecutor(QueueConsumer<?> queueConsumer, QueueShard<?> queueShard) {
        int threadCount = queueConsumer.getQueueConfig().getSettings().getProcessingSettings().getThreadCount();
        return new ThreadPoolExecutor(
                threadCount,
                Integer.MAX_VALUE,
                1L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new QueueThreadFactory(queueConsumer.getQueueConfig().getLocation(), queueShard.getShardId()));
    }

    private QueueId getQueueId() {
        return this.queueConsumer.getQueueConfig().getLocation().getQueueId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return identifier of the shard this pool works on
     */
    public QueueShardId getQueueShardId() {
        return this.queueShard.getShardId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts as many unpaused workers as the processing settings currently request.
     *
     * <p>Does nothing except log when the pool is already started or the executor is shut down.
     */
    public void start() {
        if (this.started || isShutdown()) {
            log.info("execution pool is already started or underlying executor is closed");
            return;
        }
        int threadCount = this.queueConsumer.getQueueConfig().getSettings().getProcessingSettings().getThreadCount();
        log.info("starting queue: queueId={}, shardId={}, threadCount={}",
                getQueueId(), this.queueShard.getShardId(), threadCount);
        for (int started = 0; started < threadCount; started++) {
            startThread(true);
        }
        setupExecutor(threadCount);
        this.started = true;
    }

    /**
     * Grows or shrinks the pool to exactly {@code i} workers.
     *
     * <p>When growing, new workers are unpaused only if the pool is not currently paused. When
     * shrinking, the most recently started workers are cancelled first. A request to grow a pool
     * whose executor is already shut down is ignored, because submitting to a closed executor
     * would be rejected.
     *
     * @param i desired number of workers
     */
    void resizePool(int i) {
        int currentThreadCount = this.queueWorkers.size();
        if (i == currentThreadCount) {
            return;
        }
        boolean growing = i > currentThreadCount;
        if (growing && isShutdown()) {
            log.info("ignoring queue execution pool growth, underlying executor is closed: queueId={}, shardId={}",
                    getQueueId(), this.queueShard.getShardId());
            return;
        }
        log.info("resizing queue execution pool: queueId={}, shardId={}, oldThreadCount={}, newThreadCount={}",
                getQueueId(), this.queueShard.getShardId(), currentThreadCount, i);
        if (growing) {
            for (int count = currentThreadCount; count < i; count++) {
                // isPaused() is re-evaluated per worker, exactly as the worker list grows.
                // NOTE(review): for an empty pool isPaused() is vacuously true, so workers added
                // to an empty pool are not unpaused here — confirm this is intended.
                startThread(!isPaused());
            }
        } else {
            for (int count = currentThreadCount; count > i; count--) {
                disposeThread();
            }
        }
        setupExecutor(i);
    }

    /**
     * Aligns a {@link ThreadPoolExecutor}'s core size with the worker count, lets idle core threads
     * time out and purges cancelled tasks from its work queue. Other executor types are left as is.
     */
    private void setupExecutor(int i) {
        if (this.executor instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) this.executor;
            threadPoolExecutor.setCorePoolSize(i);
            threadPoolExecutor.allowCoreThreadTimeOut(true);
            threadPoolExecutor.purge();
        }
    }

    /**
     * Submits one polling task with a fresh loop and records it as a worker.
     *
     * @param z whether the new loop should be unpaused right after submission; otherwise the loop
     *          stays in whatever state the factory produced it
     */
    private void startThread(boolean z) {
        QueueLoop queueLoop = this.queueLoopFactory.get();
        // Statement lambda on purpose: keeps the Runnable overload of submit() unambiguous.
        Future<?> submit = this.executor.submit(() -> {
            this.queueTaskPoller.start(queueLoop, this.queueShard.getShardId(), this.queueConsumer, this.queueRunner);
        });
        if (z) {
            queueLoop.unpause();
        }
        this.queueWorkers.add(new QueueWorker(submit, queueLoop));
    }

    /**
     * Cancels (with interruption) and forgets the most recently started worker.
     */
    private void disposeThread() {
        int lastIndex = this.queueWorkers.size() - 1;
        this.queueWorkers.get(lastIndex).getFuture().cancel(true);
        this.queueWorkers.remove(lastIndex);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Cancels every worker and shuts the executor down immediately.
     *
     * <p>Does nothing except log when the pool was never started or the executor is already shut
     * down.
     */
    public void shutdown() {
        if (!this.started || isShutdown()) {
            log.info("execution pool is already stopped or underlying executor is closed");
            return;
        }
        log.info("shutting down queue: queueId={}, shardId={}", getQueueId(), this.queueShard.getShardId());
        resizePool(0);
        this.executor.shutdownNow();
        this.started = false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Pauses the loop of every current worker.
     */
    public void pause() {
        log.info("pausing queue: queueId={}, shardId={}", getQueueId(), this.queueShard.getShardId());
        this.queueWorkers.forEach(queueWorker -> queueWorker.getLoop().pause());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Unpauses the loop of every current worker.
     */
    public void unpause() {
        log.info("unpausing queue: queueId={}, shardId={}", getQueueId(), this.queueShard.getShardId());
        this.queueWorkers.forEach(queueWorker -> queueWorker.getLoop().unpause());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return {@code true} when every worker loop reports paused; also {@code true} when the pool
     *         has no workers, because the check is an {@code allMatch} over the worker list
     */
    public boolean isPaused() {
        return this.queueWorkers.stream().allMatch(queueWorker -> queueWorker.getLoop().isPaused());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return whether the underlying executor has been shut down
     */
    public boolean isShutdown() {
        return this.executor.isShutdown();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return whether the underlying executor has terminated
     */
    public boolean isTerminated() {
        return this.executor.isTerminated();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Waits for the underlying executor to terminate.
     *
     * <p>The full precision of {@code duration} is honoured; sub-second parts are not truncated.
     *
     * @param duration maximum time to wait
     * @return {@code true} if the executor terminated within the timeout; {@code false} on timeout
     *         or if the waiting thread was interrupted (the interrupt flag is restored)
     * @throws NullPointerException if {@code duration} is {@code null}
     */
    public boolean awaitTermination(@Nonnull Duration duration) {
        Objects.requireNonNull(duration, "timeout");
        log.info("awaiting queue termination: queueId={}, shardId={}, timeout={}",
                getQueueId(), this.queueShard.getShardId(), duration);
        try {
            return this.executor.awaitTermination(toNanosSaturated(duration), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Converts a duration to nanoseconds, clamping instead of failing when the value does not fit
     * into a {@code long}.
     */
    private static long toNanosSaturated(Duration duration) {
        try {
            return duration.toNanos();
        } catch (ArithmeticException overflow) {
            // Too large for a long of nanoseconds: wait "forever", or not at all if negative.
            return duration.isNegative() ? 0L : Long.MAX_VALUE;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks the loop of every current worker to continue.
     */
    public void wakeup() {
        this.queueWorkers.forEach(queueWorker -> queueWorker.getLoop().doContinue());
    }
}
