/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.admin.RackAwareMode;
import kafka.log.LogConfig;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;

/**
 * {@link BrokerTopicManagement} implementation that creates, deletes, reconfigures and
 * checks the existence of the Kafka topics backing a Hermes {@link Topic}.
 *
 * <p>Mutating operations go through {@link AdminZkClient}; existence checks go through
 * {@link KafkaZkClient}. Hermes topics are translated to Kafka topic names with the
 * supplied {@link KafkaNamesMapper}.
 */
public class KafkaBrokerTopicManagement
implements BrokerTopicManagement {
    private final TopicProperties topicProperties;
    private final AdminZkClient adminZkClient;
    private final KafkaZkClient kafkaZkClient;
    private final KafkaNamesMapper kafkaNamesMapper;

    /**
     * Stores the collaborators used by every operation of this class.
     *
     * @param topicProperties  source of partition count, replication factor and topic-level settings
     * @param adminZkClient    client used to create, delete and reconfigure Kafka topics
     * @param kafkaZkClient    client used to check whether a Kafka topic exists
     * @param kafkaNamesMapper maps a Hermes topic to its Kafka topics
     */
    public KafkaBrokerTopicManagement(TopicProperties topicProperties, AdminZkClient adminZkClient, KafkaZkClient kafkaZkClient, KafkaNamesMapper kafkaNamesMapper) {
        this.topicProperties = topicProperties;
        this.adminZkClient = adminZkClient;
        this.kafkaZkClient = kafkaZkClient;
        this.kafkaNamesMapper = kafkaNamesMapper;
    }

    /**
     * Creates every Kafka topic mapped from the given Hermes topic, all with the same
     * configuration derived from the topic's retention time and the topic properties.
     *
     * @param topic Hermes topic to create on the broker
     */
    @Override
    public void createTopic(Topic topic) {
        Properties topicConfig = buildTopicConfig(topic.getRetentionTime().getDuration());
        kafkaNamesMapper.toKafkaTopics(topic)
                .forEach(kafkaTopic -> createKafkaTopic(kafkaTopic.name().asString(), topicConfig));
    }

    /**
     * Deletes every Kafka topic mapped from the given Hermes topic.
     *
     * @param topic Hermes topic to remove from the broker
     */
    @Override
    public void removeTopic(Topic topic) {
        kafkaNamesMapper.toKafkaTopics(topic)
                .forEach(kafkaTopic -> adminZkClient.deleteTopic(kafkaTopic.name().asString()));
    }

    /**
     * Applies the current configuration to the Kafka topics mapped from the given Hermes topic.
     *
     * <p>The primary topic is created when a migration is detected (see
     * {@link #isMigrationToNewKafkaTopic(KafkaTopics)}); otherwise only its configuration is
     * changed. The secondary topic, when present, always has its configuration changed.
     *
     * @param topic Hermes topic whose Kafka topics should be updated
     */
    @Override
    public void updateTopic(Topic topic) {
        Properties topicConfig = buildTopicConfig(topic.getRetentionTime().getDuration());
        KafkaTopics kafkaTopics = kafkaNamesMapper.toKafkaTopics(topic);

        boolean migrating = isMigrationToNewKafkaTopic(kafkaTopics);
        String primaryName = kafkaTopics.getPrimary().name().asString();
        if (migrating) {
            createKafkaTopic(primaryName, topicConfig);
        } else {
            adminZkClient.changeTopicConfig(primaryName, topicConfig);
        }

        kafkaTopics.getSecondary().ifPresent(secondaryTopic ->
                adminZkClient.changeTopicConfig(secondaryTopic.name().asString(), topicConfig));
    }

    /**
     * Tells whether all Kafka topics mapped from the given Hermes topic exist.
     *
     * @param topic Hermes topic to look up
     * @return {@code true} only if every mapped Kafka topic is reported as existing
     */
    @Override
    public boolean topicExists(Topic topic) {
        return kafkaNamesMapper.toKafkaTopics(topic)
                .allMatch(candidate -> kafkaZkClient.topicExists(candidate.name().asString()));
    }

    /**
     * A migration is in progress when a secondary topic is defined but the primary
     * topic does not exist yet.
     */
    private boolean isMigrationToNewKafkaTopic(KafkaTopics kafkaTopics) {
        if (!kafkaTopics.getSecondary().isPresent()) {
            return false;
        }
        return !kafkaZkClient.topicExists(kafkaTopics.getPrimary().name().asString());
    }

    /**
     * Creates a single Kafka topic with the partition count and replication factor taken
     * from the topic properties, using the enforced rack-aware mode.
     */
    private void createKafkaTopic(String kafkaTopicName, Properties topicConfig) {
        adminZkClient.createTopic(
                kafkaTopicName,
                topicProperties.getPartitions(),
                topicProperties.getReplicationFactor(),
                topicConfig,
                (RackAwareMode)RackAwareMode.Enforced$.MODULE$);
    }

    /**
     * Builds the topic-level configuration: retention (given in days, stored in
     * milliseconds), the unclean leader election flag and the maximum message size.
     */
    private Properties buildTopicConfig(int retentionDays) {
        Properties topicConfig = new Properties();
        topicConfig.setProperty(LogConfig.RetentionMsProp(), Long.toString(TimeUnit.DAYS.toMillis(retentionDays)));
        topicConfig.setProperty(LogConfig.UncleanLeaderElectionEnableProp(), Boolean.toString(topicProperties.isUncleanLeaderElectionEnabled()));
        topicConfig.setProperty(LogConfig.MaxMessageBytesProp(), String.valueOf(topicProperties.getMaxMessageSize()));
        return topicConfig;
    }
}

