package org.springframework.kafka.core;

import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

/* loaded from: input_file:org/springframework/kafka/core/KafkaTemplate.class */
/**
 * Template for sending records through {@link Producer}s obtained from a {@link ProducerFactory}.
 *
 * <p>When the factory reports {@link ProducerFactory#transactionCapable()}, send operations must
 * run inside a transaction (see {@link #inTransaction()}); otherwise a producer is requested from
 * the factory for each operation and closed again when that operation finishes.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {

    /** Logger named after the runtime class, so subclasses log under their own name. */
    protected final Log logger = LogFactory.getLog(getClass());

    private final ProducerFactory<K, V> producerFactory;

    private final boolean autoFlush;

    private final boolean transactional;

    /** Producer bound to the current thread for the duration of {@link #executeInTransaction}. */
    private final ThreadLocal<Producer<K, V>> producers = new ThreadLocal<>();

    private RecordMessageConverter messageConverter = new MessagingMessageConverter();

    private volatile String defaultTopic;

    private volatile ProducerListener<K, V> producerListener = new LoggingProducerListener<>();

    /**
     * Create a template that does not flush after each send.
     *
     * @param producerFactory the factory used to obtain producers
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    /**
     * Create a template.
     *
     * @param producerFactory the factory used to obtain producers; its
     *                        {@code transactionCapable()} result decides whether this template is
     *                        transactional
     * @param z               {@code true} to call {@link #flush()} after every send
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean z) {
        this.producerFactory = producerFactory;
        this.autoFlush = z;
        this.transactional = producerFactory.transactionCapable();
    }

    /**
     * @return the topic used by the {@code sendDefault} methods and as the fallback topic when
     *         converting a {@link Message}; may be {@code null} if never set
     */
    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    /**
     * @param str the topic used by the {@code sendDefault} methods
     */
    public void setDefaultTopic(String str) {
        this.defaultTopic = str;
    }

    /**
     * Replace the listener notified from the send callback.
     *
     * @param producerListener the listener; {@code null} disables listener notification
     */
    public void setProducerListener(ProducerListener<K, V> producerListener) {
        this.producerListener = producerListener;
    }

    /**
     * @return the converter used by {@link #send(Message)}
     */
    public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /**
     * @param recordMessageConverter the converter used by {@link #send(Message)}
     */
    public void setMessageConverter(RecordMessageConverter recordMessageConverter) {
        this.messageConverter = recordMessageConverter;
    }

    /**
     * @return whether the producer factory reported itself as transaction capable at construction
     */
    public boolean isTransactional() {
        return this.transactional;
    }

    /** Send a value to the default topic. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> sendDefault(V v) {
        return send(this.defaultTopic, v);
    }

    /** Send a key/value pair to the default topic. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> sendDefault(K k, V v) {
        return send(this.defaultTopic, k, v);
    }

    /** Send a key/value pair to the given partition of the default topic. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer num, K k, V v) {
        return send(this.defaultTopic, num, k, v);
    }

    /** Send a key/value pair with a timestamp to the given partition of the default topic. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer num, Long l, K k, V v) {
        return send(this.defaultTopic, num, l, k, v);
    }

    /** Send a value to the topic {@code str}. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(String str, V v) {
        return doSend(new ProducerRecord<>(str, v));
    }

    /** Send a key/value pair to the topic {@code str}. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(String str, K k, V v) {
        return doSend(new ProducerRecord<>(str, k, v));
    }

    /** Send a key/value pair to partition {@code num} of the topic {@code str}. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(String str, Integer num, K k, V v) {
        return doSend(new ProducerRecord<>(str, num, k, v));
    }

    /** Send a key/value pair with timestamp {@code l} to partition {@code num} of the topic {@code str}. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(String str, Integer num, Long l, K k, V v) {
        return doSend(new ProducerRecord<>(str, num, l, k, v));
    }

    /** Send a pre-built record. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> producerRecord) {
        return doSend(producerRecord);
    }

    /**
     * Convert the message with the configured converter (falling back to the default topic) and
     * send the resulting record.
     *
     * <p>If the converted record carries no headers at all, the message's
     * {@link KafkaHeaders#CORRELATION_ID} header (when present as a {@code byte[]}) is copied onto
     * the record.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
        ProducerRecord<?, ?> converted = this.messageConverter.fromMessage(message, this.defaultTopic);
        if (!converted.headers().iterator().hasNext()) {
            byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
            if (correlationId != null) {
                converted.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
            }
        }
        // The converter is only typed as <?, ?>; the caller is responsible for configuring a
        // converter that produces keys/values matching this template's K and V.
        @SuppressWarnings("unchecked")
        ProducerRecord<K, V> producerRecord = (ProducerRecord<K, V>) converted;
        return doSend(producerRecord);
    }

    /** Return the partition metadata for topic {@code str}, closing the producer afterwards unless in a transaction. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public List<PartitionInfo> partitionsFor(String str) {
        Producer<K, V> producer = getTheProducer();
        try {
            return producer.partitionsFor(str);
        } finally {
            closeProducer(producer, inTransaction());
        }
    }

    /** Return the producer's metrics, closing the producer afterwards unless in a transaction. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public Map<MetricName, ? extends Metric> metrics() {
        Producer<K, V> producer = getTheProducer();
        try {
            return producer.metrics();
        } finally {
            closeProducer(producer, inTransaction());
        }
    }

    /**
     * Run the callback against a producer, closing the producer afterwards unless in a transaction.
     *
     * @return whatever the callback returns
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public <T> T execute(KafkaOperations.ProducerCallback<K, V, T> producerCallback) {
        Producer<K, V> producer = getTheProducer();
        try {
            return producerCallback.doInKafka(producer);
        } finally {
            closeProducer(producer, inTransaction());
        }
    }

    /**
     * Run the callback inside a producer transaction local to this template.
     *
     * <p>A new producer is created, a transaction is begun on it, and the producer is bound to the
     * current thread while the callback runs so that template operations invoked by the callback
     * use it. The transaction is committed when the callback returns; if the callback or the
     * commit throws an exception the transaction is aborted and the exception is rethrown.
     * In every case the thread binding is removed and the producer is closed exactly once.
     *
     * @return whatever the callback returns
     * @throws IllegalStateException if the factory is not transaction capable or a producer is
     *                               already bound to this thread (nested call)
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K, V, T> operationsCallback) {
        Assert.state(this.transactional, "Producer factory does not support transactions");
        Assert.state(this.producers.get() == null, "Nested calls to 'executeInTransaction' are not allowed");
        Producer<K, V> producer = this.producerFactory.createProducer();
        try {
            producer.beginTransaction();
        } catch (Exception ex) {
            // Nothing was bound to the thread yet; just release the producer.
            closeProducer(producer, false);
            throw ex;
        }
        this.producers.set(producer);
        try {
            T result = operationsCallback.doInOperations(this);
            producer.commitTransaction();
            return result;
        } catch (Exception ex) {
            // If the abort itself fails, its exception propagates instead; the finally block
            // below still unbinds and closes the producer.
            producer.abortTransaction();
            throw ex;
        } finally {
            // Always unbind, otherwise every later call on this thread would be rejected as nested.
            this.producers.remove();
            closeProducer(producer, false);
        }
    }

    /** Flush the producer, closing it afterwards unless in a transaction. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public void flush() {
        Producer<K, V> producer = getTheProducer();
        try {
            producer.flush();
        } finally {
            closeProducer(producer, inTransaction());
        }
    }

    /** Send offsets to the transaction using the group id from {@code ProducerFactoryUtils.getConsumerGroupId()}. */
    @Override // org.springframework.kafka.core.KafkaOperations
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map) {
        sendOffsetsToTransaction(map, ProducerFactoryUtils.getConsumerGroupId());
    }

    /**
     * Send offsets to the transaction of the producer held by the resource bound to the producer
     * factory in {@link TransactionSynchronizationManager}.
     *
     * <p>Only that bound resource is consulted; nothing is sent when the holder has no producer.
     *
     * @param map the offsets to send
     * @param str the consumer group id
     * @throws IllegalArgumentException if no resource is bound for the producer factory
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) {
        @SuppressWarnings("unchecked")
        KafkaResourceHolder<K, V> resourceHolder =
                (KafkaResourceHolder<K, V>) TransactionSynchronizationManager.getResource(this.producerFactory);
        Assert.isTrue(resourceHolder != null, "No transaction in process");
        Producer<K, V> producer = resourceHolder.getProducer();
        if (producer != null) {
            producer.sendOffsetsToTransaction(map, str);
        }
    }

    /**
     * Close the producer unless told to leave it open.
     *
     * @param producer the producer to close
     * @param z        {@code true} to skip closing (callers pass {@link #inTransaction()})
     */
    protected void closeProducer(Producer<K, V> producer, boolean z) {
        if (!z) {
            producer.close();
        }
    }

    /**
     * Send the record and return a future completed from the producer's send callback.
     *
     * <p>The callback completes the future, notifies the producer listener (if one is set), and,
     * for a non-transactional template, closes the producer. When auto-flush is enabled,
     * {@link #flush()} is called after the send has been handed to the producer.
     *
     * @param producerRecord the record to send
     * @return a future holding the {@link SendResult}, or failed with a
     *         {@code KafkaProducerException} wrapping the send failure
     * @throws IllegalStateException if the template is transactional and no transaction is in process
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        if (this.transactional) {
            Assert.state(inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
        }
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() { // from class: org.springframework.kafka.core.KafkaTemplate.1
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                // Read the volatile field once so a concurrent setProducerListener(null) cannot
                // slip in between the null check and the invocation.
                ProducerListener<K, V> listener = KafkaTemplate.this.producerListener;
                try {
                    if (exc == null) {
                        future.set(new SendResult<>(producerRecord, recordMetadata));
                        if (listener != null) {
                            listener.onSuccess(producerRecord, recordMetadata);
                        }
                        if (KafkaTemplate.this.logger.isTraceEnabled()) {
                            KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + recordMetadata);
                        }
                    } else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exc));
                        if (listener != null) {
                            listener.onError(producerRecord, exc);
                        }
                        if (KafkaTemplate.this.logger.isDebugEnabled()) {
                            KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exc);
                        }
                    }
                } finally {
                    // Transactional producers are closed by whoever owns the transaction.
                    if (!KafkaTemplate.this.transactional) {
                        KafkaTemplate.this.closeProducer(producer, false);
                    }
                }
            }
        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }

    /**
     * @return {@code true} when the template is transactional and at least one of the following
     *         holds: a producer is bound to this thread by {@link #executeInTransaction}, a
     *         resource is bound for the producer factory in
     *         {@link TransactionSynchronizationManager}, or an actual transaction is active
     */
    protected boolean inTransaction() {
        return this.transactional
                && (this.producers.get() != null
                        || TransactionSynchronizationManager.getResource(this.producerFactory) != null
                        || TransactionSynchronizationManager.isActualTransactionActive());
    }

    /**
     * Obtain the producer for the current operation: a producer from the factory when
     * non-transactional; otherwise the thread-bound producer if there is one, else the producer of
     * the transactional resource holder supplied by {@code ProducerFactoryUtils}.
     */
    private Producer<K, V> getTheProducer() {
        if (!this.transactional) {
            return this.producerFactory.createProducer();
        }
        Producer<K, V> threadBound = this.producers.get();
        if (threadBound != null) {
            return threadBound;
        }
        return ProducerFactoryUtils.getTransactionalResourceHolder(this.producerFactory).getProducer();
    }
}
