package pl.allegro.tech.hermes.management.domain.topic;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;

/**
 * Moves the offsets of every subscription of a topic to a given point in time, after first
 * waiting (with a bounded timeout) until offsets are reported as available on all Kafka topics
 * backing that Hermes topic.
 */
@Component
public class TopicContentTypeMigrationService {
    private static final Logger logger = LoggerFactory.getLogger(TopicContentTypeMigrationService.class);

    /** Maximum time to wait for offsets to become available before giving up. */
    public static final Duration CHECK_OFFSETS_AVAILABLE_TIMEOUT = Duration.ofSeconds(1);

    /** Pause between two consecutive offset-availability checks. */
    public static final Duration INTERVAL_BETWEEN_OFFSETS_AVAILABLE_CHECK_RETRIES = Duration.ofMillis(500);

    private final SubscriptionService subscriptionService;
    private final MultiDCAwareService multiDCAwareService;
    private final Clock clock;

    @Autowired
    public TopicContentTypeMigrationService(SubscriptionService subscriptionService, MultiDCAwareService multiDCAwareService, Clock clock) {
        this.subscriptionService = subscriptionService;
        this.multiDCAwareService = multiDCAwareService;
        this.clock = clock;
    }

    /**
     * Waits until offsets are available on all Kafka topics of {@code topic}, then moves the
     * offset of each of its subscriptions to the timestamp given by {@code instant}.
     *
     * @param topic   topic whose subscriptions should be notified
     * @param instant point in time (converted to epoch milliseconds) the offsets are moved to
     * @throws OffsetsNotAvailableException if offsets are still unavailable once
     *                                      {@link #CHECK_OFFSETS_AVAILABLE_TIMEOUT} has elapsed
     * @throws RuntimeException             if the calling thread is interrupted while waiting;
     *                                      the thread's interrupt flag is restored beforehand
     */
    public void notifySubscriptions(Topic topic, Instant instant) {
        waitUntilOffsetsAvailableOnAllKafkaTopics(topic, CHECK_OFFSETS_AVAILABLE_TIMEOUT);
        subscriptionService.listSubscriptionNames(topic.getName())
                .forEach(subscriptionName -> notifySingleSubscription(topic, instant, subscriptionName));
    }

    /** Moves the offset of a single subscription to the timestamp of {@code instant}. */
    private void notifySingleSubscription(Topic topic, Instant instant, String subscriptionName) {
        // NOTE(review): the trailing 'false' is presumably a dry-run flag - confirm against MultiDCAwareService.
        multiDCAwareService.moveOffset(topic, subscriptionName, instant.toEpochMilli(), false);
    }

    /**
     * Polls offset availability until it succeeds or the deadline derived from {@code timeout}
     * has passed, sleeping {@link #INTERVAL_BETWEEN_OFFSETS_AVAILABLE_CHECK_RETRIES} between attempts.
     */
    private void waitUntilOffsetsAvailableOnAllKafkaTopics(Topic topic, Duration timeout) {
        Instant deadline = clock.instant().plus(timeout);
        while (!multiDCAwareService.areOffsetsAvailableOnAllKafkaTopics(topic)) {
            if (clock.instant().isAfter(deadline)) {
                throw new OffsetsNotAvailableException(topic);
            }
            logger.debug("Not all offsets related to hermes topic {} are available, will retry", topic.getQualifiedName());
            sleep(INTERVAL_BETWEEN_OFFSETS_AVAILABLE_CHECK_RETRIES);
        }
    }

    /** Sleeps for {@code duration}, converting an interruption into an unchecked exception. */
    private void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt status; restore it so that
            // callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
