package com.cyngn.kafka.produce;

import com.cyngn.kafka.config.ConfigConstants;
import com.cyngn.kafka.produce.KafkaPublisher;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cyngn/kafka/produce/MessageProducer.class */
public class MessageProducer extends AbstractVerticle {
    private KafkaProducer producer;
    private String busAddress;
    private String defaultTopic;
    private JsonObject producerConfig;
    private ExecutorService sender;
    public static String EVENTBUS_DEFAULT_ADDRESS = "kafka.message.publisher";
    private static final Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    public void start(Future<Void> future) {
        try {
            this.producerConfig = config();
            Properties populateKafkaConfig = populateKafkaConfig();
            this.busAddress = this.producerConfig.getString(ConfigConstants.EVENTBUS_ADDRESS, EVENTBUS_DEFAULT_ADDRESS);
            this.defaultTopic = this.producerConfig.getString(ConfigConstants.DEFAULT_TOPIC);
            this.sender = Executors.newSingleThreadExecutor();
            this.producer = new KafkaProducer(populateKafkaConfig);
            this.vertx.eventBus().consumer(this.busAddress, message -> {
                sendMessage(message);
            });
            Runtime.getRuntime().addShutdownHook(new Thread() { // from class: com.cyngn.kafka.produce.MessageProducer.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    MessageProducer.this.shutdown();
                }
            });
            future.complete();
        } catch (Exception e) {
            logger.error("Message producer initialization failed with ex: {}", e);
            future.fail(e);
        }
    }

    public void sendMessage(Message<JsonObject> message) {
        ProducerRecord producerRecord;
        JsonObject jsonObject = (JsonObject) message.body();
        if (!jsonObject.containsKey(KafkaPublisher.TYPE_FIELD)) {
            logger.error("Invalid message sent missing {} field, msg: {}", KafkaPublisher.TYPE_FIELD, message);
            return;
        }
        KafkaPublisher.MessageType fromInt = KafkaPublisher.MessageType.fromInt(jsonObject.getInteger(KafkaPublisher.TYPE_FIELD).intValue());
        String string = jsonObject.getString(ConfigConstants.VALUE_FIELD);
        switch (fromInt) {
            case SIMPLE:
                producerRecord = new ProducerRecord(this.defaultTopic, string);
                break;
            case CUSTOM_TOPIC:
                producerRecord = new ProducerRecord(jsonObject.getString(ConfigConstants.TOPIC_FIELD), string);
                break;
            case CUSTOM_KEY:
                producerRecord = new ProducerRecord(jsonObject.getString(ConfigConstants.TOPIC_FIELD), jsonObject.getString(ConfigConstants.KEY_FIELD), string);
                break;
            case CUSTOM_PARTITION:
                producerRecord = new ProducerRecord(jsonObject.getString(ConfigConstants.TOPIC_FIELD), jsonObject.getInteger(ConfigConstants.PARTITION_FIELD), jsonObject.getString(ConfigConstants.KEY_FIELD), string);
                break;
            default:
                String format = String.format("Invalid type submitted: {} message being thrown away: %s", fromInt.toString(), string);
                logger.error(format);
                message.fail(-1, format);
                return;
        }
        ProducerRecord producerRecord2 = producerRecord;
        this.sender.submit(() -> {
            return this.producer.send(producerRecord2, (recordMetadata, exc) -> {
                if (exc == null) {
                    this.vertx.runOnContext(r5 -> {
                        message.reply(new JsonObject());
                    });
                    return;
                }
                this.vertx.runOnContext(r6 -> {
                    message.fail(-1, exc.getMessage());
                });
                exc.printStackTrace();
                logger.error("Failed to send message to kafka ex: ", exc);
            });
        });
    }

    private Properties populateKafkaConfig() {
        Properties properties = new Properties();
        properties.put(ConfigConstants.BOOTSTRAP_SERVERS, getRequiredConfig(ConfigConstants.BOOTSTRAP_SERVERS));
        String string = this.producerConfig.getString(ConfigConstants.SERIALIZER_CLASS, ConfigConstants.DEFAULT_SERIALIZER_CLASS);
        properties.put(ConfigConstants.SERIALIZER_CLASS, string);
        properties.put(ConfigConstants.KEY_SERIALIZER_CLASS, this.producerConfig.getString(ConfigConstants.KEY_SERIALIZER_CLASS, string));
        properties.put(ConfigConstants.VALUE_SERIALIZER_CLASS, this.producerConfig.getString(ConfigConstants.VALUE_SERIALIZER_CLASS, string));
        properties.put(ConfigConstants.PRODUCER_TYPE, this.producerConfig.getString(ConfigConstants.PRODUCER_TYPE, "async"));
        properties.put(ConfigConstants.MAX_BLOCK_MS, this.producerConfig.getLong(ConfigConstants.MAX_BLOCK_MS, new Long(60000L)));
        return properties;
    }

    private String getRequiredConfig(String str) {
        String string = this.producerConfig.getString(str, (String) null);
        if (null == string) {
            throw new IllegalArgumentException(String.format("Required config value not found key: %s", str));
        }
        return string;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdown() {
        try {
            if (this.producer != null) {
                this.producer.close();
                this.producer = null;
            }
            if (this.sender != null) {
                this.sender.shutdown();
                this.sender = null;
            }
        } catch (Exception e) {
            logger.error("Failed to close producer", e);
        }
    }
}
