package org.springframework.cloud.stream.binder.kafka.provisioning;

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Callable;
import kafka.common.ErrorMapping;
import kafka.utils.ZkUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.class */
/**
 * {@link ProvisioningProvider} for the Kafka binder.
 *
 * <p>Validates destination (topic) names and, when topic auto-creation is enabled in the
 * binder configuration and an {@link AdminUtilsOperation} is available, creates missing
 * topics or adds partitions to existing ones through a ZooKeeper connection.
 */
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
    /** Logger bound to the runtime class of this instance. */
    private final Log logger = LogFactory.getLog(getClass());
    /** Binder-wide settings: ZooKeeper connection, auto-create/auto-add flags, minimum partition count, replication factor. */
    private final KafkaBinderConfigurationProperties configurationProperties;
    /** Admin operations used to inspect and modify topics; may be {@code null}, in which case no topic is created. */
    private final AdminUtilsOperation adminUtilsOperation;
    /** Retry strategy for topic creation and partition lookup; a default is installed by {@code afterPropertiesSet()} when unset. */
    private RetryOperations metadataRetryOperations;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner$KafkaConsumerDestination.class */
    public static final class KafkaConsumerDestination implements ConsumerDestination {
        private final String consumerDestinationName;
        private final int partitions;
        private final String dlqName;

        KafkaConsumerDestination(String str) {
            this(str, 0, null);
        }

        KafkaConsumerDestination(String str, int i) {
            this(str, Integer.valueOf(i), null);
        }

        KafkaConsumerDestination(String str, Integer num, String str2) {
            this.consumerDestinationName = str;
            this.partitions = num.intValue();
            this.dlqName = str2;
        }

        public String getName() {
            return this.consumerDestinationName;
        }

        public String toString() {
            return "KafkaConsumerDestination{consumerDestinationName='" + this.consumerDestinationName + "', partitions=" + this.partitions + ", dlqName='" + this.dlqName + "'}";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner$KafkaProducerDestination.class */
    public static final class KafkaProducerDestination implements ProducerDestination {
        private final String producerDestinationName;
        private final int partitions;

        KafkaProducerDestination(String str) {
            this(str, 0);
        }

        KafkaProducerDestination(String str, Integer num) {
            this.producerDestinationName = str;
            this.partitions = num.intValue();
        }

        public String getName() {
            return this.producerDestinationName;
        }

        public String getNameForPartition(int i) {
            return this.producerDestinationName;
        }

        public String toString() {
            return "KafkaProducerDestination{producerDestinationName='" + this.producerDestinationName + "', partitions=" + this.partitions + '}';
        }
    }

    /**
     * Creates a provisioner.
     *
     * @param kafkaBinderConfigurationProperties binder-wide Kafka/ZooKeeper settings
     * @param adminUtilsOperation admin operations used to inspect and create topics; other
     *        methods of this class check it for {@code null} and skip topic creation then
     */
    public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, AdminUtilsOperation adminUtilsOperation) {
        this.adminUtilsOperation = adminUtilsOperation;
        this.configurationProperties = kafkaBinderConfigurationProperties;
    }

    /**
     * Overrides the retry strategy used for topic creation and partition lookup.
     *
     * @param retryOperations the retry operations to use; if left {@code null},
     *        {@code afterPropertiesSet()} installs a default
     */
    public void setMetadataRetryOperations(RetryOperations retryOperations) {
        metadataRetryOperations = retryOperations;
    }

    /**
     * Installs a default retry template when no retry operations were configured:
     * at most 10 attempts with exponential back-off starting at 100 ms, doubling
     * each time and capped at 1000 ms.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.metadataRetryOperations != null) {
            // An explicitly configured strategy always wins.
            return;
        }

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(10);

        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100L);
        backOffPolicy.setMultiplier(2.0d);
        backOffPolicy.setMaxInterval(1000L);

        RetryTemplate defaultTemplate = new RetryTemplate();
        defaultTemplate.setRetryPolicy(retryPolicy);
        defaultTemplate.setBackOffPolicy(backOffPolicy);
        this.metadataRetryOperations = defaultTemplate;
    }

    /**
     * Provisions the outbound topic for a producer binding.
     *
     * <p>The topic name is validated, then the topic is created or extended if
     * auto-creation is enabled and admin operations are available. In that case the
     * returned destination carries the partition count read back from the topic;
     * otherwise it carries a partition count of 0.
     *
     * @param str the topic name
     * @param extendedProducerProperties producer properties; the partition count is used as
     *        the requested number of partitions
     * @return the provisioned producer destination
     */
    @Override
    public ProducerDestination provisionProducerDestination(String str, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Using kafka topic for outbound: " + str);
        }
        KafkaTopicUtils.validateTopicName(str);
        createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(str, extendedProducerProperties.getPartitionCount(), false);
        if (!this.configurationProperties.isAutoCreateTopics() || this.adminUtilsOperation == null) {
            return new KafkaProducerDestination(str);
        }
        ZkUtils zkUtils = ZkUtils.apply(
                this.configurationProperties.getZkConnectionString(),
                this.configurationProperties.getZkSessionTimeout(),
                this.configurationProperties.getZkConnectionTimeout(),
                JaasUtils.isZkSecurityEnabled());
        try {
            int partitionCount = this.adminUtilsOperation.partitionSize(str, zkUtils);
            return new KafkaProducerDestination(str, partitionCount);
        } finally {
            // Release the ZooKeeper connection; it was previously leaked on every call.
            zkUtils.close();
        }
    }

    /**
     * Provisions the inbound topic (and optionally a dead-letter topic) for a consumer binding.
     *
     * <p>The topic is created or extended with {@code instanceCount * concurrency} requested
     * partitions if auto-creation is enabled and admin operations are available. When DLQ is
     * enabled for a named group, a dead-letter topic with the same partition count is
     * provisioned as well; its name is the configured DLQ name or, if that is blank,
     * {@code error.<topic>.<group>}.
     *
     * @param str the topic name
     * @param str2 the consumer group; blank or {@code null} denotes an anonymous subscription
     * @param extendedConsumerProperties consumer properties including the Kafka extension
     * @return the provisioned consumer destination
     * @throws IllegalArgumentException if DLQ is enabled for an anonymous subscription or the
     *         instance count is zero
     */
    @Override
    public ConsumerDestination provisionConsumerDestination(String str, String str2, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaTopicUtils.validateTopicName(str);
        KafkaConsumerProperties kafkaProperties = extendedConsumerProperties.getExtension();
        boolean anonymous = !StringUtils.hasText(str2);
        Assert.isTrue(!anonymous || !kafkaProperties.isEnableDlq(), "DLQ support is not available for anonymous subscriptions");
        if (extendedConsumerProperties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        int requestedPartitions = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
        createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(str, requestedPartitions, kafkaProperties.isAutoRebalanceEnabled());
        if (!this.configurationProperties.isAutoCreateTopics() || this.adminUtilsOperation == null) {
            return new KafkaConsumerDestination(str);
        }

        int partitionCount;
        ZkUtils zkUtils = ZkUtils.apply(
                this.configurationProperties.getZkConnectionString(),
                this.configurationProperties.getZkSessionTimeout(),
                this.configurationProperties.getZkConnectionTimeout(),
                JaasUtils.isZkSecurityEnabled());
        try {
            partitionCount = this.adminUtilsOperation.partitionSize(str, zkUtils);
        } finally {
            // Release the ZooKeeper connection; it was previously leaked on every call.
            // The DLQ provisioning below opens (and closes) its own connection.
            zkUtils.close();
        }

        if (!kafkaProperties.isEnableDlq() || anonymous) {
            return new KafkaConsumerDestination(str, partitionCount);
        }
        String dlqName = StringUtils.hasText(kafkaProperties.getDlqName())
                ? kafkaProperties.getDlqName()
                : "error." + str + "." + str2;
        createTopicAndPartitions(dlqName, partitionCount, kafkaProperties.isAutoRebalanceEnabled());
        return new KafkaConsumerDestination(str, partitionCount, dlqName);
    }

    /**
     * Creates the topic (or adds partitions) only when auto-creation is enabled and admin
     * operations are available; otherwise logs why nothing is created.
     *
     * @param str the topic name
     * @param i the requested number of partitions
     * @param z whether a partition shortfall is tolerated (passed through to
     *        {@code createTopicAndPartitions})
     */
    private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(String str, int i, boolean z) {
        if (!this.configurationProperties.isAutoCreateTopics()) {
            this.logger.info("Auto creation of topics is disabled.");
            return;
        }
        if (this.adminUtilsOperation == null) {
            this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. No topic will be created by the binder");
            return;
        }
        createTopicAndPartitions(str, i, z);
    }

    /**
     * Ensures the topic exists with enough partitions, using a ZooKeeper connection that is
     * closed before returning.
     *
     * <ul>
     * <li>Topic exists: if it has fewer partitions than required, partitions are added when
     * {@code autoAddPartitions} is enabled; otherwise the shortfall is either logged as a
     * warning ({@code z == true}) or reported via {@link ProvisioningException}.</li>
     * <li>Topic unknown: it is created (with retries) using
     * {@code max(minPartitionCount, i)} partitions; a "topic exists" failure is only logged.</li>
     * <li>Any other metadata error code: a {@link ProvisioningException} is thrown.</li>
     * </ul>
     *
     * @param str the topic name
     * @param i the requested number of partitions
     * @param z whether fewer partitions than requested are tolerated
     */
    private void createTopicAndPartitions(final String str, int i, boolean z) {
        final ZkUtils zkUtils = ZkUtils.apply(
                this.configurationProperties.getZkConnectionString(),
                this.configurationProperties.getZkSessionTimeout(),
                this.configurationProperties.getZkConnectionTimeout(),
                JaasUtils.isZkSecurityEnabled());
        try {
            final short errorCode = this.adminUtilsOperation.errorCodeFromTopicMetadata(str, zkUtils);

            if (errorCode == ErrorMapping.NoError()) {
                // The topic already exists: only its partition count may need attention.
                final boolean autoAddPartitions = this.configurationProperties.isAutoAddPartitions();
                final int requiredPartitions = autoAddPartitions
                        ? Math.max(this.configurationProperties.getMinPartitionCount(), i)
                        : i;
                final int actualPartitions = this.adminUtilsOperation.partitionSize(str, zkUtils);
                if (actualPartitions >= requiredPartitions) {
                    return;
                }
                if (autoAddPartitions) {
                    this.adminUtilsOperation.invokeAddPartitions(zkUtils, str, requiredPartitions, null, false);
                    return;
                }
                final String shortfall = "The number of expected partitions was: " + i + ", but " + actualPartitions
                        + (actualPartitions > 1 ? " have " : " has ") + "been found instead.";
                if (!z) {
                    throw new ProvisioningException(shortfall + "Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
                }
                this.logger.warn(shortfall + "There will be " + (requiredPartitions - actualPartitions) + " idle consumers");
                return;
            }

            if (errorCode != ErrorMapping.UnknownTopicOrPartitionCode()) {
                throw new ProvisioningException("Error fetching Kafka topic metadata: ", ErrorMapping.exceptionFor(errorCode));
            }

            // The topic does not exist yet: create it, retrying on failure.
            final int partitionsToCreate = Math.max(this.configurationProperties.getMinPartitionCount(), i);
            this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
                @Override
                public Object doWithRetry(RetryContext retryContext) throws RuntimeException {
                    try {
                        KafkaTopicProvisioner.this.adminUtilsOperation.invokeCreateTopic(zkUtils, str, partitionsToCreate,
                                KafkaTopicProvisioner.this.configurationProperties.getReplicationFactor(), new Properties());
                    } catch (Exception e) {
                        // Matched by class name so both the old and the new Kafka exception types are recognised.
                        String exceptionClass = e.getClass().getName();
                        boolean topicAlreadyExists = exceptionClass.equals("kafka.common.TopicExistsException")
                                || exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException");
                        if (!topicAlreadyExists) {
                            throw e;
                        }
                        if (KafkaTopicProvisioner.this.logger.isWarnEnabled()) {
                            KafkaTopicProvisioner.this.logger.warn("Attempt to create topic: " + str + ". Topic already exists.");
                        }
                    }
                    return null;
                }
            });
        } finally {
            zkUtils.close();
        }
    }

    /**
     * Looks up the partitions of a topic through the supplied callable, retrying with the
     * configured retry operations, and checks that enough partitions exist.
     *
     * @param i the expected minimum number of partitions
     * @param z whether fewer partitions than expected are tolerated; if {@code true} a
     *        warning is logged, otherwise the attempt fails with an {@link IllegalStateException}
     * @param callable supplies the partition metadata for the topic
     * @return the partitions returned by {@code callable}
     * @throws BinderException if the lookup or the partition check ultimately fails; the
     *         original exception is preserved as the cause
     */
    public Collection<PartitionInfo> getPartitionsForTopic(final int i, final boolean z, final Callable<Collection<PartitionInfo>> callable) {
        try {
            return this.metadataRetryOperations.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
                // Must be named doWithRetry to implement RetryCallback; the decompiler-mangled
                // name "m5doWithRetry" left the anonymous class without the required method.
                @Override
                public Collection<PartitionInfo> doWithRetry(RetryContext retryContext) throws Exception {
                    Collection<PartitionInfo> partitions = callable.call();
                    int size = partitions.size();
                    if (size < i) {
                        if (!z) {
                            throw new IllegalStateException("The number of expected partitions was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead");
                        }
                        KafkaTopicProvisioner.this.logger.warn("The number of expected partitions was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead.There will be " + (i - size) + " idle consumers");
                    }
                    return partitions;
                }
            });
        } catch (Exception e) {
            this.logger.error("Cannot initialize Binder", e);
            throw new BinderException("Cannot initialize binder:", e);
        }
    }
}
