package com.cyngn.kafka.consume;

import com.cyngn.kafka.config.ConfigConstants;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cyngn/kafka/consume/SimpleConsumer.class */
public class SimpleConsumer extends AbstractVerticle {
    public static final String EVENTBUS_DEFAULT_ADDRESS = "kafka.message.consumer";
    public static final int DEFAULT_POLL_MS = 100;
    private String busAddress;
    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    private EventBus bus;
    private AtomicBoolean running;
    private KafkaConsumer consumer;
    private List<String> topics;
    private JsonObject verticleConfig;
    private ExecutorService backgroundConsumer;
    private int pollIntervalMs;

    public void start(Future<Void> future) {
        try {
            this.bus = this.vertx.eventBus();
            this.running = new AtomicBoolean(true);
            this.verticleConfig = config();
            Properties populateKafkaConfig = populateKafkaConfig(this.verticleConfig);
            JsonArray jsonArray = this.verticleConfig.getJsonArray(ConfigConstants.TOPICS);
            this.busAddress = this.verticleConfig.getString(ConfigConstants.EVENTBUS_ADDRESS, EVENTBUS_DEFAULT_ADDRESS);
            this.pollIntervalMs = this.verticleConfig.getInteger(ConfigConstants.CONSUMER_POLL_INTERVAL_MS, 100).intValue();
            Runtime.getRuntime().addShutdownHook(new Thread() { // from class: com.cyngn.kafka.consume.SimpleConsumer.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    SimpleConsumer.this.shutdown();
                }
            });
            this.backgroundConsumer = Executors.newSingleThreadExecutor();
            this.backgroundConsumer.submit(() -> {
                try {
                    this.consumer = new KafkaConsumer(populateKafkaConfig);
                    this.topics = new ArrayList();
                    for (int i = 0; i < jsonArray.size(); i++) {
                        this.topics.add(jsonArray.getString(i));
                        logger.info("Subscribing to topic ");
                    }
                    future.complete();
                    consume();
                } catch (Exception e) {
                    logger.error("Failed to startup", e);
                    this.bus.send(ConfigConstants.CONSUMER_ERROR_TOPIC, getErrorString("Failed to startup", e.getMessage()));
                    future.fail(e);
                }
            });
        } catch (Exception e) {
            logger.error("Failed to startup", e);
            this.bus.send(ConfigConstants.CONSUMER_ERROR_TOPIC, getErrorString("Failed to startup", e.getMessage()));
            future.fail(e);
        }
    }

    private String getErrorString(String str, String str2) {
        return String.format("%s - error: %s", str, str2);
    }

    private void consume() {
        this.consumer.subscribe(this.topics);
        while (this.running.get()) {
            try {
                ConsumerRecords poll = this.consumer.poll(this.pollIntervalMs);
                if (poll != null) {
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        sendMessage((ConsumerRecord) it.next());
                    }
                }
            } catch (Exception e) {
                logger.error("Error consuming messages from kafka", e);
                this.bus.send(ConfigConstants.CONSUMER_ERROR_TOPIC, getErrorString("Error consuming messages from kafka", e.getMessage()));
            }
        }
    }

    public void stop() {
        this.running.compareAndSet(true, false);
    }

    private void sendMessage(ConsumerRecord<String, String> consumerRecord) {
        try {
            this.bus.send(this.busAddress, KafkaEvent.createEventForBus(consumerRecord));
        } catch (Exception e) {
            String format = String.format("Error sending messages on event bus - record: %s", consumerRecord.toString());
            logger.error(format, e);
            this.bus.send(ConfigConstants.CONSUMER_ERROR_TOPIC, getErrorString(format, e.getMessage()));
        }
    }

    private Properties populateKafkaConfig(JsonObject jsonObject) {
        Properties properties = new Properties();
        properties.put(ConfigConstants.ZK_CONNECT, jsonObject.getString(ConfigConstants.ZK_CONNECT, "localhost:2181"));
        properties.put(ConfigConstants.BACKOFF_INCREMENT_MS, jsonObject.getString(ConfigConstants.BACKOFF_INCREMENT_MS, "100"));
        properties.put(ConfigConstants.AUTO_OFFSET_RESET, jsonObject.getString(ConfigConstants.AUTO_OFFSET_RESET, "smallest"));
        properties.put(ConfigConstants.BOOTSTRAP_SERVERS, getRequiredConfig(ConfigConstants.BOOTSTRAP_SERVERS));
        properties.put(ConfigConstants.KEY_DESERIALIZER_CLASS, jsonObject.getString(ConfigConstants.KEY_DESERIALIZER_CLASS, ConfigConstants.DEFAULT_DESERIALIZER_CLASS));
        properties.put(ConfigConstants.VALUE_DESERIALIZER_CLASS, jsonObject.getString(ConfigConstants.VALUE_DESERIALIZER_CLASS, ConfigConstants.DEFAULT_DESERIALIZER_CLASS));
        properties.put(ConfigConstants.GROUP_ID, getRequiredConfig(ConfigConstants.GROUP_ID));
        return properties;
    }

    private String getRequiredConfig(String str) {
        String string = this.verticleConfig.getString(str, (String) null);
        if (null == string) {
            throw new IllegalArgumentException(String.format("Required config value not found key: %s", str));
        }
        return string;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdown() {
        this.running.compareAndSet(true, false);
        try {
            if (this.consumer != null) {
                try {
                    this.consumer.unsubscribe();
                    this.consumer.close();
                    this.consumer = null;
                } catch (Exception e) {
                }
            }
            if (this.backgroundConsumer != null) {
                this.backgroundConsumer.shutdown();
                this.backgroundConsumer = null;
            }
        } catch (Exception e2) {
            logger.error("Failed to close consumer", e2);
        }
    }
}
