/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.ConsumerGroup;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.admin.AdminTool;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.domain.topic.TopicContentTypeMigrationService;
import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.BrokersClusterNotFoundException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCOffsetChangeSummary;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;

public class MultiDCAwareService {
    private static final Logger logger = LoggerFactory.getLogger(TopicContentTypeMigrationService.class);
    private final List<BrokersClusterService> clusters;
    private final AdminTool adminTool;
    private final Clock clock;
    private final Duration intervalBetweenCheckingIfOffsetsMoved;
    private final Duration offsetsMovedTimeout;

    public MultiDCAwareService(List<BrokersClusterService> clusters, AdminTool adminTool, Clock clock, Duration intervalBetweenCheckingIfOffsetsMoved, Duration offsetsMovedTimeout) {
        this.clusters = clusters;
        this.adminTool = adminTool;
        this.clock = clock;
        this.intervalBetweenCheckingIfOffsetsMoved = intervalBetweenCheckingIfOffsetsMoved;
        this.offsetsMovedTimeout = offsetsMovedTimeout;
    }

    public void manageTopic(Consumer<BrokerTopicManagement> manageFunction) {
        this.clusters.forEach(kafkaService -> kafkaService.manageTopic(manageFunction));
    }

    public String readMessageFromPrimary(String clusterName, Topic topic, Integer partition, Long offset) {
        return this.clusters.stream().filter(cluster -> clusterName.equals(cluster.getClusterName())).findFirst().orElseThrow(() -> new BrokersClusterNotFoundException(clusterName)).readMessageFromPrimary(topic, partition, offset);
    }

    public MultiDCOffsetChangeSummary moveOffset(Topic topic, String subscriptionName, Long timestamp, boolean dryRun) {
        MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary();
        this.clusters.forEach(cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList(cluster.getClusterName(), cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun)));
        if (!dryRun) {
            logger.info("Preparing retransmission for subscription {}", (Object)(topic.getQualifiedName() + "$" + subscriptionName));
            this.adminTool.retransmit(new SubscriptionName(subscriptionName, topic.getName()));
            this.clusters.forEach(clusters -> this.waitUntilOffsetsAreMoved(topic, subscriptionName));
        }
        return multiDCOffsetChangeSummary;
    }

    public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) {
        return this.clusters.stream().allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic));
    }

    public boolean topicExists(Topic topic) {
        return this.clusters.stream().allMatch(brokersClusterService -> brokersClusterService.topicExists(topic));
    }

    private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) {
        Instant abortAttemptsInstant = this.clock.instant().plus(this.offsetsMovedTimeout);
        while (!this.areOffsetsMoved(topic, subscriptionName)) {
            if (this.clock.instant().isAfter(abortAttemptsInstant)) {
                logger.error("Not all offsets related to hermes subscription {}${} were moved.", (Object)topic.getQualifiedName(), (Object)subscriptionName);
                throw new UnableToMoveOffsetsException(topic, subscriptionName);
            }
            logger.debug("Not all offsets related to hermes subscription {} were moved, will retry", (Object)topic.getQualifiedName());
            this.sleep(this.intervalBetweenCheckingIfOffsetsMoved);
        }
    }

    private boolean areOffsetsMoved(Topic topic, String subscriptionName) {
        return this.clusters.stream().allMatch(cluster -> cluster.areOffsetsMoved(topic, subscriptionName));
    }

    private void sleep(Duration sleepDuration) {
        try {
            Thread.sleep(sleepDuration.toMillis());
        }
        catch (InterruptedException e) {
            throw new InternalProcessingException((Throwable)e);
        }
    }

    public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List<Subscription> subscriptions) {
        return this.clusters.stream().allMatch(brokersClusterService -> brokersClusterService.allSubscriptionsHaveConsumersAssigned(topic, subscriptions));
    }

    public List<ConsumerGroup> describeConsumerGroups(TopicName topicName, String subscriptionName) {
        return this.clusters.stream().map(brokersClusterService -> brokersClusterService.describeConsumerGroup(new SubscriptionName(subscriptionName, topicName))).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
    }
}

