package org.springframework.cloud.stream.binder.kafka;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import kafka.common.ErrorMapping;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.context.Lifecycle;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.class */
public class KafkaMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, Collection<PartitionInfo>, String> implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties>, DisposableBean {
    private final KafkaBinderConfigurationProperties configurationProperties;
    private RetryOperations metadataRetryOperations;
    private final Map<String, Collection<PartitionInfo>> topicsInUse;
    private ProducerListener<byte[], byte[]> producerListener;
    private volatile Producer<byte[], byte[]> dlqProducer;
    private KafkaExtendedBindingProperties extendedBindingProperties;
    private AdminUtilsOperation adminUtilsOperation;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$ProducerConfigurationMessageHandler.class */
    public final class ProducerConfigurationMessageHandler extends KafkaProducerMessageHandler<byte[], byte[]> implements Lifecycle {
        private boolean running;
        private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;

        private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String str, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties, DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory) {
            super(kafkaTemplate);
            this.running = true;
            setTopicExpression(new LiteralExpression(str));
            setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
            if (extendedProducerProperties.isPartitioned()) {
                setPartitionIdExpression(new SpelExpressionParser().parseExpression("headers.partition"));
            }
            if (((KafkaProducerProperties) extendedProducerProperties.getExtension()).isSync()) {
                setSync(true);
            }
            this.producerFactory = defaultKafkaProducerFactory;
        }

        public void start() {
            try {
                super.onInit();
            } catch (Exception e) {
                this.logger.error("Initialization errors: ", e);
                throw new RuntimeException(e);
            }
        }

        public void stop() {
            this.producerFactory.stop();
            this.running = false;
        }

        public boolean isRunning() {
            return this.running;
        }
    }

    /**
     * Creates a binder backed by the given binder-wide configuration.
     *
     * @param kafkaBinderConfigurationProperties binder configuration; its headers (if any) are
     *        appended to the standard binder headers passed to the superclass
     */
    public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        super(false, headersToMap(kafkaBinderConfigurationProperties));
        this.configurationProperties = kafkaBinderConfigurationProperties;
        this.extendedBindingProperties = new KafkaExtendedBindingProperties();
        this.topicsInUse = new HashMap<>();
    }

    /**
     * Builds the header-name array handed to the superclass constructor.
     *
     * @param kafkaBinderConfigurationProperties binder configuration supplying extra header names
     * @return {@link BinderHeaders#STANDARD_HEADERS} itself when no extra headers are configured,
     *         otherwise a new array holding the standard headers followed by the configured ones
     */
    private static String[] headersToMap(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        if (ObjectUtils.isEmpty(kafkaBinderConfigurationProperties.getHeaders())) {
            return BinderHeaders.STANDARD_HEADERS;
        }
        int standardCount = BinderHeaders.STANDARD_HEADERS.length;
        int customCount = kafkaBinderConfigurationProperties.getHeaders().length;
        // copyOf pads the tail with nulls, which are then overwritten by the custom headers.
        String[] combined = Arrays.copyOf(BinderHeaders.STANDARD_HEADERS, standardCount + customCount);
        System.arraycopy(kafkaBinderConfigurationProperties.getHeaders(), 0, combined, standardCount, customCount);
        return combined;
    }

    /**
     * Sets the admin helper used for topic creation and partition changes.
     * When left {@code null}, the binder does not create topics and logs a warning instead
     * if auto-creation is enabled.
     *
     * @param adminUtilsOperation the admin helper, may be {@code null}
     */
    public void setAdminUtilsOperation(AdminUtilsOperation adminUtilsOperation) {
        this.adminUtilsOperation = adminUtilsOperation;
    }

    /**
     * Sets the retry operations used when creating topics and fetching partition metadata.
     * If none is set, {@code onInit()} installs a default retry template.
     *
     * @param retryOperations the retry operations to use
     */
    public void setMetadataRetryOperations(RetryOperations retryOperations) {
        this.metadataRetryOperations = retryOperations;
    }

    /**
     * Replaces the Kafka-specific binding properties created by the constructor.
     *
     * @param kafkaExtendedBindingProperties source of per-binding consumer and producer extensions
     */
    public void setExtendedBindingProperties(KafkaExtendedBindingProperties kafkaExtendedBindingProperties) {
        this.extendedBindingProperties = kafkaExtendedBindingProperties;
    }

    /**
     * Installs a default metadata retry template unless one was injected through
     * {@link #setMetadataRetryOperations(RetryOperations)}.
     *
     * <p>The default allows 10 attempts with exponential back-off starting at 100 ms,
     * doubling each time and capped at 1000 ms.
     */
    public void onInit() throws Exception {
        if (this.metadataRetryOperations != null) {
            // An explicitly configured retry strategy always wins.
            return;
        }
        SimpleRetryPolicy attempts = new SimpleRetryPolicy();
        attempts.setMaxAttempts(10);

        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(100L);
        backOff.setMultiplier(2.0d);
        backOff.setMaxInterval(1000L);

        RetryTemplate defaultTemplate = new RetryTemplate();
        defaultTemplate.setRetryPolicy(attempts);
        defaultTemplate.setBackOffPolicy(backOff);
        this.metadataRetryOperations = defaultTemplate;
    }

    /**
     * Closes the shared dead-letter-queue producer, if one was ever created, and clears the
     * reference to it.
     */
    public void destroy() throws Exception {
        if (this.dlqProducer == null) {
            return;
        }
        this.dlqProducer.close();
        this.dlqProducer = null;
    }

    /**
     * Sets the listener that is registered on every {@code KafkaTemplate} created for a
     * producer binding. When {@code null}, no listener is registered.
     *
     * @param producerListener the listener to register, may be {@code null}
     */
    public void setProducerListener(ProducerListener<byte[], byte[]> producerListener) {
        this.producerListener = producerListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the topics registered by the producer and consumer bindings of this binder,
     * keyed by topic name, with the partitions recorded for each.
     *
     * @return the internal, mutable map (not a copy)
     */
    public Map<String, Collection<PartitionInfo>> getTopicsInUse() {
        return this.topicsInUse;
    }

    /* renamed from: getExtendedConsumerProperties, reason: merged with bridge method [inline-methods] */
    /**
     * Looks up the Kafka-specific consumer properties by delegating to the configured
     * {@code KafkaExtendedBindingProperties}.
     *
     * @param str lookup key passed through to the delegate — presumably the channel/binding name; TODO confirm
     * @return whatever the delegate returns for that key
     */
    public KafkaConsumerProperties m4getExtendedConsumerProperties(String str) {
        return this.extendedBindingProperties.m2getExtendedConsumerProperties(str);
    }

    /* renamed from: getExtendedProducerProperties, reason: merged with bridge method [inline-methods] */
    /**
     * Looks up the Kafka-specific producer properties by delegating to the configured
     * {@code KafkaExtendedBindingProperties}.
     *
     * @param str lookup key passed through to the delegate — presumably the channel/binding name; TODO confirm
     * @return whatever the delegate returns for that key
     */
    public KafkaProducerProperties m3getExtendedProducerProperties(String str) {
        return this.extendedBindingProperties.m1getExtendedProducerProperties(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Prepares the outbound topic and builds the message handler that publishes to it.
     *
     * @param str name of the destination topic
     * @param extendedProducerProperties producer binding properties
     * @return a {@link ProducerConfigurationMessageHandler} backed by a dedicated producer factory
     * @throws Exception declared for the overridden contract
     */
    public MessageHandler createProducerMessageHandler(String str, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) throws Exception {
        prepareProducerTopic(str, extendedProducerProperties);
        DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(extendedProducerProperties);
        KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        if (this.producerListener != null) {
            kafkaTemplate.setProducerListener(this.producerListener);
        }
        return new ProducerConfigurationMessageHandler(kafkaTemplate, str, extendedProducerProperties, producerFactory);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Prepares the outbound topic and returns its name as the producer destination.
     *
     * @param str name of the destination topic
     * @param extendedProducerProperties producer binding properties
     * @return {@code str}, unchanged
     */
    public String createProducerDestinationIfNecessary(String str, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Using kafka topic for outbound: " + str);
        }
        prepareProducerTopic(str, extendedProducerProperties);
        return str;
    }

    /**
     * Validates the topic name, creates the topic when auto-creation applies, resolves its
     * partitions and records them in {@code topicsInUse}. Shared by both producer entry points,
     * which previously carried identical copies of this sequence.
     */
    private void prepareProducerTopic(String topic, ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
        KafkaTopicUtils.validateTopicName(topic);
        int configuredPartitionCount = producerProperties.getPartitionCount();
        createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(topic, configuredPartitionCount);
        Collection<PartitionInfo> partitions = getPartitionsForTopic(topic, configuredPartitionCount);
        // A topic with more partitions than configured is fine; just tell the user which count wins.
        if (configuredPartitionCount < partitions.size() && this.logger.isInfoEnabled()) {
            this.logger.info("The `partitionCount` of the producer for topic " + topic + " is " + configuredPartitionCount + ", smaller than the actual partition count of " + partitions.size() + " of the topic. The larger number will be used instead.");
        }
        this.topicsInUse.put(topic, partitions);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a byte-array producer factory for the given producer binding.
     *
     * <p>Settings are layered in this order, later entries overriding earlier ones: binder-wide
     * configuration, the fixed settings assembled here, then the binding-specific producer
     * configuration.
     *
     * @param extendedProducerProperties producer binding properties supplying batch size,
     *        linger time, compression type and extra configuration
     * @return a new factory; nothing is cached
     */
    public DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) {
        KafkaProducerProperties producerExtension = extendedProducerProperties.getExtension();
        Map<String, Object> producerConfig = new HashMap<>();

        // Layer 1: binder-wide configuration.
        if (!ObjectUtils.isEmpty(this.configurationProperties.getConfiguration())) {
            producerConfig.putAll(this.configurationProperties.getConfiguration());
        }

        // Layer 2: settings controlled by the binder.
        producerConfig.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        producerConfig.put("retries", 0);
        producerConfig.put("batch.size", String.valueOf(producerExtension.getBufferSize()));
        producerConfig.put("buffer.memory", 33554432);
        producerConfig.put("key.serializer", ByteArraySerializer.class);
        producerConfig.put("value.serializer", ByteArraySerializer.class);
        producerConfig.put("acks", String.valueOf(this.configurationProperties.getRequiredAcks()));
        producerConfig.put("linger.ms", String.valueOf(producerExtension.getBatchTimeout()));
        producerConfig.put("compression.type", producerExtension.getCompressionType().toString());

        // Layer 3: per-binding overrides.
        if (!ObjectUtils.isEmpty(producerExtension.getConfiguration())) {
            producerConfig.putAll(producerExtension.getConfiguration());
        }
        return new DefaultKafkaProducerFactory<>(producerConfig);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Prepares the inbound topic and decides which of its partitions this instance listens to.
     *
     * <p>The topic must offer at least {@code instanceCount * concurrency} partitions. With
     * auto-rebalancing enabled or a single instance, all partitions are returned; otherwise only
     * those whose index modulo the instance count equals this instance's index.
     *
     * @param str name of the topic
     * @param str2 consumer group (not used by this method)
     * @param extendedConsumerProperties consumer binding properties
     * @return the partitions recorded for this instance in {@code topicsInUse}
     * @throws IllegalArgumentException if the instance count is zero
     */
    public Collection<PartitionInfo> createConsumerDestinationIfNecessary(String str, String str2, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaTopicUtils.validateTopicName(str);
        if (extendedConsumerProperties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        int requiredPartitions = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
        createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(str, requiredPartitions);
        Collection<PartitionInfo> allPartitions = getPartitionsForTopic(str, requiredPartitions);

        Collection<PartitionInfo> listenedPartitions;
        boolean staticAssignment = !extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
                && extendedConsumerProperties.getInstanceCount() != 1;
        if (staticAssignment) {
            // Spread partitions round-robin across instances by partition index.
            listenedPartitions = new ArrayList<>();
            for (PartitionInfo candidate : allPartitions) {
                int owner = candidate.partition() % extendedConsumerProperties.getInstanceCount();
                if (owner == extendedConsumerProperties.getInstanceIndex()) {
                    listenedPartitions.add(candidate);
                }
            }
        } else {
            listenedPartitions = allPartitions;
        }
        this.topicsInUse.put(str, listenedPartitions);
        return listenedPartitions;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the inbound channel adapter, and its listener container, for a consumer binding.
     *
     * <p>A binding without a group is treated as anonymous: it gets a random
     * {@code anonymous.<uuid>} group id and cannot use the dead-letter queue. The container
     * subscribes by topic name for anonymous or auto-rebalanced bindings and to the explicit
     * partition list otherwise. When DLQ is enabled, failed records are republished to
     * {@code error.<destination>.<group>}.
     *
     * @param str name of the destination topic
     * @param str2 consumer group; blank or {@code null} means anonymous
     * @param collection partitions this instance listens to; must not be empty
     * @param extendedConsumerProperties consumer binding properties
     * @return the configured message-driven channel adapter
     * @throws IllegalArgumentException if DLQ is enabled for an anonymous binding or
     *         {@code collection} is empty
     */
    public MessageProducer createConsumerEndpoint(String str, String str2, Collection<PartitionInfo> collection, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaConsumerProperties consumerExtension = extendedConsumerProperties.getExtension();
        boolean anonymous = !StringUtils.hasText(str2);
        Assert.isTrue(!anonymous || !consumerExtension.isEnableDlq(), "DLQ support is not available for anonymous subscriptions");

        String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : str2;
        Map<String, Object> consumerConfig = getConsumerConfig(anonymous, consumerGroup);
        if (!ObjectUtils.isEmpty(consumerExtension.getConfiguration())) {
            // Binding-specific settings override the binder defaults.
            consumerConfig.putAll(consumerExtension.getConfiguration());
        }
        DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);

        Assert.isTrue(!CollectionUtils.isEmpty(collection), "A list of partitions must be provided");
        ContainerProperties containerProperties;
        if (anonymous || consumerExtension.isAutoRebalanceEnabled()) {
            containerProperties = new ContainerProperties(new String[]{str});
        } else {
            containerProperties = new ContainerProperties(getTopicPartitionInitialOffsets(collection));
        }

        // Never run more consumer threads than there are partitions to read.
        int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), collection.size());
        ConcurrentMessageListenerContainer<byte[], byte[]> listenerContainer = new ConcurrentMessageListenerContainer<byte[], byte[]>(consumerFactory, containerProperties) { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.1
            @Override
            public void stop(Runnable runnable) {
                super.stop(runnable);
            }
        };
        listenerContainer.setConcurrency(concurrency);
        listenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
        if (!consumerExtension.isAutoCommitOffset()) {
            listenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        }
        // Logged once; the original emitted this identical debug line twice in a row.
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Listened partitions: " + StringUtils.collectionToCommaDelimitedString(collection));
        }

        KafkaMessageDrivenChannelAdapter<byte[], byte[]> channelAdapter = new KafkaMessageDrivenChannelAdapter<>(listenerContainer);
        channelAdapter.setBeanFactory(getBeanFactory());
        channelAdapter.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));

        if (consumerExtension.isEnableDlq()) {
            final String dlqTopic = "error." + str + "." + str2;
            initDlqProducer();
            listenerContainer.getContainerProperties().setErrorHandler(new ErrorHandler() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.2
                @Override
                public void handle(Exception exc, final ConsumerRecord<?, ?> consumerRecord) {
                    final byte[] key = consumerRecord.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) consumerRecord.key())) : null;
                    final byte[] payload = consumerRecord.value() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) consumerRecord.value())) : null;
                    KafkaMessageChannelBinder.this.dlqProducer.send(new ProducerRecord<byte[], byte[]>(dlqTopic, key, payload), new Callback() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.2.1
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception exc2) {
                            boolean debugEnabled = KafkaMessageChannelBinder.this.logger.isDebugEnabled();
                            if (exc2 == null && !debugEnabled) {
                                // Nothing would be logged, so skip building the description.
                                return;
                            }
                            StringBuilder description = new StringBuilder();
                            description.append(" a message with key='").append(KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'");
                            description.append(" and payload='").append(KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(payload), 50)).append("'");
                            description.append(" received from ").append(consumerRecord.partition());
                            if (exc2 != null) {
                                KafkaMessageChannelBinder.this.logger.error("Error sending to DLQ" + description.toString(), exc2);
                            } else {
                                KafkaMessageChannelBinder.this.logger.debug("Sent to DLQ " + description.toString());
                            }
                        }
                    });
                }
            });
        }
        return channelAdapter;
    }

    /**
     * Assembles the base Kafka consumer configuration for a binding.
     *
     * <p>Binder-wide configuration may replace the byte-array deserializers set first, but not
     * the connection, commit, group and offset-reset settings applied after it.
     *
     * @param z whether the consumer is anonymous; anonymous consumers reset to {@code latest},
     *        all others to {@code earliest}
     * @param str consumer group id
     * @return a new mutable configuration map
     */
    private Map<String, Object> getConsumerConfig(boolean z, String str) {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("key.deserializer", ByteArrayDeserializer.class);
        consumerConfig.put("value.deserializer", ByteArrayDeserializer.class);
        if (!ObjectUtils.isEmpty(this.configurationProperties.getConfiguration())) {
            consumerConfig.putAll(this.configurationProperties.getConfiguration());
        }

        String offsetReset;
        if (z) {
            offsetReset = "latest";
        } else {
            offsetReset = "earliest";
        }
        consumerConfig.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("group.id", str);
        consumerConfig.put("auto.offset.reset", offsetReset);
        consumerConfig.put("auto.commit.interval.ms", 100);
        return consumerConfig;
    }

    /**
     * Decides whether offsets are acknowledged for records whose processing failed.
     *
     * @param extendedConsumerProperties consumer binding properties
     * @return the explicit {@code autoCommitOnError} value when one is set; otherwise
     *         {@code true} only if both offset auto-commit and DLQ are enabled
     */
    private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaConsumerProperties consumerExtension = extendedConsumerProperties.getExtension();
        if (consumerExtension.getAutoCommitOnError() != null) {
            return consumerExtension.getAutoCommitOnError().booleanValue();
        }
        return consumerExtension.isAutoCommitOffset() && consumerExtension.isEnableDlq();
    }

    /**
     * Converts partition metadata into the topic/partition descriptors used for explicit
     * partition assignment.
     *
     * @param collection partitions to convert
     * @return one descriptor per partition, in iteration order
     */
    private TopicPartitionInitialOffset[] getTopicPartitionInitialOffsets(Collection<PartitionInfo> collection) {
        List<TopicPartitionInitialOffset> descriptors = new ArrayList<>(collection.size());
        for (PartitionInfo partition : collection) {
            descriptors.add(new TopicPartitionInitialOffset(partition.topic(), partition.partition()));
        }
        return descriptors.toArray(new TopicPartitionInitialOffset[collection.size()]);
    }

    /**
     * Creates the topic (or adds partitions) only when auto-creation is enabled and an admin
     * helper is available; otherwise logs why nothing was done.
     *
     * @param str name of the topic
     * @param i number of partitions the caller needs
     */
    private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(String str, int i) {
        if (!this.configurationProperties.isAutoCreateTopics()) {
            this.logger.info("Auto creation of topics is disabled.");
            return;
        }
        if (this.adminUtilsOperation == null) {
            this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. No topic will be created by the binder");
            return;
        }
        createTopicAndPartitions(str, i);
    }

    /**
     * Ensures the topic exists with enough partitions, using a ZooKeeper connection that is
     * always closed before returning.
     *
     * <ul>
     * <li>Existing topic: if it has fewer partitions than needed, partitions are added when
     * {@code autoAddPartitions} is enabled, otherwise a {@link BinderException} is thrown.</li>
     * <li>Unknown topic: it is created, under the metadata retry policy, with the larger of the
     * requested and the configured minimum partition count.</li>
     * <li>Any other metadata error is rethrown as a {@link BinderException}.</li>
     * </ul>
     *
     * @param str name of the topic
     * @param i number of partitions the caller needs
     */
    private void createTopicAndPartitions(final String str, int i) {
        final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(), this.configurationProperties.getZkSessionTimeout(), this.configurationProperties.getZkConnectionTimeout(), JaasUtils.isZkSecurityEnabled());
        try {
            short metadataErrorCode = this.adminUtilsOperation.errorCodeFromTopicMetadata(str, zkUtils);
            if (metadataErrorCode == ErrorMapping.NoError()) {
                boolean autoAddPartitions = this.configurationProperties.isAutoAddPartitions();
                // The configured minimum only applies when the binder may grow the topic.
                int requiredPartitions = autoAddPartitions ? Math.max(this.configurationProperties.getMinPartitionCount(), i) : i;
                int actualPartitions = this.adminUtilsOperation.partitionSize(str, zkUtils);
                if (actualPartitions < requiredPartitions) {
                    if (!autoAddPartitions) {
                        // Fixed message: the original ran the two sentences together ("instead.Consider").
                        throw new BinderException("The number of expected partitions was: " + i + ", but " + actualPartitions + (actualPartitions > 1 ? " have " : " has ") + "been found instead. Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
                    }
                    this.adminUtilsOperation.invokeAddPartitions(zkUtils, str, requiredPartitions, null, false);
                }
            } else if (metadataErrorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
                final int partitionsToCreate = Math.max(this.configurationProperties.getMinPartitionCount(), i);
                this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.3
                    @Override
                    public Object doWithRetry(RetryContext retryContext) throws RuntimeException {
                        KafkaMessageChannelBinder.this.adminUtilsOperation.invokeCreateTopic(zkUtils, str, partitionsToCreate, KafkaMessageChannelBinder.this.configurationProperties.getReplicationFactor(), new Properties());
                        return null;
                    }
                });
            } else {
                throw new BinderException("Error fetching Kafka topic metadata: ", ErrorMapping.exceptionFor(metadataErrorCode));
            }
        } finally {
            zkUtils.close();
        }
    }

    /**
     * Fetches the partition metadata of a topic under the metadata retry policy and verifies
     * that the topic has at least the expected number of partitions.
     *
     * <p>Each attempt uses a short-lived producer factory that is stopped afterwards; the
     * original code abandoned a new factory and producer on every attempt without closing them.
     *
     * @param str name of the topic
     * @param i minimum number of partitions expected
     * @return the partitions reported for the topic
     * @throws BinderException if the metadata cannot be fetched or the topic stays too small
     *         once retries are exhausted
     */
    private Collection<PartitionInfo> getPartitionsForTopic(final String str, final int i) {
        try {
            return this.metadataRetryOperations.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.4
                // Name restored from the decompiler-mangled form so the callback interface is actually implemented.
                @Override
                public Collection<PartitionInfo> doWithRetry(RetryContext retryContext) throws Exception {
                    DefaultKafkaProducerFactory<byte[], byte[]> metadataProducerFactory = KafkaMessageChannelBinder.this.getProducerFactory(new ExtendedProducerProperties<KafkaProducerProperties>(new KafkaProducerProperties()));
                    try {
                        List<PartitionInfo> partitions = metadataProducerFactory.createProducer().partitionsFor(str);
                        if (partitions.size() < i) {
                            throw new IllegalStateException("The number of expected partitions was: " + i + ", but " + partitions.size() + (partitions.size() > 1 ? " have " : " has ") + "been found instead");
                        }
                        return partitions;
                    } finally {
                        // Release the producer created solely for this metadata lookup.
                        metadataProducerFactory.stop();
                    }
                }
            });
        } catch (Exception e) {
            this.logger.error("Cannot initialize Binder", e);
            throw new BinderException("Cannot initialize binder:", e);
        }
    }

    /**
     * Lazily creates the producer shared by all dead-letter-queue error handlers.
     *
     * <p>The method is {@code synchronized}, so a single null check is enough; the nested
     * {@code synchronized (this)} re-check of the original was redundant. The binder-wide
     * configuration is now applied first, as the regular producer factory and the consumer
     * configuration of this binder already do, so that settings defined there also reach the
     * DLQ producer. The fixed settings below still take precedence over it.
     *
     * @throws RuntimeException if the producer cannot be created
     */
    private synchronized void initDlqProducer() {
        if (this.dlqProducer != null) {
            return;
        }
        try {
            Map<String, Object> dlqConfig = new HashMap<>();
            if (!ObjectUtils.isEmpty(this.configurationProperties.getConfiguration())) {
                dlqConfig.putAll(this.configurationProperties.getConfiguration());
            }
            dlqConfig.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
            dlqConfig.put("retries", 0);
            dlqConfig.put("batch.size", 16384);
            dlqConfig.put("linger.ms", 1);
            dlqConfig.put("buffer.memory", 33554432);
            dlqConfig.put("key.serializer", ByteArraySerializer.class);
            dlqConfig.put("value.serializer", ByteArraySerializer.class);
            this.dlqProducer = new DefaultKafkaProducerFactory<byte[], byte[]>(dlqConfig).createProducer();
        } catch (Exception e) {
            throw new RuntimeException("Cannot initialize DLQ producer:", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Shortens a string for log output.
     *
     * @param str text to shorten; must not be {@code null}
     * @param i maximum number of characters kept from {@code str}
     * @return {@code str} unchanged when it is at most {@code i} characters long, otherwise its
     *         first {@code i} characters followed by {@code "..."}
     */
    public String toDisplayString(String str, int i) {
        if (str.length() > i) {
            return str.substring(0, i) + "...";
        }
        return str;
    }
}
