package org.voltdb.importclient.kafka10;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.voltcore.logging.VoltLogger;
import org.voltdb.CLIConfig;
import org.voltdb.client.AutoReconnectListener;
import org.voltdb.client.Client;
import org.voltdb.client.ClientConfig;
import org.voltdb.client.ClientFactory;
import org.voltdb.client.ClientImpl;
import org.voltdb.client.ClientResponse;
import org.voltdb.common.Constants;
import org.voltdb.importer.ImporterLifecycle;
import org.voltdb.utils.BulkLoaderErrorHandler;
import org.voltdb.utils.CSVBulkDataLoader;
import org.voltdb.utils.CSVDataLoader;
import org.voltdb.utils.CSVTupleDataLoader;
import org.voltdb.utils.MiscUtils;
import org.voltdb.utils.RowWithMetaData;

/* loaded from: input_file:org/voltdb/importclient/kafka10/KafkaLoader.class */
/**
 * Command-line loader that reads messages from Kafka and inserts them into VoltDB,
 * either through a supplied stored procedure or directly into a table.
 *
 * <p>The loader connects a VoltDB {@link Client}, wraps it in a {@link CSVDataLoader},
 * and starts one {@link KafkaExternalConsumerRunner} per configured consumer on a fixed
 * thread pool. {@link #main(String[])} blocks until all runners have finished.
 *
 * <p>Threading: {@link #close()} can be reached both from the thread running
 * {@link #main(String[])} and, through {@link #notifyShutdown()}, from whichever thread
 * delivers insert failures to {@link KafkaBulkLoaderCallback}. The cleanup code is
 * therefore written to tolerate being invoked more than once.
 */
public class KafkaLoader implements ImporterLifecycle {
    private static final VoltLogger LOGGER = new VoltLogger("KAFKALOADER10");

    /** Number of failed inserts seen so far; shared by every callback in this JVM. */
    private static final AtomicLong FAILED_COUNT = new AtomicLong(0);

    private final KafkaLoaderCLIArguments m_cliOptions;

    /** Consumer runners created by {@link #getExecutor()}; {@code null} until then. */
    private List<KafkaExternalConsumerRunner> m_consumers;
    private CSVDataLoader m_loader = null;
    private Client m_client = null;
    private ExecutorService m_executorService = null;

    /** Ensures the shutdown sequence in {@link #notifyShutdown()} runs at most once. */
    private final AtomicBoolean m_shutdown = new AtomicBoolean(false);

    /** Set by {@link #stop()}; reported (negated) by {@link #shouldRun()}. */
    private volatile boolean m_stopping = false;

    /**
     * Error handler handed to the data loader. It counts failed inserts and triggers a
     * shutdown of the whole loader once the failure is not tolerable any more.
     */
    public class KafkaBulkLoaderCallback implements BulkLoaderErrorHandler {
        KafkaBulkLoaderCallback() {
        }

        /**
         * Handles a failed (or reported) insert.
         *
         * @param rowWithMetaData the row that was being inserted; only used for logging
         * @param clientResponse  the server response, may be {@code null}
         * @param str             error text supplied by the loader (unused here)
         * @return {@code true} if the loader has been told to shut down, {@code false}
         *         if processing continues
         */
        @Override // org.voltdb.utils.BulkLoaderErrorHandler
        public boolean handleError(RowWithMetaData rowWithMetaData, ClientResponse clientResponse, String str) {
            // Error limiting disabled, or nothing to inspect: keep going.
            if (KafkaLoader.this.m_cliOptions.maxerrors <= 0 || clientResponse == null) {
                return false;
            }
            final byte status = clientResponse.getStatus();
            if (status == ClientResponse.SUCCESS) {
                return false;
            }

            KafkaLoader.LOGGER.error("Failed to insert: " + (rowWithMetaData == null ? null : rowWithMetaData.rawLine));

            final long failures = KafkaLoader.FAILED_COUNT.incrementAndGet();
            // Only user aborts and graceful failures count against the error budget;
            // any other status stops the loader immediately.
            final boolean tolerable = status == ClientResponse.USER_ABORT || status == ClientResponse.GRACEFUL_FAILURE;
            if (tolerable && failures <= KafkaLoader.this.m_cliOptions.maxerrors) {
                return false;
            }
            KafkaLoader.this.notifyShutdown();
            return true;
        }

        /**
         * @return {@code true} when error limiting is enabled and the number of failed
         *         inserts exceeds the configured maximum
         */
        @Override // org.voltdb.utils.BulkLoaderErrorHandler
        public boolean hasReachedErrorLimit() {
            final int limit = KafkaLoader.this.m_cliOptions.maxerrors;
            return limit > 0 && KafkaLoader.FAILED_COUNT.get() > (long) limit;
        }
    }

    /**
     * Creates a loader for the given, already parsed, command-line options.
     *
     * @param kafkaLoaderCLIArguments parsed command-line options
     */
    public KafkaLoader(KafkaLoaderCLIArguments kafkaLoaderCLIArguments) {
        this.m_cliOptions = kafkaLoaderCLIArguments;
    }

    /** @return {@code true} until {@link #stop()} has been called */
    @Override // org.voltdb.importer.ImporterLifecycle
    public boolean shouldRun() {
        return !this.m_stopping;
    }

    /** Marks this loader as stopping so that {@link #shouldRun()} returns {@code false}. */
    @Override // org.voltdb.importer.ImporterLifecycle
    public void stop() {
        this.m_stopping = true;
    }

    /** @return always {@code false} */
    @Override // org.voltdb.importer.ImporterLifecycle
    public boolean hasTransaction() {
        return false;
    }

    /**
     * Releases the consumer thread pool, the data loader and the VoltDB client, in that
     * order. Cleanup is best-effort: a failure in one step is logged and does not
     * prevent the remaining steps from running.
     *
     * <p>Each field is snapshotted and cleared before use so that a second invocation
     * (possibly from another thread) neither closes a resource twice nor trips over a
     * field that was nulled in the meantime.
     */
    void close() {
        // Re-asserting the interrupt is deferred to the very end: doing it earlier
        // would make the following blocking close() calls fail immediately.
        boolean interrupted = false;

        final ExecutorService executor = this.m_executorService;
        this.m_executorService = null;
        if (executor != null) {
            try {
                executor.shutdownNow();
                executor.awaitTermination(365L, TimeUnit.DAYS);
            } catch (Throwable t) {
                interrupted |= reportCloseFailure("Kafka consumer thread pool", t);
            }
        }

        final CSVDataLoader loader = this.m_loader;
        this.m_loader = null;
        if (loader != null) {
            try {
                loader.close();
            } catch (Throwable t) {
                interrupted |= reportCloseFailure("data loader", t);
            }
        }

        final Client client = this.m_client;
        this.m_client = null;
        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                // Errors still propagate; ordinary failures must not escape cleanup.
                interrupted |= reportCloseFailure("VoltDB client", e);
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Logs a failure that occurred while closing a resource.
     *
     * @return {@code true} if the failure was an {@link InterruptedException}, in which
     *         case nothing is logged and the caller should restore the interrupt flag
     */
    private static boolean reportCloseFailure(String resource, Throwable failure) {
        if (failure instanceof InterruptedException) {
            return true;
        }
        LOGGER.warn("Error while closing " + resource, failure);
        return false;
    }

    /**
     * Builds the Kafka consumer configuration from the command-line options and, when
     * given, the user supplied properties file.
     *
     * <p>Values from the properties file are used for {@code group.id},
     * {@code bootstrap.servers} and {@code auto.offset.reset}; deserializers, the
     * auto-commit policy and the poll/timeout settings are always overwritten. As a side
     * effect the broker list in the CLI options is replaced by
     * {@code bootstrap.servers} from the file when that property is present.
     *
     * @return the consumer configuration
     * @throws IOException if the properties file cannot be read
     */
    private Properties getKafkaConfigFromCLIArguments() throws IOException {
        final Properties properties = new Properties();
        String groupId = "voltdb-" + (this.m_cliOptions.useSuppliedProcedure ? this.m_cliOptions.procedure : this.m_cliOptions.table);

        final String configPath = this.m_cliOptions.config;
        if (configPath == null || configPath.trim().isEmpty()) {
            properties.put("auto.offset.reset", "earliest");
        } else {
            // try-with-resources: the stream used to be left open.
            try (FileInputStream in = new FileInputStream(new File(configPath))) {
                properties.load(in);
            }
            groupId = properties.getProperty("group.id", groupId);
            this.m_cliOptions.brokers = properties.getProperty("bootstrap.servers", this.m_cliOptions.brokers);

            final String autoCommit = properties.getProperty("enable.auto.commit");
            if (autoCommit != null && !autoCommit.trim().isEmpty() && !"true".equalsIgnoreCase(autoCommit.trim())) {
                LOGGER.warn("Auto commit policy for Kafka loader will be set to 'true' instead of '" + autoCommit + "'");
            }
            if (properties.getProperty("auto.offset.reset") == null) {
                properties.put("auto.offset.reset", "earliest");
            }
        }

        properties.put("group.id", groupId);
        properties.put("bootstrap.servers", this.m_cliOptions.brokers);
        properties.put("key.deserializer", ByteBufferDeserializer.class.getName());
        properties.put("value.deserializer", ByteBufferDeserializer.class.getName());
        properties.put("enable.auto.commit", "true");
        properties.put("max.poll.records", Integer.valueOf(this.m_cliOptions.getMaxPollRecords()));
        properties.put("max.poll.interval.ms", Integer.valueOf(this.m_cliOptions.getMaxPollInterval()));
        properties.put("session.timeout.ms", Integer.valueOf(this.m_cliOptions.getSessionTimeout()));
        properties.put("request.timeout.ms", Integer.valueOf(this.m_cliOptions.getRequestTimeout()));
        return properties;
    }

    /**
     * Creates the configured number of consumer runners and submits them to a new
     * fixed-size thread pool.
     *
     * @return the pool running the consumers, or {@code null} if not every consumer
     *         could be created (the ones that were created are shut down again)
     * @throws Exception if the Kafka configuration cannot be built
     */
    private ExecutorService getExecutor() throws Exception {
        final Properties kafkaProps = getKafkaConfigFromCLIArguments();
        final KafkaLoaderConfig loaderConfig = new KafkaLoaderConfig(this.m_cliOptions);
        final int consumerCount = this.m_cliOptions.getConsumerCount();
        final ExecutorService pool = Executors.newFixedThreadPool(consumerCount);

        final List<KafkaExternalConsumerRunner> consumers = new ArrayList<>();
        this.m_consumers = consumers;
        for (int i = 0; i < consumerCount; i++) {
            try {
                consumers.add(new KafkaExternalConsumerRunner(this, loaderConfig, new KafkaConsumer(kafkaProps), this.m_loader));
            } catch (KafkaException e) {
                LOGGER.error("Couldn't create Kafka consumer. Please check the configuration paramaters. Error:" + e.getMessage());
            } catch (Throwable th) {
                LOGGER.error("Failed creating Kafka consumer ", th);
            }
        }

        if (consumers.size() != consumerCount) {
            // Partial start-up is treated as failure: undo what was created and
            // release the pool, which would otherwise never be shut down.
            try {
                for (KafkaExternalConsumerRunner runner : consumers) {
                    runner.shutdown();
                }
            } finally {
                pool.shutdownNow();
            }
            return null;
        }

        for (KafkaExternalConsumerRunner runner : consumers) {
            pool.submit(runner);
        }
        return pool;
    }

    /**
     * Shuts down all consumer runners and releases the loader's resources. Only the
     * first call has any effect; later calls return immediately.
     */
    public void notifyShutdown() {
        if (!this.m_shutdown.compareAndSet(false, true)) {
            return;
        }
        // The runners are only created in getExecutor(); there may be none yet.
        final List<KafkaExternalConsumerRunner> consumers = this.m_consumers;
        if (consumers != null) {
            for (KafkaExternalConsumerRunner runner : consumers) {
                runner.shutdown();
            }
        }
        close();
    }

    /**
     * Connects to VoltDB, creates the data loader, starts the Kafka consumers and blocks
     * until they have all terminated.
     *
     * @throws Exception if credentials cannot be read, no VoltDB server can be reached,
     *                   the Kafka configuration cannot be built, or the wait is
     *                   interrupted
     */
    private void processKafkaMessages() throws Exception {
        if (this.m_cliOptions.credentials != null && !this.m_cliOptions.credentials.trim().isEmpty()) {
            final Properties credentials = MiscUtils.readPropertiesFromCredentials(this.m_cliOptions.credentials);
            this.m_cliOptions.user = credentials.getProperty("username");
            // NOTE(review): the password key is taken from Constants.DEFAULT_KEYSTORE_PASSWD;
            // confirm that constant matches the key used in credentials files.
            this.m_cliOptions.password = credentials.getProperty(Constants.DEFAULT_KEYSTORE_PASSWD);
        }
        this.m_cliOptions.password = CLIConfig.readPasswordIfNeeded(this.m_cliOptions.user, this.m_cliOptions.password, "Enter password: ");

        final AutoReconnectListener reconnectListener = new AutoReconnectListener();
        final ClientConfig clientConfig;
        if (this.m_cliOptions.stopondisconnect) {
            clientConfig = new ClientConfig(this.m_cliOptions.user, this.m_cliOptions.password, null);
            clientConfig.setReconnectOnConnectionLoss(false);
        } else {
            clientConfig = new ClientConfig(this.m_cliOptions.user, this.m_cliOptions.password, reconnectListener);
            clientConfig.setReconnectOnConnectionLoss(true);
        }
        if (this.m_cliOptions.ssl != null && !this.m_cliOptions.ssl.trim().isEmpty()) {
            clientConfig.setTrustStoreConfigFromPropertyFile(this.m_cliOptions.ssl);
            clientConfig.enableSSL();
        }
        clientConfig.setProcedureCallTimeout(0L);
        this.m_client = getVoltClient(clientConfig, this.m_cliOptions.getVoltHosts());

        if (this.m_cliOptions.useSuppliedProcedure) {
            this.m_loader = new CSVTupleDataLoader((ClientImpl) this.m_client, this.m_cliOptions.procedure, new KafkaBulkLoaderCallback());
        } else {
            this.m_loader = new CSVBulkDataLoader((ClientImpl) this.m_client, this.m_cliOptions.table, this.m_cliOptions.batch, this.m_cliOptions.update, new KafkaBulkLoaderCallback());
        }
        this.m_loader.setFlushInterval(this.m_cliOptions.flush, this.m_cliOptions.flush);
        if (!this.m_cliOptions.stopondisconnect) {
            reconnectListener.setLoader(this.m_loader);
        }

        final ExecutorService executor = getExecutor();
        this.m_executorService = executor;
        if (executor == null) {
            return;
        }
        if (this.m_cliOptions.useSuppliedProcedure) {
            LOGGER.info("Kafka Consumer from topic: " + this.m_cliOptions.topic + " Started using procedure: " + this.m_cliOptions.procedure);
        } else {
            LOGGER.info("Kafka Consumer from topic: " + this.m_cliOptions.topic + " Started for table: " + this.m_cliOptions.table);
        }
        // Work on the local reference: close() may null the field from another thread
        // while this thread is still waiting here.
        executor.shutdown();
        executor.awaitTermination(365L, TimeUnit.DAYS);
        this.m_executorService = null;
    }

    /**
     * Creates a topology-aware client and tries to connect it to every given host.
     *
     * @param clientConfig client configuration; topology change awareness is enabled on it
     * @param list         VoltDB hosts to connect to
     * @return a client connected to at least one host
     * @throws Exception if none of the hosts could be reached
     */
    private static Client getVoltClient(ClientConfig clientConfig, List<String> list) throws Exception {
        clientConfig.setTopologyChangeAware(true);
        final Client client = ClientFactory.createClient(clientConfig);
        for (String host : list) {
            try {
                client.createConnection(host);
            } catch (IOException e) {
                // One unreachable host is not fatal as long as another one answers.
                LOGGER.warn("Unable to connect to VoltDB server " + host, e);
            }
        }
        if (!client.getConnectedHostList().isEmpty()) {
            return client;
        }
        try {
            client.close();
        } catch (Exception e) {
            LOGGER.warn("Error while closing unconnected VoltDB client", e);
        }
        throw new Exception("Unable to connect to any servers");
    }

    /**
     * Command-line entry point: parses the arguments, runs the loader until its
     * consumers terminate, releases all resources and exits the JVM.
     *
     * @param strArr command-line arguments
     */
    public static void main(String[] strArr) {
        final KafkaLoaderCLIArguments options = new KafkaLoaderCLIArguments();
        options.parse(KafkaLoader.class.getName(), strArr);
        final KafkaLoader kafkaLoader = new KafkaLoader(options);
        try {
            kafkaLoader.processKafkaMessages();
        } catch (Exception e) {
            LOGGER.error("Failure in KafkaLoader10 ", e);
        } finally {
            kafkaLoader.close();
        }
        // NOTE(review): the exit status is 0 even after a logged failure; kept as-is
        // because scripts may rely on it — confirm whether a non-zero status is wanted.
        System.exit(0);
    }
}
