/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.slf4j.Logger;

/**
 * Validates and creates the internal topics of a Kafka Streams application through an
 * {@link Admin} client.
 *
 * <p>{@link #makeReady(Map)} is the entry point: it describes the requested topics, verifies the
 * partition count of those that already exist, creates the missing ones and retries (with a
 * back-off) until every topic is ready or the retry budget is used up.
 */
public class InternalTopicManager {
    private static final String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
    private final Logger log;
    private final long windowChangeLogAdditionalRetention;
    /** Topic-level defaults taken from the streams config entries prefixed with {@code topic.}. */
    private final Map<String, String> defaultTopicConfigs = new HashMap<>();
    private final short replicationFactor;
    private final Admin adminClient;
    private final int retries;
    private final long retryBackOffMs;

    /**
     * Creates a manager that reads its settings from the given streams configuration.
     *
     * @param adminClient   client used to describe and create topics
     * @param streamsConfig source of the replication factor, the additional changelog retention,
     *                      the admin {@code retries} / {@code retry.backoff.ms} settings and the
     *                      {@code topic.}-prefixed default topic configs
     */
    public InternalTopicManager(Admin adminClient, StreamsConfig streamsConfig) {
        this.adminClient = adminClient;
        LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
        this.log = logContext.logger(this.getClass());
        this.replicationFactor = streamsConfig.getInt("replication.factor").shortValue();
        this.windowChangeLogAdditionalRetention = streamsConfig.getLong("windowstore.changelog.additional.retention.ms");
        ClientUtils.QuietAdminClientConfig adminConfigs = new ClientUtils.QuietAdminClientConfig(streamsConfig);
        this.retries = adminConfigs.getInt("retries");
        this.retryBackOffMs = adminConfigs.getLong("retry.backoff.ms");
        this.log.debug("Configs:" + Utils.NL + "\t{} = {}" + Utils.NL + "\t{} = {}" + Utils.NL + "\t{} = {}",
            "retries", this.retries,
            "replication.factor", this.replicationFactor,
            "windowstore.changelog.additional.retention.ms", this.windowChangeLogAdditionalRetention);
        // Entries with a null value are skipped; everything else is stored in its string form.
        for (Map.Entry<String, Object> entry : streamsConfig.originalsWithPrefix("topic.").entrySet()) {
            if (entry.getValue() != null) {
                this.defaultTopicConfigs.put(entry.getKey(), entry.getValue().toString());
            }
        }
    }

    /**
     * Makes sure every given internal topic exists with the expected number of partitions,
     * creating the ones that are missing.
     *
     * <p>Each round validates the topics that are still outstanding, issues a create request for
     * the missing ones and, if some are still not ready, sleeps for the configured back-off before
     * the next round. At most {@code retries + 1} rounds are performed.
     *
     * @param topics topic name to its internal topic configuration
     * @return the names of all topics that validation reported as missing in any round, i.e. the
     *         topics this call attempted to create
     * @throws StreamsException      if a topic could not be created for a reason other than
     *                               {@link TopicExistsException}, if validation fails, or if topics
     *                               are still not ready once the retries are exhausted
     * @throws IllegalStateException if the thread is interrupted while waiting for the broker
     */
    public Set<String> makeReady(Map<String, InternalTopicConfig> topics) {
        this.log.debug("Starting to validate internal topics {} in partition assignor.", topics);
        int remainingRetries = this.retries;
        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
        Set<String> newlyCreatedTopics = new HashSet<>();
        while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
            topicsNotReady = this.validateTopics(topicsNotReady, topics);
            newlyCreatedTopics.addAll(topicsNotReady);
            if (!topicsNotReady.isEmpty()) {
                Set<NewTopic> newTopics = new HashSet<>();
                for (String topicName : topicsNotReady) {
                    InternalTopicConfig internalTopicConfig = Objects.requireNonNull(topics.get(topicName));
                    Map<String, String> topicConfig = internalTopicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention);
                    this.log.debug("Going to create topic {} with {} partitions and config {}.",
                        internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), topicConfig);
                    newTopics.add(new NewTopic(internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), Optional.of(this.replicationFactor)).configs(topicConfig));
                }
                CreateTopicsResult createTopicsResult = this.adminClient.createTopics(newTopics);
                for (Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
                    String topicName = createTopicResult.getKey();
                    try {
                        createTopicResult.getValue().get();
                        topicsNotReady.remove(topicName);
                    } catch (InterruptedException fatalException) {
                        // Restore the interrupt flag before failing so callers can still observe it.
                        Thread.currentThread().interrupt();
                        this.log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
                        throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
                    } catch (ExecutionException executionException) {
                        Throwable cause = executionException.getCause();
                        if (cause instanceof TopicExistsException) {
                            // The topic stays in topicsNotReady so the next round validates it again.
                            this.log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\nWill retry to create this topic in {} ms (to let broker finish async delete operation first).\nError message was: {}",
                                topicName, this.retryBackOffMs, cause.toString());
                        } else {
                            this.log.error("Unexpected error during topic creation for {}.\nError message was: {}", topicName, cause.toString());
                            throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                        }
                    }
                }
            }
            if (!topicsNotReady.isEmpty()) {
                // Report the retries that are actually left, not the configured total.
                this.log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries);
                Utils.sleep(this.retryBackOffMs);
                --remainingRetries;
            }
        }
        if (!topicsNotReady.isEmpty()) {
            String timeoutAndRetryError = String.format("Could not create topics after %d retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.", this.retries);
            this.log.error(timeoutAndRetryError);
            throw new StreamsException(timeoutAndRetryError);
        }
        this.log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);
        return newlyCreatedTopics;
    }

    /**
     * Describes the given topics and returns the partition count of those that could be described.
     *
     * <p>Topics whose describe request fails with {@link UnknownTopicOrPartitionException} or
     * {@link LeaderNotAvailableException} are treated as not existing yet and are left out of the
     * result.
     *
     * @param topics names of the topics to describe
     * @return topic name to number of partitions, for the topics that were described successfully
     * @throws StreamsException      if describing a topic fails for any other reason
     * @throws IllegalStateException if the thread is interrupted while waiting for the broker
     */
    protected Map<String, Integer> getNumPartitions(Set<String> topics) {
        this.log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
        DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(topics);
        Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values();
        Map<String, Integer> existedTopicPartition = new HashMap<>();
        for (Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) {
            String topicName = topicFuture.getKey();
            try {
                TopicDescription topicDescription = topicFuture.getValue().get();
                existedTopicPartition.put(topicName, topicDescription.partitions().size());
            } catch (InterruptedException fatalException) {
                // Restore the interrupt flag before failing so callers can still observe it.
                Thread.currentThread().interrupt();
                this.log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
                throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
            } catch (ExecutionException couldNotDescribeTopicException) {
                Throwable cause = couldNotDescribeTopicException.getCause();
                if (cause instanceof UnknownTopicOrPartitionException || cause instanceof LeaderNotAvailableException) {
                    this.log.debug("Topic {} is unknown or not found, hence not existed yet: {}", topicName, cause.toString());
                } else {
                    this.log.error("Unexpected error during topic description for {}.\nError message was: {}", topicName, cause.toString());
                    throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                }
            }
        }
        return existedTopicPartition;
    }

    /**
     * Checks the given topics against the broker and determines which of them still have to be
     * created.
     *
     * @param topicsToValidate names of the topics to check; must all be keys of {@code topicsMap}
     * @param topicsMap        topic name to its internal topic configuration
     * @return the names of the topics that were not found by {@link #getNumPartitions(Set)}
     * @throws IllegalStateException if {@code topicsMap} lacks one of the topics to validate
     * @throws StreamsException      if a topic has no partition count defined, or an existing
     *                               topic has a partition count different from the expected one
     */
    private Set<String> validateTopics(Set<String> topicsToValidate, Map<String, InternalTopicConfig> topicsMap) {
        if (!topicsMap.keySet().containsAll(topicsToValidate)) {
            throw new IllegalStateException("The topics map " + topicsMap.keySet() + " does not contain all the topics " + topicsToValidate + " trying to validate.");
        }
        Map<String, Integer> existedTopicPartition = this.getNumPartitions(topicsToValidate);
        Set<String> topicsToCreate = new HashSet<>();
        for (String topicName : topicsToValidate) {
            Optional<Integer> numberOfPartitions = topicsMap.get(topicName).numberOfPartitions();
            if (!numberOfPartitions.isPresent()) {
                this.log.error("Found undefined number of partitions for topic {}", topicName);
                throw new StreamsException("Topic " + topicName + " number of partitions not defined");
            }
            if (!existedTopicPartition.containsKey(topicName)) {
                // Not reported by the broker: schedule it for creation.
                topicsToCreate.add(topicName);
                continue;
            }
            Integer actualPartitions = existedTopicPartition.get(topicName);
            if (!actualPartitions.equals(numberOfPartitions.get())) {
                String errorMsg = String.format("Existing internal topic %s has invalid partitions: expected: %d; actual: %d. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", topicName, numberOfPartitions.get(), actualPartitions);
                this.log.error(errorMsg);
                throw new StreamsException(errorMsg);
            }
        }
        return topicsToCreate;
    }
}

