/*
 * Decompiled with CFR 0.152.
 */
package ru.yoomoney.tech.dbqueue.config;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yoomoney.tech.dbqueue.api.QueueConsumer;
import ru.yoomoney.tech.dbqueue.config.QueueExecutionPool;
import ru.yoomoney.tech.dbqueue.config.QueueShard;
import ru.yoomoney.tech.dbqueue.config.QueueShardId;
import ru.yoomoney.tech.dbqueue.config.TaskLifecycleListener;
import ru.yoomoney.tech.dbqueue.config.ThreadLifecycleListener;
import ru.yoomoney.tech.dbqueue.internal.processing.MillisTimeProvider;
import ru.yoomoney.tech.dbqueue.internal.processing.TimeLimiter;
import ru.yoomoney.tech.dbqueue.settings.QueueConfig;
import ru.yoomoney.tech.dbqueue.settings.QueueId;
import ru.yoomoney.tech.dbqueue.settings.QueueSettings;

/**
 * Registry of queues and single entry point for controlling their execution pools.
 *
 * <p>For each registered {@link QueueConsumer} one {@link QueueExecutionPool} is created per
 * configured {@link QueueShard}. Lifecycle operations (start, pause, unpause, shutdown, await
 * termination, wakeup) are available both for a single queue and for all registered queues.
 *
 * <p>Every public method is {@code synchronized} on this instance, so all state changes and
 * lifecycle calls are serialized.
 */
@ThreadSafe
public class QueueService {
    private static final Logger log = LoggerFactory.getLogger(QueueService.class);

    /** Execution pools of every registered queue, keyed by shard id; kept in registration order. */
    @Nonnull
    private final Map<QueueId, Map<QueueShardId, QueueExecutionPool>> registeredQueues = new LinkedHashMap<>();
    /** Consumer of every registered queue; used to reach the live settings on config updates. */
    @Nonnull
    private final Map<QueueId, QueueConsumer<?>> registeredConsumer = new LinkedHashMap<>();
    /** Shards for which an execution pool is created whenever a queue is registered. */
    @Nonnull
    private final List<QueueShard<?>> queueShards;
    /** Creates the execution pool for a (shard, consumer) pair. */
    @Nonnull
    private final BiFunction<QueueShard<?>, QueueConsumer<?>, QueueExecutionPool> queueExecutionPoolFactory;

    /**
     * Creates a service whose execution pools are built directly via the
     * {@link QueueExecutionPool} constructor.
     *
     * @param queueShards             shards to create an execution pool for on queue registration
     * @param threadLifecycleListener listener handed to every created execution pool
     * @param taskLifecycleListener   listener handed to every created execution pool
     */
    public QueueService(@Nonnull List<QueueShard<?>> queueShards, @Nonnull ThreadLifecycleListener threadLifecycleListener, @Nonnull TaskLifecycleListener taskLifecycleListener) {
        this(queueShards, (shard, consumer) -> new QueueExecutionPool(consumer, shard, taskLifecycleListener, threadLifecycleListener));
    }

    /**
     * Creates a service with a custom execution pool factory.
     *
     * @param queueShards               shards to create an execution pool for on queue registration
     * @param queueExecutionPoolFactory factory producing the pool for a given shard and consumer
     * @throws NullPointerException if any argument is {@code null}
     */
    QueueService(@Nonnull List<QueueShard<?>> queueShards, @Nonnull BiFunction<QueueShard<?>, QueueConsumer<?>, QueueExecutionPool> queueExecutionPoolFactory) {
        this.queueShards = Objects.requireNonNull(queueShards, "queueShards");
        this.queueExecutionPoolFactory = Objects.requireNonNull(queueExecutionPoolFactory, "queueExecutionPoolFactory");
    }

    /**
     * Looks up the execution pools of a registered queue.
     *
     * @param queueId queue to look up
     * @param method  name of the calling operation, used only in the error message
     * @return execution pools of the queue keyed by shard id
     * @throws IllegalArgumentException if the queue is not registered
     */
    private Map<QueueShardId, QueueExecutionPool> getQueuePools(@Nonnull QueueId queueId, @Nonnull String method) {
        Objects.requireNonNull(queueId, "queueId");
        Objects.requireNonNull(method, "method");
        // Registered values are never null, so a null result means "not registered".
        Map<QueueShardId, QueueExecutionPool> pools = registeredQueues.get(queueId);
        if (pools == null) {
            throw new IllegalArgumentException("cannot invoke " + method + ", queue is not registered: queueId=" + queueId);
        }
        return pools;
    }

    /**
     * Registers a queue and creates one execution pool per configured shard.
     *
     * @param consumer    consumer of the queue; its config supplies the queue id
     * @param <PayloadT>  payload type of the queue tasks
     * @return {@code true} if the queue was registered, {@code false} if a queue with the same
     *         id was already registered (nothing is changed in that case)
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public synchronized <PayloadT> boolean registerQueue(@Nonnull QueueConsumer<PayloadT> consumer) {
        Objects.requireNonNull(consumer, "consumer");
        QueueId queueId = consumer.getQueueConfig().getLocation().getQueueId();
        if (registeredQueues.containsKey(queueId)) {
            log.info("queue is already registered: queueId={}", queueId);
            return false;
        }
        // Build all pools first; the queue becomes visible only after every pool was created.
        Map<QueueShardId, QueueExecutionPool> queueShardPools = new LinkedHashMap<>();
        for (QueueShard<?> shard : queueShards) {
            queueShardPools.put(shard.getShardId(), queueExecutionPoolFactory.apply(shard, consumer));
        }
        registeredQueues.put(queueId, queueShardPools);
        registeredConsumer.put(queueId, consumer);
        return true;
    }

    /**
     * Applies new settings to already registered queues.
     *
     * <p>All configs are validated before any setting is touched, so a config referring to an
     * unregistered queue cannot leave the preceding queues partially updated.
     *
     * @param configs new configurations, matched to registered queues by queue id
     * @return for every queue whose settings produced a diff, the comma-joined diff descriptions
     *         reported by the individual settings groups; queues without a diff are absent
     * @throws NullPointerException     if {@code configs} is {@code null}
     * @throws IllegalArgumentException if any config refers to a queue that is not registered
     */
    public synchronized Map<QueueId, String> updateQueueConfigs(@Nonnull Collection<QueueConfig> configs) {
        Objects.requireNonNull(configs, "configs");
        // Fail before mutating anything: the settings objects are updated in place below.
        for (QueueConfig newConfig : configs) {
            QueueId queueId = newConfig.getLocation().getQueueId();
            if (!registeredConsumer.containsKey(queueId)) {
                throw new IllegalArgumentException("cannot update queue configuration, queue is not registered: queueId=" + queueId);
            }
        }
        Map<QueueId, String> resultDiff = new LinkedHashMap<>();
        for (QueueConfig newConfig : configs) {
            QueueId queueId = newConfig.getLocation().getQueueId();
            QueueSettings actualSettings = registeredConsumer.get(queueId).getQueueConfig().getSettings();
            QueueSettings newSettings = newConfig.getSettings();
            StringJoiner queueDiff = new StringJoiner(",");
            actualSettings.getProcessingSettings().setValue(newSettings.getProcessingSettings()).ifPresent(queueDiff::add);
            actualSettings.getPollSettings().setValue(newSettings.getPollSettings()).ifPresent(queueDiff::add);
            actualSettings.getFailureSettings().setValue(newSettings.getFailureSettings()).ifPresent(queueDiff::add);
            actualSettings.getReenqueueSettings().setValue(newSettings.getReenqueueSettings()).ifPresent(queueDiff::add);
            actualSettings.getExtSettings().setValue(newSettings.getExtSettings()).ifPresent(queueDiff::add);
            String diff = queueDiff.toString();
            if (!diff.isEmpty()) {
                resultDiff.put(queueId, diff);
            }
        }
        return resultDiff;
    }

    /** Calls {@link #start(QueueId)} for every registered queue, in registration order. */
    public synchronized void start() {
        log.info("starting all queues");
        registeredQueues.keySet().forEach(this::start);
    }

    /**
     * Calls {@code start} on every execution pool of the given queue.
     *
     * @param queueId queue to start
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized void start(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        log.info("starting queue: queueId={}", queueId);
        getQueuePools(queueId, "start").values().forEach(QueueExecutionPool::start);
    }

    /** Calls {@link #shutdown(QueueId)} for every registered queue, in registration order. */
    public synchronized void shutdown() {
        log.info("shutting down all queues");
        registeredQueues.keySet().forEach(this::shutdown);
    }

    /**
     * Calls {@code shutdown} on every execution pool of the given queue.
     *
     * @param queueId queue to shut down
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized void shutdown(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        log.info("shutting down queue: queueId={}", queueId);
        getQueuePools(queueId, "shutdown").values().forEach(QueueExecutionPool::shutdown);
    }

    /**
     * Reports whether all execution pools of the given queue are shut down.
     *
     * @param queueId queue to check
     * @return {@code true} if every pool of the queue reports {@code isShutdown}
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized boolean isShutdown(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        return getQueuePools(queueId, "isShutdown").values().stream().allMatch(QueueExecutionPool::isShutdown);
    }

    /**
     * Reports whether all registered queues are shut down.
     *
     * @return {@code true} if {@link #isShutdown(QueueId)} holds for every registered queue
     *         (vacuously {@code true} when no queue is registered)
     */
    public synchronized boolean isShutdown() {
        return registeredQueues.keySet().stream().allMatch(this::isShutdown);
    }

    /**
     * Reports whether all execution pools of the given queue are terminated.
     *
     * @param queueId queue to check
     * @return {@code true} if every pool of the queue reports {@code isTerminated}
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized boolean isTerminated(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        return getQueuePools(queueId, "isTerminated").values().stream().allMatch(QueueExecutionPool::isTerminated);
    }

    /**
     * Reports whether all registered queues are terminated.
     *
     * @return {@code true} if {@link #isTerminated(QueueId)} holds for every registered queue
     *         (vacuously {@code true} when no queue is registered)
     */
    public synchronized boolean isTerminated() {
        return registeredQueues.keySet().stream().allMatch(this::isTerminated);
    }

    /**
     * Calls {@code pause} on every execution pool of the given queue.
     *
     * @param queueId queue to pause
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized void pause(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        log.info("pausing queue: queueId={}", queueId);
        getQueuePools(queueId, "pause").values().forEach(QueueExecutionPool::pause);
    }

    /** Calls {@link #pause(QueueId)} for every registered queue, in registration order. */
    public synchronized void pause() {
        log.info("pausing all queues");
        registeredQueues.keySet().forEach(this::pause);
    }

    /**
     * Calls {@code unpause} on every execution pool of the given queue.
     *
     * @param queueId queue to unpause
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized void unpause(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        log.info("unpausing queue: queueId={}", queueId);
        getQueuePools(queueId, "unpause").values().forEach(QueueExecutionPool::unpause);
    }

    /** Calls {@link #unpause(QueueId)} for every registered queue, in registration order. */
    public synchronized void unpause() {
        log.info("unpausing all queues");
        registeredQueues.keySet().forEach(this::unpause);
    }

    /**
     * Reports whether all registered queues are paused.
     *
     * @return {@code true} if {@link #isPaused(QueueId)} holds for every registered queue
     *         (vacuously {@code true} when no queue is registered)
     */
    public synchronized boolean isPaused() {
        return registeredQueues.keySet().stream().allMatch(this::isPaused);
    }

    /**
     * Reports whether all execution pools of the given queue are paused.
     *
     * @param queueId queue to check
     * @return {@code true} if every pool of the queue reports {@code isPaused}
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized boolean isPaused(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        return getQueuePools(queueId, "isPaused").values().stream().allMatch(QueueExecutionPool::isPaused);
    }

    /**
     * Awaits termination of all registered queues, sharing one overall timeout between them.
     *
     * <p>The method is {@code synchronized}, so the monitor of this service is held for the
     * whole duration of the call.
     *
     * @param timeout overall time budget; a single {@link TimeLimiter} built from it drives the
     *                per-queue waits (presumably handing each one the time still remaining)
     * @return ids of the queues that are still not terminated after waiting
     * @throws NullPointerException if {@code timeout} is {@code null}
     */
    public synchronized List<QueueId> awaitTermination(@Nonnull Duration timeout) {
        Objects.requireNonNull(timeout, "timeout");
        log.info("awaiting all queues termination: timeout={}", timeout);
        TimeLimiter timeLimiter = new TimeLimiter(new MillisTimeProvider.SystemMillisTimeProvider(), timeout);
        for (QueueId queueId : registeredQueues.keySet()) {
            timeLimiter.execute(remainingTimeout -> awaitTermination(queueId, remainingTimeout));
        }
        return registeredQueues.keySet().stream()
                .filter(queueId -> !isTerminated(queueId))
                .collect(Collectors.toList());
    }

    /**
     * Awaits termination of all execution pools of one queue, sharing one timeout between them.
     *
     * <p>The method is {@code synchronized}, so the monitor of this service is held for the
     * whole duration of the call.
     *
     * @param queueId queue to wait for
     * @param timeout time budget for the queue; a single {@link TimeLimiter} built from it
     *                drives the per-pool waits
     * @return shard ids of the pools that are still not terminated after waiting
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized List<QueueShardId> awaitTermination(@Nonnull QueueId queueId, @Nonnull Duration timeout) {
        Objects.requireNonNull(queueId, "queueId");
        Objects.requireNonNull(timeout, "timeout");
        log.info("awaiting queue termination: queueId={}, timeout={}", queueId, timeout);
        TimeLimiter timeLimiter = new TimeLimiter(new MillisTimeProvider.SystemMillisTimeProvider(), timeout);
        Collection<QueueExecutionPool> pools = getQueuePools(queueId, "awaitTermination").values();
        pools.forEach(queueExecutionPool -> timeLimiter.execute(queueExecutionPool::awaitTermination));
        return pools.stream()
                .filter(queueExecutionPool -> !queueExecutionPool.isTerminated())
                .map(QueueExecutionPool::getQueueShardId)
                .collect(Collectors.toList());
    }

    /**
     * Calls {@code wakeup} on the execution pool of one shard of a queue.
     *
     * @param queueId      queue owning the pool
     * @param queueShardId shard whose pool should be woken up
     * @throws IllegalArgumentException if the queue is not registered or has no pool for the shard
     */
    public synchronized void wakeup(@Nonnull QueueId queueId, @Nonnull QueueShardId queueShardId) {
        Objects.requireNonNull(queueId, "queueId");
        Objects.requireNonNull(queueShardId, "queueShardId");
        QueueExecutionPool queueExecutionPool = getQueuePools(queueId, "wakeup").get(queueShardId);
        if (queueExecutionPool == null) {
            throw new IllegalArgumentException("cannot wakeup, unknown shard: queueId=" + queueId + ", shardId=" + queueShardId);
        }
        queueExecutionPool.wakeup();
    }
}

