package org.voltdb.importclient.kafka;

import java.net.URI;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons_voltpatches.cli.HelpFormatter;
import org.voltcore.logging.Level;
import org.voltcore.logging.VoltLogger;
import org.voltcore.utils.CoreUtils;
import org.voltdb.CLIConfig;
import org.voltdb.ClientResponseImpl;
import org.voltdb.client.Client;
import org.voltdb.client.ClientConfig;
import org.voltdb.client.ClientFactory;
import org.voltdb.client.ClientImpl;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.VoltBulkLoader.BulkLoaderSuccessCallback;
import org.voltdb.importclient.kafka.util.HostAndPort;
import org.voltdb.importclient.kafka.util.KafkaUtils;
import org.voltdb.importclient.kafka.util.ProcedureInvocationCallback;
import org.voltdb.importer.ImporterLifecycle;
import org.voltdb.importer.ImporterLogger;
import org.voltdb.importer.formatter.FormatterBuilder;
import org.voltdb.utils.BulkLoaderErrorHandler;
import org.voltdb.utils.CSVBulkDataLoader;
import org.voltdb.utils.CSVDataLoader;
import org.voltdb.utils.CSVTupleDataLoader;
import org.voltdb.utils.CatalogUtil;
import org.voltdb.utils.RowWithMetaData;

/* loaded from: input_file:org/voltdb/importclient/kafka/KafkaExternalLoader.class */
/**
 * Command-line loader that consumes a Kafka topic and writes each message into VoltDB,
 * either through a bulk loader targeting a table or by invoking a user-supplied procedure.
 *
 * <p>The instance is passed to every partition importer as both its {@link ImporterLifecycle}
 * and its {@link ImporterLogger}, so {@link #stop()} is how importers are told to stop and
 * the logging methods route importer output to the {@code KAFKA-EXTERNAL-LOADER} logger.
 */
public class KafkaExternalLoader implements ImporterLifecycle, ImporterLogger {
    /** Suppression interval, in seconds, passed to the logger for rate-limited messages. */
    private static final int LOG_SUPPRESSION_INTERVAL_SECONDS = 60;
    private static final VoltLogger m_log = new VoltLogger("KAFKA-EXTERNAL-LOADER");
    /** Number of rows that failed to load; static, so shared by all loader instances. */
    private static final AtomicLong m_failedCount = new AtomicLong(0);

    private final KafkaExternalLoaderCLIArguments m_args;
    private CSVDataLoader m_loader = null;
    private Client m_client = null;
    /** Pool running one {@link LoaderTopicPartitionImporter} per partition configuration. */
    private ExecutorService m_executorService = null;
    /** Executor handed to the tuple loader; only created when a procedure is supplied. */
    private ExecutorService m_callbackExecutor = null;
    private volatile boolean m_stopping = false;

    /**
     * Receives per-row success and failure notifications from the data loader and forwards
     * them to the Kafka offset callback attached to each row.
     */
    public class KafkaBulkLoaderCallback implements BulkLoaderErrorHandler, BulkLoaderSuccessCallback {
        private KafkaBulkLoaderCallback() {
        }

        /**
         * Forwards a successful response to the row's procedure callback.
         *
         * @param obj the row handle; expected to be the {@link RowWithMetaData} passed to the loader
         * @param clientResponse the response for the row
         */
        @Override // org.voltdb.client.VoltBulkLoader.BulkLoaderSuccessCallback
        public void success(Object obj, ClientResponse clientResponse) {
            try {
                RowWithMetaData row = (RowWithMetaData) obj;
                // Same null guard as handleError(): a row without a callback has nothing to notify.
                if (row.procedureCallback != null) {
                    row.procedureCallback.clientCallback(clientResponse);
                }
            } catch (Exception e) {
                // Report through the loader's logger instead of dumping to stderr.
                m_log.error("Exception while invoking the success callback", e);
            }
        }

        /**
         * Handles a row the loader reported as failed.
         *
         * <p>Does nothing when {@code maxerrors} is not positive or there is no response.
         * A response whose status is actually {@code SUCCESS} is forwarded to the row's
         * callback. Any other status is logged and counted; the loader is shut down when the
         * count exceeds {@code maxerrors} or the status is neither {@code USER_ABORT} nor
         * {@code GRACEFUL_FAILURE}.
         *
         * @param rowWithMetaData the row that failed
         * @param clientResponse the response for the row, may be {@code null}
         * @param str error text supplied by the loader
         * @return {@code true} if the executors were shut down and loading should stop
         */
        @Override // org.voltdb.utils.BulkLoaderErrorHandler
        public boolean handleError(RowWithMetaData rowWithMetaData, ClientResponse clientResponse, String str) {
            if (m_args.maxerrors <= 0 || clientResponse == null) {
                return false;
            }
            byte status = clientResponse.getStatus();
            if (status == ClientResponse.SUCCESS) {
                if (rowWithMetaData.procedureCallback != null) {
                    try {
                        rowWithMetaData.procedureCallback.clientCallback(clientResponse);
                    } catch (Exception e) {
                        m_log.warn("Exception while invoking the procedure callback", e);
                    }
                }
                return false;
            }

            // Only ClientResponseImpl can render JSON; fall back to toString() for anything else.
            String responseText = clientResponse instanceof ClientResponseImpl
                    ? ((ClientResponseImpl) clientResponse).toJSONString()
                    : String.valueOf(clientResponse);
            m_log.error("Failed to Insert Row: error=" + str + " response=" + responseText + " data=" + rowWithMetaData.rawLine);

            long failures = m_failedCount.incrementAndGet();
            String exitReason = null;
            // maxerrors > 0 is already guaranteed by the guard at the top of the method.
            if (failures > m_args.maxerrors) {
                exitReason = "Max error count reached, exiting.";
            } else if (status != ClientResponse.USER_ABORT && status != ClientResponse.GRACEFUL_FAILURE) {
                exitReason = "Error response received from database, exiting; status=" + Byte.toString(status);
            }
            if (exitReason == null) {
                return false;
            }
            m_log.error(exitReason);
            try {
                closeExecutors();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            return true;
        }

        /** @return {@code true} once the failure count exceeds a positive {@code maxerrors}. */
        @Override // org.voltdb.utils.BulkLoaderErrorHandler
        public boolean hasReachedErrorLimit() {
            int limit = m_args.maxerrors;
            return limit > 0 && m_failedCount.get() > (long) limit;
        }
    }

    /**
     * Partition importer that hands every decoded Kafka message to the loader and can be
     * submitted to an executor.
     */
    public class LoaderTopicPartitionImporter extends BaseKafkaTopicPartitionImporter implements Runnable {
        public LoaderTopicPartitionImporter(KafkaStreamImporterConfig kafkaStreamImporterConfig, ImporterLifecycle importerLifecycle, ImporterLogger importerLogger) {
            super(kafkaStreamImporterConfig, importerLifecycle, importerLogger);
        }

        /**
         * Queues one row for insertion.
         *
         * @param objArr column values of the row
         * @param procedureInvocationCallback callback carrying the Kafka offset of the row
         * @return {@code true} if the loader accepted the row, {@code false} if it threw
         */
        @Override // org.voltdb.importclient.kafka.BaseKafkaTopicPartitionImporter
        public boolean invoke(Object[] objArr, ProcedureInvocationCallback procedureInvocationCallback) {
            try {
                // The joined values are kept as the raw line so failures can be logged with their data.
                String rawLine = StringUtils.join(objArr, CatalogUtil.SIGNATURE_DELIMITER);
                RowWithMetaData row = new RowWithMetaData(rawLine, procedureInvocationCallback.getOffset(), procedureInvocationCallback);
                m_loader.insertRow(row, objArr);
                return true;
            } catch (Exception e) {
                m_log.error("Exception from loader while inserting row", e);
                return false;
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            accept();
        }
    }

    /**
     * @param kafkaExternalLoaderCLIArguments parsed command-line arguments driving this loader
     */
    public KafkaExternalLoader(KafkaExternalLoaderCLIArguments kafkaExternalLoaderCLIArguments) {
        this.m_args = kafkaExternalLoaderCLIArguments;
    }

    /**
     * Verifies the Zookeeper libraries are loadable, connects the VoltDB client and creates
     * the data loader (tuple loader for a supplied procedure, bulk loader for a table).
     *
     * @throws RuntimeException if the Zookeeper client classes cannot be loaded
     * @throws Exception if the client or the loader cannot be created
     */
    public void initialize() throws Exception {
        verifyZookeeperLibraries();

        this.m_args.password = CLIConfig.readPasswordIfNeeded(this.m_args.user, this.m_args.password, "Enter password: ");
        ClientConfig clientConfig = new ClientConfig(this.m_args.user, this.m_args.password, null);
        if (this.m_args.ssl != null && !this.m_args.ssl.trim().isEmpty()) {
            clientConfig.setTrustStoreConfigFromPropertyFile(this.m_args.ssl);
            clientConfig.enableSSL();
        }
        clientConfig.setProcedureCallTimeout(0L);
        this.m_client = getVoltClient(clientConfig, this.m_args.getVoltHosts());

        KafkaBulkLoaderCallback callback = new KafkaBulkLoaderCallback();
        if (this.m_args.useSuppliedProcedure) {
            String threadName = this.m_args.procedure + "-callbackproc"
                    + HelpFormatter.DEFAULT_OPT_PREFIX + Thread.currentThread().getName();
            this.m_callbackExecutor = CoreUtils.getSingleThreadExecutor(threadName);
            this.m_loader = new CSVTupleDataLoader((ClientImpl) this.m_client, this.m_args.procedure, callback, this.m_callbackExecutor, callback);
        } else {
            this.m_loader = new CSVBulkDataLoader((ClientImpl) this.m_client, this.m_args.table, this.m_args.batch, this.m_args.update, callback, callback);
        }
        this.m_loader.setFlushInterval(this.m_args.flush, this.m_args.flush);
    }

    /** Fails fast, with the original cause attached, when the Zookeeper jars are missing. */
    private static void verifyZookeeperLibraries() {
        ClassLoader classLoader = KafkaExternalLoader.class.getClassLoader();
        try {
            classLoader.loadClass("org.I0Itec.zkclient.IZkStateListener");
            classLoader.loadClass("org.apache.zookeeper.Watcher");
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Cannot find the Zookeeper client libraries, zkclient-0.3.jar and zookeeper-3.3.4.jar. Use the ZKLIB environment variable to specify the path to the Zookeeper jars files.", e);
        }
    }

    /**
     * Starts the partition importers and blocks until their executor terminates, then
     * releases all resources.
     *
     * <p>NOTE(review): every path through this method ends in {@code System.exit(-1)},
     * including an orderly termination of the importers; confirm that exit code is intended.
     */
    private void processKafkaMessages() throws Exception {
        try {
            this.m_executorService = createImporterExecutor(this, this);
            if (this.m_args.useSuppliedProcedure) {
                m_log.info("Kafka Consumer from topic: " + this.m_args.topic + " Started using procedure: " + this.m_args.procedure);
            } else {
                m_log.info("Kafka Consumer from topic: " + this.m_args.topic + " Started for table: " + this.m_args.table);
            }
            this.m_executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (Throwable th) {
            m_log.error("Error in Kafka Consumer", th);
        } finally {
            close();
        }
        System.exit(-1);
    }

    /** @return {@code true} until {@link #stop()} has been called. */
    @Override // org.voltdb.importer.ImporterLifecycle
    public boolean shouldRun() {
        return !this.m_stopping;
    }

    /** Flags the loader as stopping; observed by importers through {@link #shouldRun()}. */
    @Override // org.voltdb.importer.ImporterLifecycle
    public void stop() {
        this.m_stopping = true;
    }

    @Override // org.voltdb.importer.ImporterLogger
    public void rateLimitedLog(Level level, Throwable th, String str, Object... objArr) {
        m_log.rateLimitedLog((long) LOG_SUPPRESSION_INTERVAL_SECONDS, level, th, str, objArr);
    }

    @Override // org.voltdb.importer.ImporterLogger
    public void info(Throwable th, String str, Object... objArr) {
        m_log.info(formatMessage(str, objArr), th);
    }

    @Override // org.voltdb.importer.ImporterLogger
    public void warn(Throwable th, String str, Object... objArr) {
        m_log.warn(formatMessage(str, objArr), th);
    }

    @Override // org.voltdb.importer.ImporterLogger
    public void error(Throwable th, String str, Object... objArr) {
        m_log.error(formatMessage(str, objArr), th);
    }

    @Override // org.voltdb.importer.ImporterLogger
    public void debug(Throwable th, String str, Object... objArr) {
        m_log.debug(formatMessage(str, objArr), th);
    }

    @Override // org.voltdb.importer.ImporterLogger
    public boolean isDebugEnabled() {
        return m_log.isDebugEnabled();
    }

    @Override // org.voltdb.importer.ImporterLifecycle
    public boolean hasTransaction() {
        return true;
    }

    /**
     * Formats a log message, falling back to the raw format string when it is malformed so
     * that a bad message can never throw out of a logging call.
     */
    private static String formatMessage(String format, Object... args) {
        try {
            return String.format(format, args);
        } catch (java.util.IllegalFormatException e) {
            return format;
        }
    }

    /**
     * Builds one importer per partition configuration and submits each to a fixed-size pool
     * with one thread per importer.
     *
     * @return the pool running the importers
     * @throws IllegalArgumentException if no partition configuration could be created
     */
    private ExecutorService createImporterExecutor(ImporterLifecycle importerLifecycle, ImporterLogger importerLogger) throws Exception {
        Map<URI, KafkaStreamImporterConfig> configs = createKafkaImporterConfigFromProperties(this.m_args);
        if (configs.isEmpty()) {
            // newFixedThreadPool(0) would throw the same exception type without any context.
            throw new IllegalArgumentException("No Kafka partition configurations were created for topic: " + this.m_args.topic);
        }
        ExecutorService pool = Executors.newFixedThreadPool(configs.size());
        m_log.info("Created " + configs.size() + " configurations for partitions:");
        for (Map.Entry<URI, KafkaStreamImporterConfig> entry : configs.entrySet()) {
            m_log.info(HelpFormatter.DEFAULT_LONG_OPT_SEPARATOR + entry.getKey());
            pool.submit(new LoaderTopicPartitionImporter(entry.getValue(), importerLifecycle, importerLogger));
        }
        return pool;
    }

    /**
     * Resolves the broker list (from the {@code brokers} argument, or from Zookeeper when
     * {@code zookeeper} is set) and builds the per-partition importer configurations.
     */
    private Map<URI, KafkaStreamImporterConfig> createKafkaImporterConfigFromProperties(KafkaExternalLoaderCLIArguments kafkaExternalLoaderCLIArguments) throws Exception {
        String brokerList;
        List<HostAndPort> brokers;
        String zookeeper = kafkaExternalLoaderCLIArguments.zookeeper;
        if (zookeeper == null || zookeeper.trim().isEmpty()) {
            brokerList = kafkaExternalLoaderCLIArguments.brokers.trim();
            // Trim each entry so "host1:9092, host2:9092" parses despite the space after the comma.
            brokers = Arrays.stream(brokerList.split(CatalogUtil.SIGNATURE_DELIMITER))
                    .map(entry -> HostAndPort.fromString(entry.trim()))
                    .collect(Collectors.toList());
        } else {
            brokers = KafkaUtils.getBrokersFromZookeeper(zookeeper, kafkaExternalLoaderCLIArguments.zookeeperSessionTimeoutMillis);
            brokerList = brokers.stream()
                    .map(broker -> broker.getHost() + ":" + broker.getPort())
                    .collect(Collectors.joining(CatalogUtil.SIGNATURE_DELIMITER));
        }
        return KafkaStreamImporterConfig.getConfigsForPartitions(
                KafkaUtils.getNormalizedKey(brokerList),
                brokers,
                kafkaExternalLoaderCLIArguments.topic,
                kafkaExternalLoaderCLIArguments.groupid,
                kafkaExternalLoaderCLIArguments.procedure,
                kafkaExternalLoaderCLIArguments.timeout,
                kafkaExternalLoaderCLIArguments.buffersize,
                kafkaExternalLoaderCLIArguments.commitpolicy,
                FormatterBuilder.createFormatterBuilder(kafkaExternalLoaderCLIArguments.formatterProperties));
    }

    /**
     * Creates a client and connects it to every host in {@code list}.
     *
     * @throws Exception if any connection fails; the partially connected client is closed first
     */
    private static Client getVoltClient(ClientConfig clientConfig, List<String> list) throws Exception {
        Client client = ClientFactory.createClient(clientConfig);
        try {
            for (String host : list) {
                client.createConnection(host);
            }
        } catch (Exception e) {
            // Do not leak the client when a connection attempt fails.
            try {
                client.close();
            } catch (Exception closeFailure) {
                if (closeFailure instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return client;
    }

    /**
     * Signals the importers to stop, then shuts down the importer pool followed by the
     * callback executor, waiting for each to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void closeExecutors() throws InterruptedException {
        stop();
        try {
            // Work on a local copy so a concurrent caller clearing the field cannot cause an NPE.
            ExecutorService importers = this.m_executorService;
            if (importers != null) {
                importers.shutdownNow();
                importers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                this.m_executorService = null;
            }
        } finally {
            // Runs even if the wait above was interrupted, so the callback executor is never left behind.
            ExecutorService callbacks = this.m_callbackExecutor;
            if (callbacks != null) {
                callbacks.shutdownNow();
                callbacks.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                this.m_callbackExecutor = null;
            }
        }
    }

    /**
     * Best-effort release of the executors, the loader and the client. Each step runs even
     * if an earlier one failed; failures are logged rather than propagated.
     */
    private void close() {
        boolean interrupted = false;
        try {
            closeExecutors();
        } catch (InterruptedException e) {
            interrupted = true;
        } catch (RuntimeException e) {
            m_log.warn("Failed to shut down executors", e);
        }
        if (this.m_loader != null) {
            try {
                this.m_loader.close();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    interrupted = true;
                } else {
                    m_log.warn("Failed to close the data loader", e);
                }
            }
            this.m_loader = null;
        }
        if (this.m_client != null) {
            try {
                this.m_client.close();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    interrupted = true;
                } else {
                    m_log.warn("Failed to close the client", e);
                }
            }
            this.m_client = null;
        }
        // Restore the flag only after cleanup so the close calls above are not cut short by it.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Entry point: parses the arguments, initializes the loader and processes messages until
     * the loader stops.
     *
     * @param strArr command-line arguments
     */
    public static void main(String[] strArr) {
        System.out.println("Support for Kafka version 0.8 has been deprecated. The default kafkaloader is Kafka version 0.10.");
        KafkaExternalLoaderCLIArguments arguments = new KafkaExternalLoaderCLIArguments();
        arguments.parse(KafkaExternalLoader.class.getName(), strArr);
        KafkaExternalLoader loader = new KafkaExternalLoader(arguments);
        try {
            loader.initialize();
            loader.processKafkaMessages();
        } catch (Exception e) {
            // Log first so the cause is recorded even if cleanup is slow.
            m_log.error("Exception in KafkaExternalLoader", e);
            loader.close();
            System.exit(-1);
        }
        System.exit(0);
    }
}
