package org.voltdb.importclient.kafka10;

import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.voltcore.logging.VoltLogger;
import org.voltdb.importer.AbstractImporter;

/* loaded from: input_file:org/voltdb/importclient/kafka10/KafkaStreamImporter.class */
/**
 * Importer that reads from Kafka using a pool of
 * {@link KafkaInternalConsumerRunner} instances, one per executor thread.
 *
 * <p>{@link #accept()} and {@link #stop()} both synchronize on a private lock;
 * {@code stop()} additionally raises a shutdown flag before taking the lock so
 * that a concurrently running {@code accept()} can bail out early.
 */
public class KafkaStreamImporter extends AbstractImporter {
    private static final VoltLogger LOGGER = new VoltLogger("KAFKAIMPORTER");
    protected KafkaStreamImporterConfig m_config;
    protected KafkaConsumerRunner m_runner;
    private List<KafkaInternalConsumerRunner> m_consumers;
    private ExecutorService m_executorService = null;
    private final AtomicBoolean m_shutdown = new AtomicBoolean(false);
    private final Object m_lock = new Object();

    /**
     * @param kafkaStreamImporterConfig configuration used for the resource id,
     *        consumer properties, consumer count and topic list
     */
    public KafkaStreamImporter(KafkaStreamImporterConfig kafkaStreamImporterConfig) {
        this.m_config = kafkaStreamImporterConfig;
    }

    /** @return the fully qualified name of this importer class */
    @Override // org.voltdb.InternalConnectionContext
    public String getName() {
        return KafkaStreamImporter.class.getName();
    }

    /** @return the URI supplied by the importer configuration */
    @Override // org.voltdb.importer.AbstractImporter
    public URI getResourceID() {
        return this.m_config.getURI();
    }

    /**
     * Creates a consumer runner backed by a new {@link KafkaConsumer}.
     *
     * <p>The thread's context class loader is temporarily switched to this
     * class's loader while the consumer is constructed and is always restored
     * afterwards, whether or not construction succeeds.
     *
     * @param properties Kafka consumer properties
     * @return a new runner wrapping a freshly created consumer
     * @throws Exception if the consumer or the runner cannot be created
     */
    private KafkaInternalConsumerRunner createConsumerRunner(Properties properties) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previousLoader = current.getContextClassLoader();
        current.setContextClassLoader(getClass().getClassLoader());
        try {
            return new KafkaInternalConsumerRunner(this, this.m_config, new KafkaConsumer<>(properties));
        } finally {
            current.setContextClassLoader(previousLoader);
        }
    }

    /** Builds the Kafka consumer properties from the importer configuration. */
    private Properties buildConsumerProperties() {
        Properties properties = new Properties();
        properties.put("group.id", this.m_config.getGroupId());
        properties.put("bootstrap.servers", this.m_config.getBrokers());
        properties.put("key.deserializer", ByteBufferDeserializer.class.getName());
        properties.put("value.deserializer", ByteBufferDeserializer.class.getName());
        properties.put("enable.auto.commit", "false");
        properties.put("fetch.max.bytes", Integer.valueOf(this.m_config.getMaxMessageFetchSize()));
        properties.put("request.timeout.ms", Integer.valueOf(this.m_config.getConsumerRequestTimeout()));
        properties.put("max.partition.fetch.bytes", Integer.valueOf(this.m_config.getMaxPartitionFetchBytes()));
        properties.put("max.poll.records", Integer.valueOf(this.m_config.getMaxPollRecords()));
        properties.put("max.poll.interval.ms", Integer.valueOf(this.m_config.getMaxPollInterval()));
        properties.put("session.timeout.ms", Integer.valueOf(this.m_config.getSessionTimeOut()));
        properties.put("heartbeat.interval.ms", Integer.valueOf(this.m_config.getHeartBeatInterval()));
        properties.put("auto.offset.reset", this.m_config.getAutoOffsetReset());
        properties.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
        return properties;
    }

    /**
     * Creates the consumers for this host and submits them to a fixed thread pool.
     *
     * <p>A first consumer is created to obtain the topic partition count. The
     * total consumer count is the configured consumer count when positive,
     * otherwise the partition count; the per-host count is that total divided
     * by the DB host count, rounded up. If any consumer fails to be created,
     * all consumers created so far are shut down and none are started.
     */
    @Override // org.voltdb.importer.AbstractImporter
    public void accept() {
        Properties properties = buildConsumerProperties();
        synchronized (this.m_lock) {
            int partitionCount = 0;
            KafkaInternalConsumerRunner firstRunner = null;
            try {
                firstRunner = createConsumerRunner(properties);
                partitionCount = firstRunner.getKafkaTopicPartitionCount();
            } catch (KafkaException e) {
                LOGGER.error("Couldn't create Kafka consumer. Please check the configuration paramaters. Error:" + e.getMessage());
            } catch (Throwable th) {
                LOGGER.error("Failed creating Kafka consumer ", th);
            }
            if (partitionCount < 1) {
                // Nothing to consume (or the probe failed after the runner was
                // built): discard the probe runner the same way never-submitted
                // runners are discarded below, instead of dropping the reference.
                // NOTE(review): assumes shutdown() is safe on a runner that was
                // never submitted, as the mismatch branch below already relies on.
                if (firstRunner != null) {
                    firstRunner.shutdown();
                }
                return;
            }

            int totalConsumers = partitionCount;
            if (this.m_config.getConsumerCount() > 0) {
                totalConsumers = this.m_config.getConsumerCount();
            }
            // Divide in floating point so that ceil() actually rounds up;
            // an int/int division would truncate before ceil() is applied.
            int consumersOnHost = (int) Math.ceil((double) totalConsumers / this.m_config.getDBHostCount());

            this.m_executorService = Executors.newFixedThreadPool(consumersOnHost);
            this.m_consumers = new ArrayList<>();
            this.m_consumers.add(firstRunner);
            for (int n = 1; n < consumersOnHost; n++) {
                try {
                    if (this.m_shutdown.get()) {
                        // stop() will clean up whatever has been created so far.
                        return;
                    }
                    this.m_consumers.add(createConsumerRunner(properties));
                } catch (KafkaException e) {
                    LOGGER.error("Couldn't create Kafka consumer. Please check the configuration paramaters. Error:" + e.getMessage());
                } catch (Throwable th) {
                    LOGGER.error("Couldn't create Kafka consumer. ", th);
                }
            }

            if (this.m_consumers.size() != consumersOnHost) {
                // At least one consumer could not be created: start none of them.
                for (KafkaInternalConsumerRunner runner : this.m_consumers) {
                    runner.shutdown();
                }
                this.m_consumers.clear();
            } else {
                for (KafkaInternalConsumerRunner runner : this.m_consumers) {
                    if (this.m_shutdown.get()) {
                        return;
                    }
                    this.m_executorService.submit(runner);
                }
            }

            LOGGER.info("Number of Kafka Consumers on this host:" + consumersOnHost);
            for (String topic : this.m_config.getTopics().split("\\s*,\\s*")) {
                reportInitializedStat(this.m_config.getProcedure(topic));
            }
        }
    }

    /**
     * Signals shutdown, shuts down every consumer runner, and then shuts down
     * the executor, waiting up to 60 seconds for it to terminate.
     *
     * <p>If the wait is interrupted, the thread's interrupt status is restored
     * so callers can observe it. The executor reference is cleared in all cases.
     */
    @Override // org.voltdb.importer.AbstractImporter, org.voltdb.importer.ImporterLifecycle
    public void stop() {
        // Raise the flag before taking the lock so a running accept() can exit early.
        this.m_shutdown.set(true);
        synchronized (this.m_lock) {
            if (this.m_consumers != null) {
                for (KafkaInternalConsumerRunner runner : this.m_consumers) {
                    if (runner != null) {
                        runner.shutdown();
                    }
                }
                this.m_consumers.clear();
            }
            if (this.m_executorService == null) {
                return;
            }
            this.m_executorService.shutdown();
            try {
                this.m_executorService.awaitTermination(60L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            } finally {
                this.m_executorService = null;
            }
        }
    }
}
