package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.domain.topic.SingleMessageReader;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.class */
public class BrokersClusterService {
    private static final Logger logger = LoggerFactory.getLogger(BrokersClusterService.class);
    private final String clusterName;
    private final SingleMessageReader singleMessageReader;
    private final RetransmissionService retransmissionService;
    private final BrokerTopicManagement brokerTopicManagement;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final OffsetsAvailableChecker offsetsAvailableChecker;
    private final AdminClient adminClient;

    public BrokersClusterService(String str, SingleMessageReader singleMessageReader, RetransmissionService retransmissionService, BrokerTopicManagement brokerTopicManagement, KafkaNamesMapper kafkaNamesMapper, OffsetsAvailableChecker offsetsAvailableChecker, AdminClient adminClient) {
        this.clusterName = str;
        this.singleMessageReader = singleMessageReader;
        this.retransmissionService = retransmissionService;
        this.brokerTopicManagement = brokerTopicManagement;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.offsetsAvailableChecker = offsetsAvailableChecker;
        this.adminClient = adminClient;
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public void manageTopic(Consumer<BrokerTopicManagement> consumer) {
        consumer.accept(this.brokerTopicManagement);
    }

    public String readMessageFromPrimary(Topic topic, Integer num, Long l) {
        return this.singleMessageReader.readMessageAsJson(topic, this.kafkaNamesMapper.toKafkaTopics(topic).getPrimary(), num.intValue(), l.longValue());
    }

    public List<PartitionOffset> indicateOffsetChange(Topic topic, String str, Long l, boolean z) {
        return this.retransmissionService.indicateOffsetChange(topic, str, this.clusterName, l.longValue(), z);
    }

    public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) {
        KafkaTopics kafkaTopics = this.kafkaNamesMapper.toKafkaTopics(topic);
        OffsetsAvailableChecker offsetsAvailableChecker = this.offsetsAvailableChecker;
        offsetsAvailableChecker.getClass();
        return kafkaTopics.allMatch(offsetsAvailableChecker::areOffsetsAvailable);
    }

    public boolean topicExists(Topic topic) {
        return this.brokerTopicManagement.topicExists(topic);
    }

    public boolean areOffsetsMoved(Topic topic, String str) {
        return this.retransmissionService.areOffsetsMoved(topic, str, this.clusterName);
    }

    public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List<Subscription> list) {
        try {
            return numberOfAssignmentsForConsumersGroups((List) list.stream().map(subscription -> {
                return this.kafkaNamesMapper.toConsumerGroupId(subscription.getQualifiedName()).asString();
            }).collect(Collectors.toList())) == numberOfPartitionsForTopic(topic) * list.size();
        } catch (Exception e) {
            logger.error("Failed to check assignments for topic " + topic.getQualifiedName() + " subscriptions", e);
            return false;
        }
    }

    private int numberOfAssignmentsForConsumersGroups(List<String> list) throws ExecutionException, InterruptedException {
        return ((List) ((Map) this.adminClient.describeConsumerGroups(list).all().get()).values().stream().flatMap(consumerGroupDescription -> {
            return consumerGroupDescription.members().stream();
        }).flatMap(memberDescription -> {
            return memberDescription.assignment().topicPartitions().stream();
        }).collect(Collectors.toList())).size();
    }

    private int numberOfPartitionsForTopic(Topic topic) throws ExecutionException, InterruptedException {
        return ((Integer) ((Map) this.adminClient.describeTopics((List) this.kafkaNamesMapper.toKafkaTopics(topic).stream().map(kafkaTopic -> {
            return kafkaTopic.name().asString();
        }).collect(Collectors.toList())).all().get()).values().stream().map(topicDescription -> {
            return Integer.valueOf(topicDescription.partitions().size());
        }).reduce(0, (v0, v1) -> {
            return Integer.sum(v0, v1);
        })).intValue();
    }
}
