/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.domain.topic;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.topic.AssignmentsToSubscriptionsNotCompletedException;
import pl.allegro.tech.hermes.management.domain.topic.OffsetsNotAvailableException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;

/**
 * Coordinates the subscription-side steps that accompany a topic content type migration:
 * waiting until offsets are available on the underlying Kafka topics, moving the offsets of
 * every non-suspended subscription to a given instant, and waiting until consumers are
 * assigned to those subscriptions.
 *
 * <p>All waiting is done by polling on the calling thread, using the injected {@link Clock}
 * to evaluate deadlines.
 */
@Component
public class TopicContentTypeMigrationService {
    private static final Logger logger = LoggerFactory.getLogger(TopicContentTypeMigrationService.class);

    /** Maximum time {@link #notifySubscriptions} waits for offsets to become available. */
    private static final Duration CHECK_OFFSETS_AVAILABLE_TIMEOUT = Duration.ofSeconds(30L);
    /** Pause between two consecutive offsets-availability checks. */
    private static final Duration INTERVAL_BETWEEN_OFFSETS_AVAILABLE_CHECK = Duration.ofMillis(500L);
    /** Pause between two consecutive consumer-assignment checks. */
    private static final Duration INTERVAL_BETWEEN_ASSIGNMENTS_COMPLETED_CHECK = Duration.ofMillis(500L);

    private final SubscriptionRepository subscriptionRepository;
    private final MultiDCAwareService multiDCAwareService;
    private final Clock clock;

    /**
     * Creates the service.
     *
     * @param subscriptionRepository source of the subscriptions defined for a topic
     * @param multiDCAwareService    service used to query offsets/assignments and to move offsets
     * @param clock                  clock used to compute and check polling deadlines
     */
    @Autowired
    public TopicContentTypeMigrationService(SubscriptionRepository subscriptionRepository,
                                            MultiDCAwareService multiDCAwareService,
                                            Clock clock) {
        this.subscriptionRepository = subscriptionRepository;
        this.multiDCAwareService = multiDCAwareService;
        this.clock = clock;
    }

    /**
     * Waits (up to {@link #CHECK_OFFSETS_AVAILABLE_TIMEOUT}) until offsets are available on all
     * Kafka topics of the given topic, then moves the offset of every non-suspended subscription
     * of that topic to {@code beforeMigrationInstant}.
     *
     * @param topic                  topic whose subscriptions are notified
     * @param beforeMigrationInstant instant the subscription offsets are moved to
     * @param requester              user on whose behalf the offsets are moved
     * @throws OffsetsNotAvailableException if offsets do not become available before the timeout
     */
    void notifySubscriptions(Topic topic, Instant beforeMigrationInstant, RequestUser requester) {
        waitUntilOffsetsAvailableOnAllKafkaTopics(topic, CHECK_OFFSETS_AVAILABLE_TIMEOUT);
        logger.info("Offsets available on all partitions of topic {}", topic.getQualifiedName());
        notSuspendedSubscriptionsForTopic(topic)
                .map(Subscription::getName)
                .forEach(subscriptionName ->
                        notifySingleSubscription(topic, beforeMigrationInstant, subscriptionName, requester));
    }

    /**
     * Polls until all non-suspended subscriptions of the topic have consumers assigned.
     *
     * @param topic                      topic whose subscriptions are checked
     * @param assignmentCompletedTimeout maximum time to keep polling
     * @throws AssignmentsToSubscriptionsNotCompletedException if the assignments are still
     *         incomplete once the timeout has elapsed
     */
    void waitUntilAllSubscriptionsHasConsumersAssigned(Topic topic, Duration assignmentCompletedTimeout) {
        Instant abortAttemptsInstant = clock.instant().plus(assignmentCompletedTimeout);
        while (!allSubscriptionsHaveConsumersAssigned(topic)) {
            if (clock.instant().isAfter(abortAttemptsInstant)) {
                throw new AssignmentsToSubscriptionsNotCompletedException(topic);
            }
            sleep(INTERVAL_BETWEEN_ASSIGNMENTS_COMPLETED_CHECK);
        }
    }

    /**
     * Moves the offset of a single subscription to the epoch-millisecond value of
     * {@code beforeMigrationInstant}.
     */
    private void notifySingleSubscription(Topic topic, Instant beforeMigrationInstant,
                                          String subscriptionName, RequestUser requester) {
        // NOTE(review): the literal 'false' is presumably a dry-run flag - confirm against
        // MultiDCAwareService.moveOffset.
        multiDCAwareService.moveOffset(
                topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false, requester);
    }

    /**
     * Polls until offsets are reported as available on all Kafka topics of the given topic.
     *
     * @throws OffsetsNotAvailableException if offsets are still unavailable after the timeout
     */
    private void waitUntilOffsetsAvailableOnAllKafkaTopics(Topic topic, Duration offsetsAvailableTimeout) {
        Instant abortAttemptsInstant = clock.instant().plus(offsetsAvailableTimeout);
        while (!multiDCAwareService.areOffsetsAvailableOnAllKafkaTopics(topic)) {
            if (clock.instant().isAfter(abortAttemptsInstant)) {
                throw new OffsetsNotAvailableException(topic);
            }
            logger.info("Not all offsets related to hermes topic {} are available, will retry",
                    topic.getQualifiedName());
            sleep(INTERVAL_BETWEEN_OFFSETS_AVAILABLE_CHECK);
        }
    }

    /**
     * Sleeps the current thread for the given duration.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *         interrupted while sleeping; the interrupt flag is restored before throwing
     */
    private void sleep(Duration sleepDuration) {
        try {
            Thread.sleep(sleepDuration.toMillis());
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt flag when it throws; restore it so that code
            // further up the stack can still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Checks whether every non-suspended subscription of the topic has consumers assigned. */
    private boolean allSubscriptionsHaveConsumersAssigned(Topic topic) {
        List<Subscription> notSuspendedSubscriptions =
                notSuspendedSubscriptionsForTopic(topic).collect(Collectors.toList());
        return multiDCAwareService.allSubscriptionsHaveConsumersAssigned(topic, notSuspendedSubscriptions);
    }

    /** Streams the subscriptions of the topic whose state is not {@code SUSPENDED}. */
    private Stream<Subscription> notSuspendedSubscriptionsForTopic(Topic topic) {
        return subscriptionRepository.listSubscriptions(topic.getName()).stream()
                .filter(sub -> Subscription.State.SUSPENDED != sub.getState());
    }
}

