package org.voltdb.importclient.kafka;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.cluster.Broker;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.log4j.Logger;
import org.hsqldb_voltpatches.Tokens;
import org.voltdb.importclient.ImportBaseException;
import org.voltdb.importclient.kafka.util.HostAndPort;
import org.voltdb.importclient.kafka.util.KafkaCommitPolicy;
import org.voltdb.importclient.kafka.util.KafkaConstants;
import org.voltdb.importclient.kafka.util.KafkaUtils;
import org.voltdb.importer.ImporterConfig;
import org.voltdb.importer.formatter.FormatterBuilder;

/* loaded from: input_file:org/voltdb/importclient/kafka/KafkaStreamImporterConfig.class */
public class KafkaStreamImporterConfig implements ImporterConfig {
    /** Shared logger for this class and its nested helpers; obtained under the name "IMPORT". */
    private static final Logger m_logger = Logger.getLogger("IMPORT");
    /** Resource identifier of this configuration; returned by {@link #getResourceID()}. */
    private final URI m_uri;
    /** Broker list handed to the constructor; returned as-is by {@link #getBrokers()}. */
    private final List<HostAndPort> m_brokers;
    /** Topic name this configuration refers to. */
    private final String m_topic;
    /** Consumer group id supplied at construction. */
    private final String m_groupId;
    /** Fetch size; createConfigEntries reads it from the "fetch.message.max.bytes" property (default "65536"). */
    private final int m_fetchSize;
    /** Socket timeout; createConfigEntries reads it from the "socket.timeout.ms" property (default "30000"). */
    private final int m_soTimeout;
    /** Procedure name supplied at construction (the "procedure" property in createConfigEntries). */
    private final String m_procedure;
    /** Partition id of the topic this configuration refers to. */
    private final int m_partition;
    /** Leader broker for the partition; the only non-final field, replaceable via setPartitionLeader. */
    private HostAndPort m_partitionLeader;
    /** Formatter builder supplied at construction; returned by {@link #getFormatterBuilder()}. */
    private final FormatterBuilder m_formatterBuilder;
    /** Commit policy parsed from the policy string by KafkaCommitPolicy.fromString. */
    private final KafkaCommitPolicy m_commitPolicy;
    /** Trigger value derived from the policy string by KafkaCommitPolicy.fromStringTriggerValue. */
    private final long m_triggerValue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/voltdb/importclient/kafka/KafkaStreamImporterConfig$FailedMetaDataAttempt.class */
    public static final class FailedMetaDataAttempt {
        final String msg;
        final Throwable cause;

        FailedMetaDataAttempt(String str, Throwable th) {
            this.cause = th;
            this.msg = str;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void log() {
            KafkaStreamImporterConfig.m_logger.error(this.msg, this.cause);
        }
    }

    /* loaded from: input_file:org/voltdb/importclient/kafka/KafkaStreamImporterConfig$KafkaConfigurationException.class */
    public static class KafkaConfigurationException extends ImportBaseException {
        private static final long serialVersionUID = -3413349105074207334L;

        /** Creates an exception without message or cause. */
        public KafkaConfigurationException() {
            super();
        }

        /**
         * Creates an exception from a message and arguments, both forwarded
         * unchanged to {@link ImportBaseException}.
         *
         * @param str     message (callers in this file pass a {@code %s}-style format)
         * @param objArr  arguments accompanying the message
         */
        public KafkaConfigurationException(String str, Object... objArr) {
            super(str, objArr);
        }

        /**
         * Creates an exception from a message, a cause and arguments, all
         * forwarded unchanged to {@link ImportBaseException}.
         *
         * @param str     message
         * @param th      underlying cause
         * @param objArr  arguments accompanying the message
         */
        public KafkaConfigurationException(String str, Throwable th, Object... objArr) {
            super(str, th, objArr);
        }

        /**
         * Creates an exception that wraps the given cause.
         *
         * @param th underlying cause
         */
        public KafkaConfigurationException(Throwable th) {
            super(th);
        }
    }

    public KafkaStreamImporterConfig(URI uri, List<HostAndPort> list, String str, int i, HostAndPort hostAndPort, String str2, int i2, int i3, String str3, String str4, FormatterBuilder formatterBuilder) {
        this.m_uri = uri;
        this.m_brokers = list;
        this.m_topic = str;
        this.m_partition = i;
        this.m_partitionLeader = hostAndPort;
        this.m_groupId = str2;
        this.m_fetchSize = i2;
        this.m_soTimeout = i3;
        this.m_procedure = str3;
        this.m_commitPolicy = KafkaCommitPolicy.fromString(str4);
        this.m_triggerValue = KafkaCommitPolicy.fromStringTriggerValue(str4, this.m_commitPolicy);
        this.m_formatterBuilder = formatterBuilder;
    }

    /**
     * @return the broker list given at construction (the same list instance, not a copy)
     */
    public List<HostAndPort> getBrokers() {
        return m_brokers;
    }

    /**
     * @return the topic name this configuration refers to
     */
    public String getTopic() {
        return m_topic;
    }

    /**
     * @return the consumer group id given at construction
     */
    public String getGroupId() {
        return m_groupId;
    }

    /**
     * @return the fetch size given at construction
     */
    public int getFetchSize() {
        return m_fetchSize;
    }

    /**
     * @return the socket timeout given at construction
     */
    public int getSocketTimeout() {
        return m_soTimeout;
    }

    /**
     * @return the procedure name given at construction
     */
    public String getProcedure() {
        return m_procedure;
    }

    /**
     * @return the partition id this configuration refers to
     */
    public int getPartition() {
        return m_partition;
    }

    /**
     * @return the leader broker currently recorded for the partition
     */
    public HostAndPort getPartitionLeader() {
        return m_partitionLeader;
    }

    /**
     * Replaces the leader broker recorded for the partition.
     *
     * @param hostAndPort the new leader broker
     */
    public void setPartitionLeader(HostAndPort hostAndPort) {
        m_partitionLeader = hostAndPort;
    }

    /**
     * @return the resource URI given at construction
     */
    @Override // org.voltdb.importer.ImporterConfig
    public URI getResourceID() {
        return m_uri;
    }

    /**
     * @return the commit policy parsed from the policy string given at construction
     */
    public KafkaCommitPolicy getCommitPolicy() {
        return m_commitPolicy;
    }

    /**
     * @return the trigger value derived from the commit policy string given at construction
     */
    public long getTriggerValue() {
        return m_triggerValue;
    }

    /**
     * Builds one importer configuration per topic partition from importer properties.
     *
     * <p>Properties read: {@code brokers} (required, comma separated),
     * {@code procedure} (required), {@code topics} (required, comma separated),
     * {@code groupid} (default {@link KafkaConstants#GROUP_ID}),
     * {@code fetch.message.max.bytes} (default 65536),
     * {@code socket.timeout.ms} (default 30000) and {@code commit.policy}
     * (default {@code none}).
     *
     * <p>Partition discovery is best effort per topic: if the lookup for a topic
     * fails, a warning is logged and that topic contributes no entries.
     *
     * @param properties       importer properties
     * @param formatterBuilder formatter builder attached to every created configuration
     * @return configurations keyed by resource URI; may be empty if every lookup failed
     * @throws IllegalArgumentException if a required property is missing or a topic
     *         name is too long or does not match {@link KafkaConstants#TOPIC_LEGAL_NAMES_PATTERN}
     * @throws NumberFormatException if a numeric property is not a valid integer
     */
    public static Map<URI, ImporterConfig> createConfigEntries(Properties properties, FormatterBuilder formatterBuilder) {
        String brokers = properties.getProperty("brokers", "").trim();
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("Missing kafka broker");
        }
        String key = KafkaUtils.getNormalizedKey(brokers);
        // split() drops trailing empty strings, so input such as "," yields no entries at all.
        String[] brokerNames = brokers.split("\\s*,\\s*");
        if (brokerNames.length == 0) {
            throw new IllegalArgumentException("Missing kafka broker");
        }
        List<HostAndPort> brokerList = new ArrayList<>(brokerNames.length);
        for (String brokerName : brokerNames) {
            brokerList.add(HostAndPort.fromString(brokerName));
        }

        String procedure = properties.getProperty("procedure", "").trim();
        if (procedure.isEmpty()) {
            throw new IllegalArgumentException("Missing procedure.");
        }
        String topics = properties.getProperty("topics", "").trim();
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("Missing topic(s).");
        }
        String groupId = properties.getProperty("groupid", KafkaConstants.GROUP_ID).trim();
        int fetchSize = parseIntProperty(properties, "fetch.message.max.bytes", "65536");
        int soTimeout = parseIntProperty(properties, "socket.timeout.ms", "30000");
        String[] topicNames = topics.split("\\s*,\\s*");
        if (topicNames.length == 0) {
            throw new IllegalArgumentException("Missing topic(s).");
        }
        String commitPolicy = properties.getProperty("commit.policy", "none");

        Map<URI, ImporterConfig> configs = new HashMap<>();
        for (String topic : topicNames) {
            if (topic.length() > 255) {
                throw new IllegalArgumentException("topic name is illegal, can't be longer than 255 characters");
            }
            if (!KafkaConstants.TOPIC_LEGAL_NAMES_PATTERN.matcher(topic).matches()) {
                throw new IllegalArgumentException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '_' and '-'");
            }
            try {
                configs.putAll(getConfigsForPartitions(key, brokerList, topic, groupId, procedure, soTimeout, fetchSize, commitPolicy, formatterBuilder));
            } catch (Exception e) {
                // Deliberately best effort: one unreachable topic must not prevent the others from being configured.
                m_logger.warn(String.format("Error trying to get partition information for topic [%s] on host [%s]", topic, brokerList.get(0).getHost()), e);
            }
        }
        return configs;
    }

    /**
     * Reads an integer property, trimming surrounding whitespace like the other properties.
     *
     * @throws NumberFormatException naming the offending property if the value is not an integer
     */
    private static int parseIntProperty(Properties properties, String name, String defaultValue) {
        String raw = properties.getProperty(name, defaultValue).trim();
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            // Same exception type as before, but the message now identifies the property.
            NumberFormatException detailed = new NumberFormatException("Invalid integer value \"" + raw + "\" for kafka importer property " + name);
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Asks the given brokers, one at a time, for the metadata of a topic and
     * builds one configuration per partition.
     *
     * <p>Brokers are tried in list order until one yields a complete result. A
     * result is complete only when every reported partition has a leader;
     * otherwise everything gathered from that broker is discarded and the next
     * broker is tried. Each consumer is closed before moving on.
     *
     * <p>Every failure met along the way is logged at error level once the loop
     * ends, even when a later broker succeeded.
     *
     * @param str              scheme-specific part of each resource URI (scheme {@code kafka})
     * @param list             brokers to query; also stored in every created configuration
     * @param str2             topic name
     * @param str3             consumer group id; also part of the URI fragment
     * @param str4             procedure name
     * @param i                socket timeout (third {@code SimpleConsumer} argument)
     * @param i2               fetch size (fourth {@code SimpleConsumer} argument)
     * @param str5             commit policy string
     * @param formatterBuilder formatter builder attached to every created configuration
     * @return configurations keyed by resource URI; empty only if no broker was
     *         queried or no broker reported any partition and no failure was recorded
     * @throws KafkaConfigurationException if failures were recorded and no
     *         broker produced a complete result
     */
    public static Map<URI, KafkaStreamImporterConfig> getConfigsForPartitions(String str, List<HostAndPort> list, String str2, String str3, String str4, int i, int i2, String str5, FormatterBuilder formatterBuilder) {
        Map<URI, KafkaStreamImporterConfig> configs = new HashMap<>();
        List<FailedMetaDataAttempt> failures = new ArrayList<>();
        Iterator<HostAndPort> brokers = list.iterator();
        while (configs.isEmpty() && brokers.hasNext()) {
            HostAndPort broker = brokers.next();
            // Scoped to one iteration so a consumer from a previous broker can never be closed twice.
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(broker.getHost(), broker.getPort(), i, i2, KafkaConstants.CLIENT_ID);
                List<TopicMetadata> topics = consumer.send(new TopicMetadataRequest(Collections.singletonList(str2))).topicsMetadata();
                if (topics == null || topics.isEmpty()) {
                    failures.add(new FailedMetaDataAttempt("Failed to get topic metadata for topic " + str2 + " from host " + broker.getHost(), null));
                    continue;
                }
                int partitionCount = 0;
                for (TopicMetadata topic : topics) {
                    if (topic.partitionsMetadata().isEmpty()) {
                        failures.add(new FailedMetaDataAttempt("Failed to get partition metadata for topic " + str2 + " from host " + broker.getHost() + ":" + topics.toString(), null));
                        continue;
                    }
                    for (PartitionMetadata partition : topic.partitionsMetadata()) {
                        partitionCount++;
                        URI uri;
                        try {
                            uri = new URI("kafka", str, str2 + "/partition/" + partition.partitionId() + Tokens.T_DIVIDE + str3 + Tokens.T_DIVIDE);
                        } catch (URISyntaxException e) {
                            // Caught by the handler below and recorded as a failed attempt for this broker.
                            throw new KafkaConfigurationException("unable to create topic resource URI", e);
                        }
                        Broker leader = partition.leader();
                        if (leader == null) {
                            failures.add(new FailedMetaDataAttempt("Failed to get leader broker for topic " + str2 + " partition " + partition.partitionId() + " from host " + broker.getHost(), null));
                            continue;
                        }
                        configs.put(uri, new KafkaStreamImporterConfig(uri, list, str2, partition.partitionId(), new HostAndPort(leader.host(), leader.port()), str3, i2, i, str4, str5, formatterBuilder));
                    }
                }
                if (configs.size() != partitionCount) {
                    // At least one partition had no leader: discard the partial result and try the next broker.
                    configs.clear();
                }
            } catch (Exception e) {
                // All-or-nothing, as for the size check above: never keep a partial map from a broker that failed midway.
                configs.clear();
                failures.add(new FailedMetaDataAttempt("Failed to send topic metadata request for topic " + str2 + " from host " + broker.getHost(), e));
            } finally {
                closeConsumer(consumer);
            }
        }
        if (!failures.isEmpty()) {
            failures.forEach(FailedMetaDataAttempt::log);
            failures.clear();
            if (configs.isEmpty()) {
                throw new KafkaConfigurationException("Failed to get topic metadata for %s", str2);
            }
        }
        return configs;
    }

    /**
     * Closes the given consumer, logging a warning instead of propagating any
     * exception raised by the close. Does nothing for {@code null}.
     *
     * @param simpleConsumer consumer to close, may be {@code null}
     */
    public static void closeConsumer(SimpleConsumer simpleConsumer) {
        if (simpleConsumer == null) {
            return;
        }
        try {
            simpleConsumer.close();
        } catch (Exception closeFailure) {
            m_logger.warn("Failed to close consumer connection.", closeFailure);
        }
    }

    /**
     * @return the formatter builder given at construction
     */
    @Override // org.voltdb.importer.ImporterConfig
    public FormatterBuilder getFormatterBuilder() {
        return m_formatterBuilder;
    }
}
