/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Ensures the internal topics required by a streams application are present, using a
 * {@link StreamsKafkaClient} to fetch cluster metadata and to request topic creation.
 */
public class InternalTopicManager {
    private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class);
    public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
    public static final String RETENTION_MS = "retention.ms";
    /** One day, expressed in milliseconds. */
    public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS);
    /** Maximum number of attempts {@link #makeReady(Map)} performs before giving up. */
    private static final int MAX_TOPIC_READY_TRY = 5;
    private final long windowChangeLogAdditionalRetention;
    private final int replicationFactor;
    private final StreamsKafkaClient streamsKafkaClient;

    /**
     * @param streamsKafkaClient client used for metadata lookups and topic creation
     * @param replicationFactor value forwarded unchanged to the client when creating topics
     * @param windowChangeLogAdditionalRetention value forwarded unchanged to the client when
     *        creating topics
     */
    public InternalTopicManager(StreamsKafkaClient streamsKafkaClient, int replicationFactor, long windowChangeLogAdditionalRetention) {
        this.streamsKafkaClient = streamsKafkaClient;
        this.replicationFactor = replicationFactor;
        this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
    }

    /**
     * Validates the requested topics against the cluster metadata and asks the client to create
     * the ones that are missing. The whole fetch/validate/create sequence is attempted up to
     * {@link #MAX_TOPIC_READY_TRY} times; any {@link StreamsException} (including a partition
     * count mismatch on an existing topic) triggers the next attempt.
     *
     * @param topics requested internal topics mapped to their expected partition count
     * @throws StreamsException if every attempt failed; the last failure is attached as cause
     */
    public void makeReady(Map<InternalTopicConfig, Integer> topics) {
        StreamsException lastFailure = null;
        for (int attempt = 0; attempt < MAX_TOPIC_READY_TRY; ++attempt) {
            try {
                MetadataResponse metadata = this.streamsKafkaClient.fetchMetadata();
                Map<String, Integer> existingTopicPartitions = this.fetchExistingPartitionCountByTopic(metadata);
                Map<InternalTopicConfig, Integer> topicsToBeCreated = this.validateTopicPartitions(topics, existingTopicPartitions);
                this.streamsKafkaClient.createTopics(topicsToBeCreated, this.replicationFactor, this.windowChangeLogAdditionalRetention, metadata);
                return;
            }
            catch (StreamsException ex) {
                lastFailure = ex;
                // Trailing Throwable argument makes SLF4J log the stack trace as well.
                log.warn("Could not create internal topics: {} Retry #{}", ex.getMessage(), attempt, ex);
            }
        }
        // Keep the last failure as cause so the root problem is not lost to the caller.
        throw new StreamsException("Could not create internal topics.", lastFailure);
    }

    /**
     * Looks up the current partition count of the given topics.
     *
     * @param topics names of the topics of interest
     * @return a newly built map from topic name to partition count, containing only those of
     *         the given topics that appear in the fetched metadata
     */
    public Map<String, Integer> getNumPartitions(Set<String> topics) {
        MetadataResponse metadata = this.streamsKafkaClient.fetchMetadata();
        Map<String, Integer> existingTopicPartitions = this.fetchExistingPartitionCountByTopic(metadata);
        // Safe to mutate: the map was freshly created by fetchExistingPartitionCountByTopic.
        existingTopicPartitions.keySet().retainAll(topics);
        return existingTopicPartitions;
    }

    /**
     * Closes the underlying client. An {@link IOException} raised while closing is logged at
     * warn level and not propagated.
     */
    public void close() {
        try {
            this.streamsKafkaClient.close();
        }
        catch (IOException e) {
            log.warn("Could not close StreamsKafkaClient.", e);
        }
    }

    /**
     * Determines which of the requested topics do not exist yet.
     *
     * @param topicsPartitionsMap requested topics mapped to their expected partition count
     * @param existingTopicNamesPartitions existing topic names mapped to their partition count
     * @return the requested topics (with their expected partition count) that are absent from
     *         {@code existingTopicNamesPartitions}
     * @throws StreamsException if a requested topic exists with a different partition count
     */
    private Map<InternalTopicConfig, Integer> validateTopicPartitions(Map<InternalTopicConfig, Integer> topicsPartitionsMap, Map<String, Integer> existingTopicNamesPartitions) {
        Map<InternalTopicConfig, Integer> topicsToBeCreated = new HashMap<InternalTopicConfig, Integer>();
        for (Map.Entry<InternalTopicConfig, Integer> requested : topicsPartitionsMap.entrySet()) {
            InternalTopicConfig topic = requested.getKey();
            Integer expectedPartitions = requested.getValue();
            if (!existingTopicNamesPartitions.containsKey(topic.name())) {
                topicsToBeCreated.put(topic, expectedPartitions);
                continue;
            }
            Integer actualPartitions = existingTopicNamesPartitions.get(topic.name());
            if (!actualPartitions.equals(expectedPartitions)) {
                throw new StreamsException("Existing internal topic " + topic.name() + " has invalid partitions." + " Expected: " + expectedPartitions + " Actual: " + actualPartitions + ". Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.");
            }
        }
        return topicsToBeCreated;
    }

    /**
     * Builds a map from topic name to the number of partition metadata entries reported for
     * that topic in the given metadata response.
     *
     * @param metadata metadata response to read the topics from
     * @return a new, mutable map from topic name to partition count
     */
    private Map<String, Integer> fetchExistingPartitionCountByTopic(MetadataResponse metadata) {
        Map<String, Integer> existingPartitionCountByTopic = new HashMap<String, Integer>();
        for (MetadataResponse.TopicMetadata topicMetadata : metadata.topicMetadata()) {
            existingPartitionCountByTopic.put(topicMetadata.topic(), topicMetadata.partitionMetadata().size());
        }
        return existingPartitionCountByTopic;
    }
}

