package org.voltdb.importclient.kafka10;

import au.com.bytecode.opencsv_voltpatches.CSVParser;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.voltcore.logging.VoltLogger;
import org.voltdb.client.ProcedureCallback;
import org.voltdb.importer.ImporterLifecycle;
import org.voltdb.importer.formatter.FormatException;
import org.voltdb.importer.formatter.Formatter;
import org.voltdb.sysprocs.saverestore.SnapshotUtil;
import org.voltdb.utils.CSVDataLoader;
import org.voltdb.utils.CatalogUtil;
import org.voltdb.utils.RowWithMetaData;

/* loaded from: input_file:org/voltdb/importclient/kafka10/KafkaExternalConsumerRunner.class */
public class KafkaExternalConsumerRunner extends KafkaConsumerRunner {
    private static final VoltLogger LOGGER = new VoltLogger("KAFKALOADER10");
    private final CSVDataLoader m_loader;
    private Formatter m_formatter;

    public KafkaExternalConsumerRunner(ImporterLifecycle importerLifecycle, KafkaLoaderConfig kafkaLoaderConfig, Consumer<ByteBuffer, ByteBuffer> consumer, CSVDataLoader cSVDataLoader) throws Exception {
        super(importerLifecycle, kafkaLoaderConfig, consumer);
        String property;
        this.m_formatter = null;
        this.m_loader = cSVDataLoader;
        if (kafkaLoaderConfig.getFormatterProperties() == null || (property = kafkaLoaderConfig.getFormatterProperties().getProperty("formatter")) == null || property.trim().isEmpty()) {
            return;
        }
        this.m_formatter = (Formatter) Class.forName(property).getDeclaredConstructor(String.class, Properties.class).newInstance(kafkaLoaderConfig.getFormatterProperties().getProperty(SnapshotUtil.JSON_FORMAT, CatalogUtil.DEFAULT_DR_CONFLICTS_EXPORT_TYPE), kafkaLoaderConfig.getFormatterProperties());
    }

    @Override // org.voltdb.importclient.kafka10.KafkaConsumerRunner
    public boolean invoke(String str, long j, String str2, Object[] objArr, ProcedureCallback procedureCallback) throws Exception {
        this.m_loader.insertRow(new RowWithMetaData(str, j, procedureCallback), objArr);
        return true;
    }

    @Override // org.voltdb.importclient.kafka10.KafkaConsumerRunner
    protected void subscribe() {
        LOGGER.info("Kafka consumer subscribes topics:" + this.m_config.getTopics());
        this.m_consumer.subscribe(Arrays.asList(this.m_config.getTopics().split(CatalogUtil.SIGNATURE_DELIMITER)), new ConsumerRebalanceListener() { // from class: org.voltdb.importclient.kafka10.KafkaExternalConsumerRunner.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                KafkaExternalConsumerRunner.LOGGER.info("Kafka consumer drops topic and partitions: " + collection);
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                KafkaExternalConsumerRunner.LOGGER.info("Kafka topics and partitions join this consumer: " + collection);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.voltdb.importclient.kafka10.KafkaConsumerRunner
    public void shutdown() {
        if (this.m_consumer != null && this.m_done.compareAndSet(false, true)) {
            this.m_consumer.wakeup();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v64, types: [java.lang.Object[]] */
    @Override // org.voltdb.importclient.kafka10.KafkaConsumerRunner, java.lang.Runnable
    public void run() {
        String[] parseLine;
        CSVParser cSVParser = new CSVParser();
        try {
            try {
                subscribe();
                while (!this.m_done.get()) {
                    ConsumerRecords consumerRecords = null;
                    try {
                        consumerRecords = this.m_consumer.poll(this.m_config.getPollTimeout());
                    } catch (WakeupException e) {
                        if (this.m_done.get()) {
                            break;
                        }
                    }
                    Iterator it = consumerRecords.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        long offset = consumerRecord.offset();
                        String str = new String(((ByteBuffer) consumerRecord.value()).array(), StandardCharsets.UTF_8);
                        if (this.m_formatter != null) {
                            try {
                                parseLine = this.m_formatter.transform(ByteBuffer.wrap(str.getBytes()));
                            } catch (FormatException e2) {
                                LOGGER.warn("Failed to transform message " + str + " at offset " + offset + ", error: " + e2.getMessage());
                            }
                        } else {
                            parseLine = cSVParser.parseLine(str);
                        }
                        if (parseLine != null) {
                            this.m_loader.insertRow(new RowWithMetaData(str, offset), parseLine);
                        }
                    }
                }
                try {
                    this.m_consumer.close();
                    this.m_consumer = null;
                } catch (Exception e3) {
                    LOGGER.warn("Exception while cleaning up Kafka consumer.", e3);
                }
                ((KafkaLoader) this.m_lifecycle).notifyShutdown();
            } catch (Throwable th) {
                try {
                    this.m_consumer.close();
                    this.m_consumer = null;
                } catch (Exception e4) {
                    LOGGER.warn("Exception while cleaning up Kafka consumer.", e4);
                }
                ((KafkaLoader) this.m_lifecycle).notifyShutdown();
                throw th;
            }
        } catch (IOException e5) {
            this.m_done.set(true);
            LOGGER.error("Fail to process message:" + this.m_config.getTopics(), e5);
            try {
                this.m_consumer.close();
                this.m_consumer = null;
            } catch (Exception e6) {
                LOGGER.warn("Exception while cleaning up Kafka consumer.", e6);
            }
            ((KafkaLoader) this.m_lifecycle).notifyShutdown();
        } catch (Throwable th2) {
            this.m_done.set(true);
            LOGGER.error("Error seen during poll", th2);
            try {
                this.m_consumer.close();
                this.m_consumer = null;
            } catch (Exception e7) {
                LOGGER.warn("Exception while cleaning up Kafka consumer.", e7);
            }
            ((KafkaLoader) this.m_lifecycle).notifyShutdown();
        }
    }
}
