/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;

/**
 * {@link ConsumerGroupManager} that creates a Kafka consumer group for a subscription by
 * committing, under the subscription's consumer group id, the consumer's current position on
 * every partition of the topic's primary Kafka topic.
 *
 * <p>A short-lived {@link KafkaConsumer} is built for each call and is always closed before the
 * call returns.
 */
public class KafkaConsumerGroupManager
implements ConsumerGroupManager {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerGroupManager.class);
    private final KafkaNamesMapper kafkaNamesMapper;
    private final String clusterName;
    private final String bootstrapKafkaServer;
    private final KafkaProperties kafkaProperties;

    /**
     * @param kafkaNamesMapper maps subscription and topic names to Kafka group ids and topic names
     * @param clusterName name of the cluster, used only in log messages
     * @param bootstrapKafkaServer value passed as {@code bootstrap.servers} to the consumer
     * @param kafkaProperties source of the SASL settings applied when SASL is enabled
     */
    public KafkaConsumerGroupManager(KafkaNamesMapper kafkaNamesMapper, String clusterName, String bootstrapKafkaServer, KafkaProperties kafkaProperties) {
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.clusterName = clusterName;
        this.bootstrapKafkaServer = bootstrapKafkaServer;
        this.kafkaProperties = kafkaProperties;
    }

    /**
     * Creates the consumer group of {@code subscription} by assigning all partitions of the
     * topic's primary Kafka topic to a temporary consumer and synchronously committing the
     * consumer's position on each of them.
     *
     * <p>Failures while talking to Kafka (including a failure to close the consumer) are logged
     * and not rethrown. Failures while resolving the group id or constructing the consumer
     * propagate to the caller.
     *
     * @param topic topic whose primary Kafka topic supplies the partitions
     * @param subscription subscription whose qualified name determines the consumer group id
     */
    @Override
    public void createConsumerGroup(Topic topic, Subscription subscription) {
        logger.info("Creating consumer group for subscription {}, cluster: {}", subscription.getQualifiedName(), this.clusterName);
        ConsumerGroupId groupId = this.kafkaNamesMapper.toConsumerGroupId(subscription.getQualifiedName());
        // Built outside the try so that a construction failure still reaches the caller.
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(this.properties(groupId));
        // try-with-resources guarantees the consumer is closed even when one of the Kafka calls
        // below throws; an exception from close() itself lands in the catch clause.
        try (KafkaConsumer<byte[], byte[]> consumer = kafkaConsumer) {
            String kafkaTopicName = this.kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString();
            Set<TopicPartition> topicPartitions = consumer.partitionsFor(kafkaTopicName).stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toSet());
            logger.info("Received partitions: {}, cluster: {}", topicPartitions, this.clusterName);
            consumer.assign(topicPartitions);
            // NOTE(review): which offset position() yields for a group with no committed offset
            // presumably depends on the consumer's auto.offset.reset setting - confirm.
            Map<TopicPartition, OffsetAndMetadata> topicPartitionByOffset = topicPartitions.stream()
                    .collect(Collectors.toMap(
                            topicPartition -> topicPartition,
                            topicPartition -> new OffsetAndMetadata(consumer.position(topicPartition))));
            consumer.commitSync(topicPartitionByOffset);
        }
        catch (Exception e) {
            // The trailing Throwable is logged with its stack trace by SLF4J.
            logger.error("Failed to create consumer group for subscription {}, cluster: {}", subscription.getQualifiedName(), this.clusterName, e);
            return;
        }
        // Reached only after the consumer has been closed without error.
        logger.info("Successfully created consumer group for subscription {}, cluster: {}", subscription.getQualifiedName(), this.clusterName);
    }

    /**
     * Builds the configuration of the temporary consumer: manual commits, byte-array
     * deserializers, 5000 ms request and default API timeouts, and SASL settings when enabled in
     * {@link KafkaProperties}.
     *
     * @param groupId consumer group id to set as {@code group.id}
     * @return properties for constructing the {@link KafkaConsumer}
     */
    private Properties properties(ConsumerGroupId groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.bootstrapKafkaServer);
        props.put("group.id", groupId.asString());
        props.put("enable.auto.commit", false);
        props.put("request.timeout.ms", 5000);
        props.put("default.api.timeout.ms", 5000);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        if (this.kafkaProperties.getSasl().isEnabled()) {
            props.put("sasl.mechanism", this.kafkaProperties.getSasl().getMechanism());
            props.put("security.protocol", this.kafkaProperties.getSasl().getProtocol());
            props.put("sasl.jaas.config", this.kafkaProperties.getSasl().getJaasConfig());
        }
        return props;
    }
}

