/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaResourceHolder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerFactoryUtils;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
 * Template that runs send and administrative operations against a Kafka
 * {@link Producer} obtained from a {@link ProducerFactory}.
 *
 * <p>When the factory reports {@link ProducerFactory#transactionCapable()}, sends are
 * only permitted while a transaction is in process (see {@link #inTransaction()});
 * otherwise a producer is requested from the factory for each operation and handed
 * to {@link #closeProducer(Producer, boolean)} when the operation completes.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class KafkaTemplate<K, V>
implements KafkaOperations<K, V> {
    protected final Log logger = LogFactory.getLog(this.getClass());
    private final ProducerFactory<K, V> producerFactory;
    private final boolean autoFlush;
    private final boolean transactional;
    /** Producer bound to the current thread for the duration of {@link #executeInTransaction}. */
    private final ThreadLocal<Producer<K, V>> producers = new ThreadLocal<>();
    private RecordMessageConverter messageConverter = new MessagingMessageConverter();
    private volatile String defaultTopic;
    private volatile ProducerListener<K, V> producerListener = new LoggingProducerListener<>();

    /**
     * Create a template that does not flush after each send.
     *
     * @param producerFactory the factory that supplies producers
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    /**
     * Create a template.
     *
     * @param producerFactory the factory that supplies producers; its
     * {@code transactionCapable()} result decides whether this template is transactional
     * @param autoFlush when {@code true}, {@link #flush()} is invoked after every send
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        this.producerFactory = producerFactory;
        this.autoFlush = autoFlush;
        this.transactional = producerFactory.transactionCapable();
    }

    /**
     * @return the topic used by the {@code sendDefault} methods, or {@code null} if none was set
     */
    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    /**
     * @param defaultTopic the topic used by the {@code sendDefault} methods
     */
    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    /**
     * @param producerListener the listener notified of send results; {@code null} disables
     * notification
     */
    public void setProducerListener(ProducerListener<K, V> producerListener) {
        this.producerListener = producerListener;
    }

    /**
     * @return the converter used by {@link #send(Message)}
     */
    public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /**
     * @param messageConverter the converter used by {@link #send(Message)}
     */
    public void setMessageConverter(RecordMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    /** Send {@code data} to the default topic. */
    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
        return this.send(this.defaultTopic, data);
    }

    /** Send {@code key}/{@code data} to the default topic. */
    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(K key, V data) {
        return this.send(this.defaultTopic, key, data);
    }

    /** Send {@code key}/{@code data} to the given partition of the default topic. */
    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data) {
        return this.send(this.defaultTopic, partition, key, data);
    }

    /** Send {@code key}/{@code data} with a timestamp to the given partition of the default topic. */
    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data) {
        return this.send(this.defaultTopic, partition, timestamp, key, data);
    }

    /** Send {@code data} to {@code topic}. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
        return this.doSend(producerRecord);
    }

    /** Send {@code key}/{@code data} to {@code topic}. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
        return this.doSend(producerRecord);
    }

    /** Send {@code key}/{@code data} to the given partition of {@code topic}. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
        return this.doSend(producerRecord);
    }

    /** Send {@code key}/{@code data} with a timestamp to the given partition of {@code topic}. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
        return this.doSend(producerRecord);
    }

    /** Send a pre-built record. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
        return this.doSend(record);
    }

    /**
     * Convert {@code message} to a record with the configured message converter (passing
     * the default topic to the converter) and send it.
     */
    @Override
    public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
        // The converter is not parameterized on K/V, so the narrowing cast cannot be checked.
        @SuppressWarnings("unchecked")
        ProducerRecord<K, V> producerRecord =
                (ProducerRecord<K, V>) this.messageConverter.fromMessage(message, this.defaultTopic);
        return this.doSend(producerRecord);
    }

    /**
     * Return the producer's partition information for {@code topic}.
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        Producer<K, V> producer = this.getTheProducer();
        try {
            return producer.partitionsFor(topic);
        }
        finally {
            this.closeProducer(producer, this.inTransaction());
        }
    }

    /**
     * Return the producer's metrics.
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        Producer<K, V> producer = this.getTheProducer();
        try {
            return producer.metrics();
        }
        finally {
            this.closeProducer(producer, this.inTransaction());
        }
    }

    /**
     * Run {@code callback} against a producer and return its result. The producer is
     * passed to {@link #closeProducer(Producer, boolean)} afterwards, whether or not the
     * callback throws.
     */
    @Override
    public <T> T execute(KafkaOperations.ProducerCallback<K, V, T> callback) {
        Producer<K, V> producer = this.getTheProducer();
        try {
            return callback.doInKafka(producer);
        }
        finally {
            this.closeProducer(producer, this.inTransaction());
        }
    }

    /**
     * Run {@code callback} inside a new producer transaction that is local to this call.
     *
     * <p>A producer is created, a transaction is begun on it, and it is bound to the
     * current thread so that template operations invoked by the callback use it. If the
     * callback completes, the transaction is committed and the callback's result is
     * returned. If the callback throws, the transaction is aborted and the callback's
     * exception is rethrown to the caller; a failure of the abort itself is attached to
     * that exception as a suppressed exception. In every case the producer is unbound
     * from the thread and then closed.
     *
     * @param callback the operations to run in the transaction
     * @param <T> the callback's result type
     * @return the callback's result
     * @throws IllegalStateException if the producer factory is not transaction capable,
     * or if a producer is already bound to this thread by an enclosing call
     */
    @Override
    public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K, V, T> callback) {
        Assert.state(this.transactional, "Producer factory does not support transactions");
        Assert.state(this.producers.get() == null, "Nested calls to 'executeInTransaction' are not allowed");
        Producer<K, V> producer = this.producerFactory.createProducer();
        try {
            producer.beginTransaction();
        }
        catch (Exception e) {
            this.closeProducer(producer, false);
            throw e;
        }
        this.producers.set(producer);
        try {
            T result;
            try {
                result = callback.doInOperations(this);
            }
            catch (Exception e) {
                try {
                    producer.abortTransaction();
                }
                catch (Exception abortFailure) {
                    // Keep the callback failure as the primary error; do not lose the abort failure.
                    e.addSuppressed(abortFailure);
                }
                throw e;
            }
            producer.commitTransaction();
            return result;
        }
        finally {
            // Unbind first so that a failing close() cannot leave a stale producer on the thread.
            this.producers.remove();
            this.closeProducer(producer, false);
        }
    }

    /**
     * Flush the producer.
     */
    @Override
    public void flush() {
        Producer<K, V> producer = this.getTheProducer();
        try {
            producer.flush();
        }
        finally {
            this.closeProducer(producer, this.inTransaction());
        }
    }

    /**
     * Send offsets to the transaction using the consumer group id supplied by
     * {@link ProducerFactoryUtils#getConsumerGroupId()}.
     */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.sendOffsetsToTransaction(offsets, ProducerFactoryUtils.getConsumerGroupId());
    }

    /**
     * Send offsets to the transaction of the producer held by the resource holder that is
     * bound to the producer factory in {@link TransactionSynchronizationManager}.
     *
     * @throws IllegalArgumentException if no resource holder is bound for the producer factory
     */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
        @SuppressWarnings("unchecked")
        KafkaResourceHolder<K, V> resourceHolder =
                (KafkaResourceHolder<K, V>) TransactionSynchronizationManager.getResource(this.producerFactory);
        Assert.isTrue(resourceHolder != null, "No transaction in process");
        Producer<K, V> producer = resourceHolder.getProducer();
        if (producer != null) {
            producer.sendOffsetsToTransaction(offsets, consumerGroupId);
        }
    }

    /**
     * Close {@code producer} unless it takes part in a transaction.
     *
     * @param producer the producer to close
     * @param inLocalTx {@code true} to leave the producer open
     */
    protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {
        if (!inLocalTx) {
            producer.close();
        }
    }

    /**
     * Send {@code producerRecord} and return a future that is completed from the
     * producer's send callback: with a {@link SendResult} on success, or with a
     * {@link KafkaProducerException} wrapping the failure otherwise. The configured
     * producer listener, if any, is notified of the outcome.
     *
     * @param producerRecord the record to send
     * @return a future for the send result
     * @throws IllegalStateException if this template is transactional and no transaction
     * is in process
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        if (this.transactional) {
            Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
        }
        final Producer<K, V> producer = this.getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Read the volatile field once so a concurrent setter cannot null it mid-callback.
                ProducerListener<K, V> listener = KafkaTemplate.this.producerListener;
                try {
                    if (exception == null) {
                        future.set(new SendResult<>(producerRecord, metadata));
                        if (listener != null && listener.isInterestedInSuccess()) {
                            listener.onSuccess(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    } else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (listener != null) {
                            listener.onError(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), exception);
                        }
                    }
                }
                finally {
                    // Transactional producers are closed by whoever owns the transaction.
                    if (!KafkaTemplate.this.transactional) {
                        KafkaTemplate.this.closeProducer(producer, false);
                    }
                }
            }
        });
        if (this.autoFlush) {
            this.flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }

    /**
     * @return {@code true} if this template is transactional and either a producer is
     * bound to the current thread, a resource is bound for the producer factory in
     * {@link TransactionSynchronizationManager}, or an actual transaction is active
     */
    protected boolean inTransaction() {
        if (!this.transactional) {
            return false;
        }
        return this.producers.get() != null
                || TransactionSynchronizationManager.getResource(this.producerFactory) != null
                || TransactionSynchronizationManager.isActualTransactionActive();
    }

    /**
     * Resolve the producer for the current operation. A transactional template uses the
     * thread-bound producer when there is one, otherwise the producer of the resource
     * holder supplied by {@link ProducerFactoryUtils}; a non-transactional template asks
     * the factory for a producer.
     */
    private Producer<K, V> getTheProducer() {
        if (!this.transactional) {
            return this.producerFactory.createProducer();
        }
        Producer<K, V> threadBound = this.producers.get();
        if (threadBound != null) {
            return threadBound;
        }
        KafkaResourceHolder<K, V> holder = ProducerFactoryUtils.getTransactionalResourceHolder(this.producerFactory);
        return holder.getProducer();
    }
}

