package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.BrokersClusterCommunicationException;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaBrokerTopicManagement.class */
public class KafkaBrokerTopicManagement implements BrokerTopicManagement {
    private final TopicProperties topicProperties;
    private final AdminClient kafkaAdminClient;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final String datacenterName;
    private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTopicManagement.class);

    public KafkaBrokerTopicManagement(TopicProperties topicProperties, AdminClient adminClient, KafkaNamesMapper kafkaNamesMapper, String str) {
        this.topicProperties = topicProperties;
        this.kafkaAdminClient = adminClient;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.datacenterName = str;
    }

    @Override // pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement
    public void createTopic(Topic topic) {
        Map<String, String> createTopicConfig = createTopicConfig(topic.getRetentionTime().getDurationInMillis(), this.topicProperties);
        this.kafkaNamesMapper.toKafkaTopics(topic).stream().map(kafkaTopic -> {
            return this.kafkaAdminClient.createTopics(Collections.singletonList(new NewTopic(kafkaTopic.name().asString(), this.topicProperties.getPartitions(), (short) this.topicProperties.getReplicationFactor()).configs(createTopicConfig)));
        }).map((v0) -> {
            return v0.all();
        }).forEach(this::waitForKafkaFuture);
    }

    @Override // pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement
    public void removeTopic(Topic topic) {
        this.kafkaNamesMapper.toKafkaTopics(topic).stream().map(kafkaTopic -> {
            return this.kafkaAdminClient.deleteTopics(Collections.singletonList(kafkaTopic.name().asString()));
        }).map((v0) -> {
            return v0.all();
        }).forEach(kafkaFuture -> {
            logger.info("Removing topic: {} from Kafka dc: {}", topic, this.datacenterName);
            long currentTimeMillis = System.currentTimeMillis();
            waitForKafkaFuture(kafkaFuture);
            logger.info("Removed topic: {} from Kafka dc: {} in {} ms", new Object[]{topic, this.datacenterName, Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
        });
    }

    @Override // pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement
    public void updateTopic(Topic topic) {
        Map<String, String> createTopicConfig = createTopicConfig(topic.getRetentionTime().getDurationInMillis(), this.topicProperties);
        KafkaTopics kafkaTopics = this.kafkaNamesMapper.toKafkaTopics(topic);
        if (isMigrationToNewKafkaTopic(kafkaTopics)) {
            waitForKafkaFuture(this.kafkaAdminClient.createTopics(Collections.singletonList(new NewTopic(kafkaTopics.getPrimary().name().asString(), this.topicProperties.getPartitions(), (short) this.topicProperties.getReplicationFactor()).configs(createTopicConfig))).all());
        } else {
            doUpdateTopic(kafkaTopics.getPrimary(), createTopicConfig);
        }
        kafkaTopics.getSecondary().ifPresent(kafkaTopic -> {
            doUpdateTopic(kafkaTopic, createTopicConfig);
        });
    }

    @Override // pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement
    public boolean topicExists(Topic topic) {
        return this.kafkaNamesMapper.toKafkaTopics(topic).allMatch(this::doesTopicExist);
    }

    private boolean isMigrationToNewKafkaTopic(KafkaTopics kafkaTopics) {
        return kafkaTopics.getSecondary().isPresent() && !doesTopicExist(kafkaTopics.getPrimary());
    }

    private boolean doesTopicExist(KafkaTopic kafkaTopic) {
        return ((Boolean) waitForKafkaFuture(this.kafkaAdminClient.listTopics().names().thenApply(set -> {
            return Boolean.valueOf(set.contains(kafkaTopic.name().asString()));
        }))).booleanValue();
    }

    private void doUpdateTopic(KafkaTopic kafkaTopic, Map<String, String> map) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, kafkaTopic.name().asString());
        Collection collection = (Collection) map.entrySet().stream().map(entry -> {
            return new ConfigEntry((String) entry.getKey(), (String) entry.getValue());
        }).collect(Collectors.toList());
        HashMap hashMap = new HashMap();
        hashMap.put(configResource, new Config(collection));
        waitForKafkaFuture(this.kafkaAdminClient.alterConfigs(hashMap).all());
    }

    private Map<String, String> createTopicConfig(long j, TopicProperties topicProperties) {
        HashMap hashMap = new HashMap();
        hashMap.put("retention.ms", String.valueOf(j));
        hashMap.put("unclean.leader.election.enable", Boolean.toString(topicProperties.isUncleanLeaderElectionEnabled()));
        hashMap.put("max.message.bytes", String.valueOf(topicProperties.getMaxMessageSize()));
        return hashMap;
    }

    private <T> T waitForKafkaFuture(KafkaFuture<T> kafkaFuture) {
        try {
            return (T) kafkaFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new BrokersClusterCommunicationException(e);
        }
    }
}
