package pl.allegro.tech.hermes.management.infrastructure.kafka;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.ConsumerGroup;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.admin.AdminTool;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.domain.topic.TopicContentTypeMigrationService;
import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.class */
/**
 * Facade that applies broker-level operations to every configured
 * {@link BrokersClusterService} (one per brokers cluster) and aggregates the results.
 *
 * <p>Boolean queries return {@code true} only when every cluster answers {@code true};
 * note that with an empty cluster list they are vacuously {@code true}.
 */
public class MultiDCAwareService {
    private static final Logger logger = LoggerFactory.getLogger(MultiDCAwareService.class);
    private final List<BrokersClusterService> clusters;
    private final AdminTool adminTool;
    private final Clock clock;
    private final Duration intervalBetweenCheckingIfOffsetsMoved;
    private final Duration offsetsMovedTimeout;

    /**
     * Creates the service.
     *
     * @param list      cluster services that every operation is applied to
     * @param adminTool tool used to request a retransmission for a subscription
     * @param clock     time source used to compute the offsets-moved deadline
     * @param duration  pause between consecutive checks whether offsets have been moved
     * @param duration2 maximum time to wait for offsets to be moved before giving up
     */
    public MultiDCAwareService(List<BrokersClusterService> list, AdminTool adminTool, Clock clock, Duration duration, Duration duration2) {
        this.clusters = list;
        this.adminTool = adminTool;
        this.clock = clock;
        this.intervalBetweenCheckingIfOffsetsMoved = duration;
        this.offsetsMovedTimeout = duration2;
    }

    /**
     * Runs the given topic-management action against every cluster, in list order.
     *
     * @param consumer action handed to each cluster's {@code manageTopic}
     */
    public void manageTopic(Consumer<BrokerTopicManagement> consumer) {
        for (BrokersClusterService cluster : this.clusters) {
            cluster.manageTopic(consumer);
        }
    }

    /**
     * Reads a message from the first cluster whose name equals {@code str}.
     *
     * @param str   name of the cluster to read from
     * @param topic topic to read from
     * @param num   forwarded to the cluster service (presumably the partition - confirm)
     * @param l     forwarded to the cluster service (presumably the offset - confirm)
     * @return whatever the selected cluster's {@code readMessageFromPrimary} returns
     * @throws BrokersClusterNotFoundException if no cluster has the given name
     */
    public String readMessageFromPrimary(String str, Topic topic, Integer num, Long l) {
        BrokersClusterService selected = this.clusters.stream()
                .filter(cluster -> str.equals(cluster.getClusterName()))
                .findFirst()
                .orElseThrow(() -> new BrokersClusterNotFoundException(str));
        return selected.readMessageFromPrimary(topic, num, l);
    }

    /**
     * Indicates an offset change on every cluster and collects the per-cluster results.
     * When {@code z} is {@code false}, additionally requests a retransmission through the
     * admin tool and blocks until all clusters report the offsets as moved.
     *
     * @param topic topic the subscription belongs to
     * @param str   subscription name
     * @param l     forwarded to {@code indicateOffsetChange} (presumably a timestamp - confirm)
     * @param z     forwarded to {@code indicateOffsetChange}; {@code true} skips the
     *              retransmission and the wait (looks like a dry-run flag - confirm)
     * @return summary holding the partition offsets reported by each cluster, keyed by cluster name
     * @throws UnableToMoveOffsetsException if the offsets are not moved within the configured timeout
     */
    public MultiDCOffsetChangeSummary moveOffset(Topic topic, String str, Long l, boolean z) {
        MultiDCOffsetChangeSummary summary = new MultiDCOffsetChangeSummary();
        for (BrokersClusterService cluster : this.clusters) {
            summary.addPartitionOffsetList(cluster.getClusterName(), cluster.indicateOffsetChange(topic, str, l, z));
        }
        if (!z) {
            logger.info("Preparing retransmission for subscription {}${}", topic.getQualifiedName(), str);
            this.adminTool.retransmit(new SubscriptionName(str, topic.getName()));
            // A single wait is enough: the check inside already spans all clusters.
            waitUntilOffsetsAreMoved(topic, str);
        }
        return summary;
    }

    /**
     * @param topic topic to check
     * @return {@code true} if every cluster reports offsets available on all Kafka topics of {@code topic}
     */
    public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) {
        return this.clusters.stream().allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic));
    }

    /**
     * @param topic topic to check
     * @return {@code true} if every cluster reports that the topic exists
     */
    public boolean topicExists(Topic topic) {
        return this.clusters.stream().allMatch(cluster -> cluster.topicExists(topic));
    }

    /**
     * Polls all clusters until they report the subscription's offsets as moved, sleeping
     * {@code intervalBetweenCheckingIfOffsetsMoved} between attempts.
     *
     * @throws UnableToMoveOffsetsException if a check fails after {@code offsetsMovedTimeout} has elapsed
     */
    private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) {
        Instant deadline = this.clock.instant().plus(this.offsetsMovedTimeout);
        while (!areOffsetsMoved(topic, subscriptionName)) {
            // The deadline is evaluated only after a failed check, so at least one check always happens.
            if (this.clock.instant().isAfter(deadline)) {
                logger.error("Not all offsets related to hermes subscription {}${} were moved.", topic.getQualifiedName(), subscriptionName);
                throw new UnableToMoveOffsetsException(topic, subscriptionName);
            }
            logger.debug("Not all offsets related to hermes subscription {}${} were moved, will retry", topic.getQualifiedName(), subscriptionName);
            sleep(this.intervalBetweenCheckingIfOffsetsMoved);
        }
    }

    /** Returns {@code true} if every cluster reports the subscription's offsets as moved. */
    private boolean areOffsetsMoved(Topic topic, String subscriptionName) {
        return this.clusters.stream().allMatch(cluster -> cluster.areOffsetsMoved(topic, subscriptionName));
    }

    /**
     * Sleeps for the given duration.
     *
     * @throws InternalProcessingException if the thread is interrupted while sleeping
     */
    private void sleep(Duration pause) {
        try {
            Thread.sleep(pause.toMillis());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new InternalProcessingException(e);
        }
    }

    /**
     * @param topic topic the subscriptions belong to
     * @param list  subscriptions to check
     * @return {@code true} if every cluster reports that all given subscriptions have consumers assigned
     */
    public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List<Subscription> list) {
        return this.clusters.stream().allMatch(cluster -> cluster.allSubscriptionsHaveConsumersAssigned(topic, list));
    }

    /**
     * Describes the subscription's consumer group on each cluster.
     *
     * @param topicName name of the topic the subscription belongs to
     * @param str       subscription name
     * @return consumer group descriptions from the clusters that returned one, in cluster order;
     *         clusters that returned an empty result are skipped
     */
    public List<ConsumerGroup> describeConsumerGroups(TopicName topicName, String str) {
        return this.clusters.stream()
                .map(cluster -> cluster.describeConsumerGroup(new SubscriptionName(str, topicName)))
                .filter(group -> group.isPresent())
                .map(group -> group.get())
                .collect(Collectors.toList());
    }
}
