package org.voltdb.importclient.kafka;

import au.com.bytecode.opencsv_voltpatches.CSVParser;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.voltcore.logging.VoltLogger;
import org.voltdb.CLIConfig;
import org.voltdb.client.AutoReconnectListener;
import org.voltdb.client.Client;
import org.voltdb.client.ClientConfig;
import org.voltdb.client.ClientFactory;
import org.voltdb.client.ClientImpl;
import org.voltdb.client.ClientResponse;
import org.voltdb.common.Constants;
import org.voltdb.importer.formatter.FormatException;
import org.voltdb.importer.formatter.Formatter;
import org.voltdb.sysprocs.saverestore.SnapshotUtil;
import org.voltdb.utils.BulkLoaderErrorHandler;
import org.voltdb.utils.CSVBulkDataLoader;
import org.voltdb.utils.CSVDataLoader;
import org.voltdb.utils.CSVTupleDataLoader;
import org.voltdb.utils.CatalogUtil;
import org.voltdb.utils.MiscUtils;
import org.voltdb.utils.RowWithMetaData;

/* loaded from: input_file:org/voltdb/importclient/kafka/KafkaLoader.class */
public class KafkaLoader {
    private final KafkaConfig m_config;
    private CSVDataLoader m_loader = null;
    private Client m_client = null;
    private KafkaConsumerConnector m_consumer = null;
    private ExecutorService m_executorService = null;
    private static final VoltLogger m_log = new VoltLogger("KAFKALOADER");
    private static final AtomicLong m_failedCount = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/voltdb/importclient/kafka/KafkaLoader$KafkaBulkLoaderCallback.class */
    public class KafkaBulkLoaderCallback implements BulkLoaderErrorHandler {
        KafkaBulkLoaderCallback() {
        }

        @Override // org.voltdb.utils.BulkLoaderErrorHandler
        public boolean handleError(RowWithMetaData rowWithMetaData, ClientResponse clientResponse, String str) {
            byte status;
            if (KafkaLoader.this.m_config.maxerrors <= 0 || clientResponse == null || (status = clientResponse.getStatus()) == 1) {
                return false;
            }
            KafkaLoader.m_log.error("Failed to Insert Row: " + rowWithMetaData.rawLine);
            long incrementAndGet = KafkaLoader.m_failedCount.incrementAndGet();
            if ((KafkaLoader.this.m_config.maxerrors <= 0 || incrementAndGet <= KafkaLoader.this.m_config.maxerrors) && (status == -1 || status == -2)) {
                return false;
            }
            try {
                KafkaLoader.m_log.error("Kafkaloader will exit.");
                KafkaLoader.this.closeConsumer();
                return true;
            } catch (InterruptedException e) {
                return false;
            }
        }

        @Override // org.voltdb.utils.BulkLoaderErrorHandler
        public boolean hasReachedErrorLimit() {
            return KafkaLoader.this.m_config.maxerrors > 0 && KafkaLoader.m_failedCount.get() > ((long) KafkaLoader.this.m_config.maxerrors);
        }
    }

    /* loaded from: input_file:org/voltdb/importclient/kafka/KafkaLoader$KafkaConfig.class */
    public static class KafkaConfig extends CLIConfig {

        @CLIConfig.Option(shortOpt = "p", desc = "procedure name to insert the data into the database")
        String procedure = "";
        boolean useSuppliedProcedure = false;

        @CLIConfig.Option(shortOpt = "t", desc = "Kafka Topic to subscribe to")
        String topic = "";

        @CLIConfig.Option(shortOpt = "m", desc = "maximum errors allowed")
        int maxerrors = 100;

        @CLIConfig.Option(shortOpt = "s", desc = "list of servers to connect to (default: localhost)")
        String servers = "localhost";

        @CLIConfig.Option(desc = "port to use when connecting to database (default: 21212)")
        int port = 21212;

        @CLIConfig.Option(desc = "username when connecting to the servers")
        String user = "";

        @CLIConfig.Option(desc = "password to use when connecting to servers")
        String password = "";

        @CLIConfig.Option(desc = "credentials that contains username and password information")
        String credentials = "";

        @CLIConfig.Option(shortOpt = "z", desc = "kafka zookeeper to connect to. (format: zkserver:port)")
        String zookeeper = "";

        @CLIConfig.Option(shortOpt = "f", desc = "Periodic Flush Interval in seconds. (default: 10)")
        int flush = 10;

        @CLIConfig.Option(shortOpt = "k", desc = "Kafka Topic Partitions. (default: 10)")
        int kpartitions = 10;

        @CLIConfig.Option(shortOpt = "c", desc = "Kafka Consumer Configuration File")
        String config = "";

        @CLIConfig.Option(desc = "Formatter configuration file. (Optional) .")
        String formatter = "";

        @CLIConfig.Option(desc = "Batch Size for processing.")
        public int batch = 200;

        @CLIConfig.AdditionalArgs(desc = "insert the data into this table.")
        public String table = "";

        @CLIConfig.Option(desc = "Use upsert instead of insert", hasArg = false)
        boolean update = false;

        @CLIConfig.Option(desc = "Enable SSL, Optionally provide configuration file.")
        String ssl = "";

        @CLIConfig.Option(desc = "Stop when all connections are lost", hasArg = false)
        boolean stopondisconnect = false;
        Properties m_formatterProperties = new Properties();

        @Override // org.voltdb.CLIConfig
        public void validate() {
            if (this.batch < 0) {
                exitWithMessageAndUsage("batch size number must be >= 0");
            }
            if (this.flush <= 0) {
                exitWithMessageAndUsage("Periodic Flush Interval must be > 0");
            }
            if (this.topic.trim().isEmpty()) {
                exitWithMessageAndUsage("Topic must be specified.");
            }
            if (this.zookeeper.trim().isEmpty()) {
                exitWithMessageAndUsage("Kafka Zookeeper must be specified.");
            }
            if (this.port < 0) {
                exitWithMessageAndUsage("port number must be >= 0");
            }
            if (this.procedure.trim().isEmpty() && this.table.trim().isEmpty()) {
                exitWithMessageAndUsage("procedure name or a table name required");
            }
            if (!this.procedure.trim().isEmpty() && !this.table.trim().isEmpty()) {
                exitWithMessageAndUsage("Only a procedure name or a table name required, pass only one please");
            }
            if (!this.procedure.trim().isEmpty()) {
                this.useSuppliedProcedure = true;
            }
            if (this.useSuppliedProcedure && this.update) {
                this.update = false;
                exitWithMessageAndUsage("update is not applicable when stored procedure specified");
            }
            try {
                KafkaConfig.class.getClassLoader().loadClass("org.I0Itec.zkclient.IZkStateListener");
                KafkaConfig.class.getClassLoader().loadClass("org.apache.zookeeper.Watcher");
            } catch (ClassNotFoundException e) {
                System.out.println("Cannot find the Zookeeper libraries, zkclient-0.3.jar and zookeeper-3.3.4.jar.");
                System.out.println("Use the ZKLIB environment variable to specify the path to the Zookeeper jars files.");
                System.exit(1);
            }
        }

        @Override // org.voltdb.CLIConfig
        public void printUsage() {
            System.out.println("Usage: kafkaloader [args] -z kafka-zookeeper -t topic tablename");
            super.printUsage();
        }
    }

    /* loaded from: input_file:org/voltdb/importclient/kafka/KafkaLoader$KafkaConsumer.class */
    public static class KafkaConsumer implements Runnable {
        private final KafkaStream m_stream;
        private final CSVDataLoader m_loader;
        private final CSVParser m_csvParser = new CSVParser();
        private final Formatter m_formatter;
        private final KafkaConfig m_config;

        public KafkaConsumer(KafkaStream kafkaStream, CSVDataLoader cSVDataLoader, KafkaConfig kafkaConfig) throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
            this.m_stream = kafkaStream;
            this.m_loader = cSVDataLoader;
            this.m_config = kafkaConfig;
            if (this.m_config.m_formatterProperties.size() <= 0) {
                this.m_formatter = null;
                return;
            }
            this.m_formatter = (Formatter) Class.forName(this.m_config.m_formatterProperties.getProperty("formatter")).getDeclaredConstructor(String.class, Properties.class).newInstance(this.m_config.m_formatterProperties.getProperty(SnapshotUtil.JSON_FORMAT, CatalogUtil.DEFAULT_DR_CONFLICTS_EXPORT_TYPE), this.m_config.m_formatterProperties);
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v26, types: [java.lang.Object[]] */
        @Override // java.lang.Runnable
        public void run() {
            String[] parseLine;
            ConsumerIterator it = this.m_stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata next = it.next();
                byte[] bArr = (byte[]) next.message();
                long offset = next.offset();
                String str = new String(bArr);
                try {
                    if (this.m_formatter != null) {
                        try {
                            parseLine = this.m_formatter.transform(ByteBuffer.wrap(str.getBytes()));
                        } catch (FormatException e) {
                            KafkaLoader.m_log.warn("Failed to transform message: " + str);
                        }
                    } else {
                        parseLine = this.m_csvParser.parseLine(str);
                    }
                    if (parseLine != null) {
                        this.m_loader.insertRow(new RowWithMetaData(str, offset), parseLine);
                    }
                } catch (Throwable th) {
                    KafkaLoader.m_log.error("Consumer stopped", th);
                    System.exit(1);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/voltdb/importclient/kafka/KafkaLoader$KafkaConsumerConnector.class */
    public static class KafkaConsumerConnector {
        final ConsumerConfig m_consumerConfig;
        final ConsumerConnector m_consumer;
        final KafkaConfig m_config;

        public KafkaConsumerConnector(KafkaConfig kafkaConfig) throws IOException {
            this.m_config = kafkaConfig;
            String str = "voltdb-" + (this.m_config.useSuppliedProcedure ? this.m_config.procedure : this.m_config.table);
            Properties properties = new Properties();
            if (this.m_config.config.trim().isEmpty()) {
                properties.put("zookeeper.session.timeout.ms", "400");
                properties.put("zookeeper.sync.time.ms", "200");
                properties.put("auto.commit.interval.ms", "1000");
                properties.put("auto.commit.enable", "true");
                properties.put("auto.offset.reset", "smallest");
                properties.put("rebalance.backoff.ms", "10000");
            } else {
                properties.load(new FileInputStream(new File(this.m_config.config)));
                str = properties.getProperty("group.id", str);
                this.m_config.zookeeper = properties.getProperty("zookeeper.connect", this.m_config.zookeeper);
                if (properties.getProperty("zookeeper.session.timeout.ms") == null) {
                    properties.put("zookeeper.session.timeout.ms", "400");
                }
                if (properties.getProperty("zookeeper.sync.time.ms") == null) {
                    properties.put("zookeeper.sync.time.ms", "200");
                }
                if (properties.getProperty("auto.commit.interval.ms") == null) {
                    properties.put("auto.commit.interval.ms", "1000");
                }
                if (properties.getProperty("auto.commit.enable") == null) {
                    properties.put("auto.commit.enable", "true");
                }
                if (properties.getProperty("auto.offset.reset") == null) {
                    properties.put("auto.offset.reset", "smallest");
                }
                if (properties.getProperty("rebalance.backoff.ms") == null) {
                    properties.put("rebalance.backoff.ms", "10000");
                }
            }
            properties.put("group.id", str);
            properties.put("zookeeper.connect", this.m_config.zookeeper);
            this.m_consumerConfig = new ConsumerConfig(properties);
            this.m_consumer = Consumer.createJavaConsumerConnector(this.m_consumerConfig);
        }

        public void stop() {
            try {
                Thread.sleep(1100L);
            } catch (InterruptedException e) {
            } finally {
                this.m_consumer.commitOffsets();
                this.m_consumer.shutdown();
            }
        }
    }

    public KafkaLoader(KafkaConfig kafkaConfig) {
        this.m_config = kafkaConfig;
    }

    public void closeConsumer() throws InterruptedException {
        if (this.m_consumer != null) {
            this.m_consumer.stop();
            this.m_consumer = null;
        }
        if (this.m_executorService != null) {
            this.m_executorService.shutdownNow();
            this.m_executorService.awaitTermination(365L, TimeUnit.DAYS);
            this.m_executorService = null;
        }
    }

    public void close() {
        try {
            closeConsumer();
            this.m_loader.close();
            if (this.m_client != null) {
                this.m_client.close();
                this.m_client = null;
            }
        } catch (Exception e) {
        }
    }

    public void processKafkaMessages() throws Exception {
        ClientConfig clientConfig;
        if (this.m_config.credentials != null && !this.m_config.credentials.trim().isEmpty()) {
            Properties readPropertiesFromCredentials = MiscUtils.readPropertiesFromCredentials(this.m_config.credentials);
            this.m_config.user = readPropertiesFromCredentials.getProperty("username");
            this.m_config.password = readPropertiesFromCredentials.getProperty(Constants.DEFAULT_KEYSTORE_PASSWD);
        }
        String[] split = this.m_config.servers.split(CatalogUtil.SIGNATURE_DELIMITER);
        this.m_config.password = CLIConfig.readPasswordIfNeeded(this.m_config.user, this.m_config.password, "Enter password: ");
        AutoReconnectListener autoReconnectListener = new AutoReconnectListener();
        if (this.m_config.stopondisconnect) {
            clientConfig = new ClientConfig(this.m_config.user, this.m_config.password, null);
            clientConfig.setReconnectOnConnectionLoss(false);
        } else {
            clientConfig = new ClientConfig(this.m_config.user, this.m_config.password, autoReconnectListener);
            clientConfig.setReconnectOnConnectionLoss(true);
        }
        if (this.m_config.ssl != null && !this.m_config.ssl.trim().isEmpty()) {
            clientConfig.setTrustStoreConfigFromPropertyFile(this.m_config.ssl);
            clientConfig.enableSSL();
        }
        clientConfig.setProcedureCallTimeout(0L);
        this.m_client = getClient(clientConfig, split, this.m_config.port);
        if (this.m_config.useSuppliedProcedure) {
            this.m_loader = new CSVTupleDataLoader((ClientImpl) this.m_client, this.m_config.procedure, new KafkaBulkLoaderCallback());
        } else {
            this.m_loader = new CSVBulkDataLoader((ClientImpl) this.m_client, this.m_config.table, this.m_config.batch, this.m_config.update, new KafkaBulkLoaderCallback());
        }
        if (!this.m_config.stopondisconnect) {
            autoReconnectListener.setLoader(this.m_loader);
        }
        this.m_loader.setFlushInterval(this.m_config.flush, this.m_config.flush);
        this.m_consumer = new KafkaConsumerConnector(this.m_config);
        try {
            this.m_executorService = getConsumerExecutor(this.m_consumer, this.m_loader);
            if (this.m_config.useSuppliedProcedure) {
                m_log.info("Kafka Consumer from topic: " + this.m_config.topic + " Started using procedure: " + this.m_config.procedure);
            } else {
                m_log.info("Kafka Consumer from topic: " + this.m_config.topic + " Started for table: " + this.m_config.table);
            }
            this.m_executorService.awaitTermination(365L, TimeUnit.DAYS);
        } catch (Throwable th) {
            m_log.error("Error in Kafka Consumer", th);
            System.exit(-1);
        }
        close();
    }

    private ExecutorService getConsumerExecutor(KafkaConsumerConnector kafkaConsumerConnector, CSVDataLoader cSVDataLoader) throws Exception {
        HashMap hashMap = new HashMap();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.m_config.kpartitions);
        hashMap.put(this.m_config.topic, Integer.valueOf(this.m_config.kpartitions));
        Iterator it = ((List) kafkaConsumerConnector.m_consumer.createMessageStreams(hashMap).get(this.m_config.topic)).iterator();
        while (it.hasNext()) {
            newFixedThreadPool.submit(new KafkaConsumer((KafkaStream) it.next(), cSVDataLoader, this.m_config));
        }
        return newFixedThreadPool;
    }

    public static Client getClient(ClientConfig clientConfig, String[] strArr, int i) throws Exception {
        clientConfig.setTopologyChangeAware(true);
        Client createClient = ClientFactory.createClient(clientConfig);
        for (String str : strArr) {
            try {
                createClient.createConnection(str.trim(), i);
            } catch (IOException e) {
            }
        }
        if (!createClient.getConnectedHostList().isEmpty()) {
            return createClient;
        }
        try {
            createClient.close();
        } catch (Exception e2) {
        }
        throw new Exception("Unable to connect to any servers");
    }

    public static void main(String[] strArr) {
        KafkaConfig kafkaConfig = new KafkaConfig();
        kafkaConfig.parse(KafkaLoader.class.getName(), strArr);
        try {
            if (!kafkaConfig.formatter.trim().isEmpty()) {
                kafkaConfig.m_formatterProperties.load(new FileInputStream(kafkaConfig.formatter));
                String property = kafkaConfig.m_formatterProperties.getProperty("formatter");
                if (property == null || property.trim().isEmpty()) {
                    m_log.error("formatter class must be specified in formatter file as formatter=<class>: " + kafkaConfig.formatter);
                    System.exit(-1);
                }
            }
            new KafkaLoader(kafkaConfig).processKafkaMessages();
        } catch (Exception e) {
            m_log.error("Failure in kafkaloader", e);
            System.exit(-1);
        }
        System.exit(0);
    }
}
