package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManager.class */
public class KafkaConsumerGroupManager implements ConsumerGroupManager {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumerGroupManager.class);
    private final KafkaNamesMapper kafkaNamesMapper;
    private final String clusterName;
    private final String bootstrapKafkaServer;
    private final KafkaProperties kafkaProperties;

    public KafkaConsumerGroupManager(KafkaNamesMapper kafkaNamesMapper, String str, String str2, KafkaProperties kafkaProperties) {
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.clusterName = str;
        this.bootstrapKafkaServer = str2;
        this.kafkaProperties = kafkaProperties;
    }

    @Override // pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager
    public void createConsumerGroup(Topic topic, Subscription subscription) {
        this.logger.info("Creating consumer group for subscription {}, cluster: {}", subscription.getQualifiedName(), this.clusterName);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties(this.kafkaNamesMapper.toConsumerGroupId(subscription.getQualifiedName())));
        try {
            Set set = (Set) kafkaConsumer.partitionsFor(this.kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString()).stream().map(partitionInfo -> {
                return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            }).collect(Collectors.toSet());
            this.logger.info("Received partitions: {}, cluster: {}", set, this.clusterName);
            kafkaConsumer.assign(set);
            kafkaConsumer.commitSync((Map) set.stream().map(topicPartition -> {
                return ImmutablePair.of(topicPartition, new OffsetAndMetadata(kafkaConsumer.position(topicPartition)));
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            })));
            kafkaConsumer.close();
            this.logger.info("Successfully created consumer group for subscription {}, cluster: {}", subscription.getQualifiedName(), this.clusterName);
        } catch (Exception e) {
            this.logger.error("Failed to create consumer group for subscription {}, cluster: {}", new Object[]{subscription.getQualifiedName(), this.clusterName, e});
        }
    }

    private Properties properties(ConsumerGroupId consumerGroupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapKafkaServer);
        properties.put("group.id", consumerGroupId.asString());
        properties.put("enable.auto.commit", false);
        properties.put("request.timeout.ms", 5000);
        properties.put("default.api.timeout.ms", 5000);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        if (this.kafkaProperties.getSaslConfig().isEnabled()) {
            properties.put("sasl.mechanism", this.kafkaProperties.getSaslConfig().getMechanism());
            properties.put("security.protocol", this.kafkaProperties.getSaslConfig().getProtocol());
            properties.put("sasl.jaas.config", this.kafkaProperties.getSaslConfig().getJaasConfig());
        }
        return properties;
    }
}
