package org.springframework.kafka.core;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.ProducerListenerInvokingCallback;

/* loaded from: input_file:org/springframework/kafka/core/KafkaTemplate.class */
public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
    protected final Log logger = LogFactory.getLog(getClass());
    private final ProducerFactory<K, V> producerFactory;
    private volatile Producer<K, V> producer;
    private volatile String defaultTopic;
    private volatile ProducerListener<K, V> producerListener;

    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this.producerFactory = producerFactory;
    }

    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    public void setDefaultTopic(String str) {
        this.defaultTopic = str;
    }

    public void setProducerListener(ProducerListener<K, V> producerListener) {
        this.producerListener = producerListener;
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public Future<RecordMetadata> convertAndSend(V v) {
        return convertAndSend(this.defaultTopic, (String) v);
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public Future<RecordMetadata> convertAndSend(K k, V v) {
        return convertAndSend(this.defaultTopic, (String) k, (K) v);
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public Future<RecordMetadata> convertAndSend(int i, K k, V v) {
        return convertAndSend(this.defaultTopic, i, k, v);
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public Future<RecordMetadata> convertAndSend(String str, V v) {
        return doSend(new ProducerRecord<>(str, v));
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public Future<RecordMetadata> convertAndSend(String str, K k, V v) {
        return doSend(new ProducerRecord<>(str, k, v));
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public Future<RecordMetadata> convertAndSend(String str, int i, K k, V v) {
        return doSend(new ProducerRecord<>(str, Integer.valueOf(i), k, v));
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public RecordMetadata syncConvertAndSend(V v) throws InterruptedException, ExecutionException {
        Future<RecordMetadata> convertAndSend = convertAndSend(v);
        flush();
        return convertAndSend.get();
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public RecordMetadata syncConvertAndSend(K k, V v) throws InterruptedException, ExecutionException {
        Future<RecordMetadata> convertAndSend = convertAndSend((KafkaTemplate<K, V>) k, (K) v);
        flush();
        return convertAndSend.get();
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public RecordMetadata syncConvertAndSend(int i, K k, V v) throws InterruptedException, ExecutionException {
        Future<RecordMetadata> convertAndSend = convertAndSend(i, (int) k, (K) v);
        flush();
        return convertAndSend.get();
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public RecordMetadata syncConvertAndSend(String str, V v) throws InterruptedException, ExecutionException {
        Future<RecordMetadata> convertAndSend = convertAndSend(str, (String) v);
        flush();
        return convertAndSend.get();
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public RecordMetadata syncConvertAndSend(String str, K k, V v) throws InterruptedException, ExecutionException {
        Future<RecordMetadata> convertAndSend = convertAndSend(str, (String) k, (K) v);
        flush();
        return convertAndSend.get();
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public RecordMetadata syncConvertAndSend(String str, int i, K k, V v) throws InterruptedException, ExecutionException {
        Future<RecordMetadata> convertAndSend = convertAndSend(str, i, k, v);
        flush();
        return convertAndSend.get();
    }

    protected Future<RecordMetadata> doSend(ProducerRecord<K, V> producerRecord) {
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = this.producerFactory.createProducer();
                }
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        Future<RecordMetadata> send = this.producerListener == null ? this.producer.send(producerRecord) : this.producer.send(producerRecord, new ProducerListenerInvokingCallback(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), this.producerListener));
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return send;
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public void flush() {
        this.producer.flush();
    }
}
