package org.graylog2.radio.transports.kafka;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import javax.inject.Inject;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.RadioMessage;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.radio.Configuration;
import org.graylog2.radio.transports.RadioTransport;
import org.msgpack.MessagePack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/radio/transports/kafka/KafkaProducer.class */
public class KafkaProducer implements RadioTransport {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaProducer.class);
    public static final String KAFKA_TOPIC = "graylog2-radio-messages";
    private final Producer<byte[], byte[]> producer;
    private final MessagePack pack = new MessagePack();
    private final Meter incomingMessages;
    private final Meter rejectedMessages;
    private final Timer processTime;

    @Inject
    public KafkaProducer(ServerStatus serverStatus, Configuration configuration, MetricRegistry metricRegistry) {
        this.pack.setClassLoader(new ClassLoader(Thread.currentThread().getContextClassLoader()) { // from class: org.graylog2.radio.transports.kafka.KafkaProducer.1
        });
        Properties properties = new Properties();
        properties.put("metadata.broker.list", configuration.getKafkaBrokers());
        properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("request.required.acks", String.valueOf(configuration.getKafkaRequiredAcks()));
        properties.put("client.id", "graylog2-radio-" + serverStatus.getNodeId().toString());
        properties.put("producer.type", configuration.getKafkaProducerType());
        properties.put("queue.buffering.max.ms", String.valueOf(configuration.getKafkaBatchMaxWaitMs()));
        properties.put("batch.num.messages", String.valueOf(configuration.getKafkaBatchSize()));
        this.producer = new Producer<>(new ProducerConfig(properties));
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name((Class<?>) KafkaProducer.class, "incomingMessages"));
        this.rejectedMessages = metricRegistry.meter(MetricRegistry.name((Class<?>) KafkaProducer.class, "rejectedMessages"));
        this.processTime = metricRegistry.timer(MetricRegistry.name((Class<?>) KafkaProducer.class, "processTime"));
    }

    @Override // org.graylog2.radio.transports.RadioTransport
    public void send(Message message) {
        try {
            Timer.Context time = this.processTime.time();
            Throwable th = null;
            try {
                this.incomingMessages.mark();
                this.producer.send(new KeyedMessage<>(KAFKA_TOPIC, message.getId().getBytes(StandardCharsets.UTF_8), RadioMessage.serialize(this.pack, message)));
                if (time != null) {
                    if (0 != 0) {
                        try {
                            time.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        time.close();
                    }
                }
            } finally {
            }
        } catch (IOException e) {
            LOG.error("Could not serialize message.", (Throwable) e);
            this.rejectedMessages.mark();
        }
    }
}
