/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

public class KafkaTemplate<K, V>
implements KafkaOperations<K, V> {
    protected final Log logger = LogFactory.getLog(this.getClass());
    private final ProducerFactory<K, V> producerFactory;
    private final boolean autoFlush;
    private MessageConverter messageConverter = new MessagingMessageConverter();
    private volatile Producer<K, V> producer;
    private volatile String defaultTopic;
    private volatile ProducerListener<K, V> producerListener = new LoggingProducerListener();

    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        this.producerFactory = producerFactory;
        this.autoFlush = autoFlush;
    }

    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public void setProducerListener(ProducerListener<K, V> producerListener) {
        this.producerListener = producerListener;
    }

    public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
        return this.send(this.defaultTopic, data);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(K key, V data) {
        return this.send(this.defaultTopic, key, data);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data) {
        return this.send(this.defaultTopic, partition, key, data);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
        ProducerRecord producerRecord = new ProducerRecord(topic, data);
        return this.doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
        ProducerRecord producerRecord = new ProducerRecord(topic, key, data);
        return this.doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data) {
        ProducerRecord producerRecord = new ProducerRecord(topic, Integer.valueOf(partition), null, data);
        return this.doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data) {
        ProducerRecord producerRecord = new ProducerRecord(topic, Integer.valueOf(partition), key, data);
        return this.doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
        ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
        return this.doSend(producerRecord);
    }

    @Override
    public void flush() {
        Assert.state((this.producer != null ? 1 : 0) != 0, (String)"'producer' must not be null for flushing.");
        this.producer.flush();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        if (this.producer == null) {
            KafkaTemplate kafkaTemplate = this;
            synchronized (kafkaTemplate) {
                if (this.producer == null) {
                    this.producer = this.producerFactory.createProducer();
                }
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)("Sending: " + producerRecord));
        }
        final SettableListenableFuture future = new SettableListenableFuture();
        this.producer.send(producerRecord, new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    future.set(new SendResult(producerRecord, metadata));
                    if (KafkaTemplate.this.producerListener != null && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                        KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                    }
                } else {
                    future.setException((Throwable)((Object)new KafkaProducerException(producerRecord, "Failed to send", exception)));
                    if (KafkaTemplate.this.producerListener != null) {
                        KafkaTemplate.this.producerListener.onError(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), exception);
                    }
                }
            }
        });
        if (this.autoFlush) {
            this.flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)("Sent: " + producerRecord));
        }
        return future;
    }
}

