package info.batey.kafka.unit;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.admin.TopicCommand;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Console;

/**
 * Runs a Zookeeper instance and a single Kafka broker on localhost, and offers helpers to manage
 * topics and to send and read {@code String} messages against that broker.
 *
 * <p>Usage: construct an instance, call {@link #startup()}, create topics and exchange messages,
 * then call {@link #shutdown()}.
 *
 * <p>NOTE(review): the internal producer is created in the constructor and is not closed by
 * {@link #shutdown()}; confirm whether callers rely on reusing an instance across
 * startup/shutdown cycles before changing that.
 */
public class KafkaUnit {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaUnit.class);

    /** Value handed to the {@code Zookeeper} helper when no max-connections value is supplied. */
    private static final int DEFAULT_ZK_MAX_CONNECTIONS = 16;
    /** Session timeout passed to {@code ZkUtils.apply} by the topic helpers. */
    private static final int ZK_SESSION_TIMEOUT_MS = 30000;
    /** Connection timeout passed to {@code ZkUtils.apply} by the topic helpers. */
    private static final int ZK_CONNECTION_TIMEOUT_MS = 30000;
    /** The only host accepted in connection strings and used for both services. */
    private static final String LOCALHOST = "localhost";

    private final String zookeeperString;
    private final String brokerString;
    private final int zkPort;
    private final int brokerPort;
    private final Properties kafkaBrokerConfig;
    private final int zkMaxConnections;
    private KafkaServerStartable broker;
    private Zookeeper zookeeper;
    private final KafkaProducer<String, String> producer;
    private File logDir;

    /**
     * Maps a consumed record to the element type returned by the read helpers.
     *
     * @param <T> type produced for each consumed record
     */
    public interface MessageExtractor<T> {
        /**
         * @param consumerRecord record returned by the consumer
         * @return the value to collect for this record
         */
        T extract(ConsumerRecord<String, String> consumerRecord);
    }

    /**
     * Extractor that hands back every record unchanged.
     *
     * <p>Deliberately left as a non-static inner class so that existing code instantiating it
     * through an enclosing {@code KafkaUnit} instance keeps compiling.
     */
    public class PasstroughMessageExtractor implements MessageExtractor<ConsumerRecord<String, String>> {
        public PasstroughMessageExtractor() {
        }

        @Override
        public ConsumerRecord<String, String> extract(ConsumerRecord<String, String> consumerRecord) {
            return consumerRecord;
        }
    }

    /**
     * Extractor that keeps only the value of each record.
     *
     * <p>Deliberately left as a non-static inner class so that existing code instantiating it
     * through an enclosing {@code KafkaUnit} instance keeps compiling.
     */
    public class ValueMessageExtractor implements MessageExtractor<String> {
        public ValueMessageExtractor() {
        }

        @Override
        public String extract(ConsumerRecord<String, String> consumerRecord) {
            return consumerRecord.value();
        }
    }

    /**
     * Creates an instance that uses two ports obtained from {@code new ServerSocket(0)}.
     *
     * @throws IOException if a probing socket cannot be opened or closed
     */
    public KafkaUnit() throws IOException {
        this(getEphemeralPort(), getEphemeralPort());
    }

    /**
     * @param i  Zookeeper port
     * @param i2 Kafka broker port
     */
    public KafkaUnit(int i, int i2) {
        this(i, i2, DEFAULT_ZK_MAX_CONNECTIONS);
    }

    /**
     * @param str  Zookeeper connection string, exactly one {@code localhost:port} pair
     * @param str2 Kafka broker connection string, exactly one {@code localhost:port} pair
     * @throws RuntimeException if either connection string cannot be parsed
     */
    public KafkaUnit(String str, String str2) {
        this(parseConnectionString(str), parseConnectionString(str2));
    }

    /**
     * @param str  Zookeeper connection string, exactly one {@code localhost:port} pair
     * @param str2 Kafka broker connection string, exactly one {@code localhost:port} pair
     * @param i    max-connections value handed to the {@code Zookeeper} helper
     * @throws RuntimeException if either connection string cannot be parsed
     */
    public KafkaUnit(String str, String str2, int i) {
        this(parseConnectionString(str), parseConnectionString(str2), i);
    }

    /**
     * Records the ports and creates the producer used by {@link #sendMessages}. Neither
     * Zookeeper nor the broker is started here; see {@link #startup()}.
     *
     * @param i  Zookeeper port
     * @param i2 Kafka broker port
     * @param i3 max-connections value handed to the {@code Zookeeper} helper
     */
    public KafkaUnit(int i, int i2, int i3) {
        this.kafkaBrokerConfig = new Properties();
        this.zkPort = i;
        this.brokerPort = i2;
        this.zookeeperString = LOCALHOST + ":" + i;
        this.brokerString = LOCALHOST + ":" + i2;
        this.zkMaxConnections = i3;
        // Must come last: createProducer() reads brokerString.
        this.producer = createProducer();
    }

    /** Builds a String/String producer that bootstraps from this instance's broker address. */
    private KafkaProducer<String, String> createProducer() {
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", this.brokerString);
        producerConfig.put("key.serializer", StringSerializer.class.getName());
        producerConfig.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer<>(producerConfig);
    }

    /**
     * Extracts the port from a connection string that must hold exactly one
     * {@code localhost:port} pair.
     *
     * @param connectionString text to parse
     * @return the parsed port
     * @throws RuntimeException wrapping the underlying failure for any input that is null,
     *         malformed, names another host, or has a non-numeric port
     */
    private static int parseConnectionString(String connectionString) {
        try {
            String[] hostPortPairs = connectionString.split(",");
            if (hostPortPairs.length != 1) {
                throw new IllegalArgumentException("Only one 'host:port' pair is allowed in connection string");
            }
            String[] hostAndPort = hostPortPairs[0].split(":");
            if (hostAndPort.length != 2) {
                throw new IllegalArgumentException("Invalid format of a 'host:port' pair");
            }
            if (!LOCALHOST.equals(hostAndPort[0])) {
                throw new IllegalArgumentException("Only localhost is allowed for KafkaUnit");
            }
            return Integer.parseInt(hostAndPort[1]);
        } catch (Exception e) {
            // Every failure, including the validation errors above, surfaces with the same wrapper.
            throw new RuntimeException("Cannot parse connectionString " + connectionString, e);
        }
    }

    /**
     * Opens a server socket on port 0, reads the port it was bound to and closes the socket.
     *
     * @return the port the temporary socket was bound to
     * @throws IOException if the socket cannot be opened or closed
     */
    private static int getEphemeralPort() throws IOException {
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        }
    }

    /**
     * Starts Zookeeper, then a Kafka broker that logs into a fresh temporary directory.
     *
     * <p>The broker settings written here override any value previously supplied for the same
     * key through {@link #setKafkaBrokerConfig(String, String)}. A shutdown hook is registered
     * to delete the temporary log directory.
     *
     * @throws RuntimeException if an {@link IOException} occurs while starting up
     */
    public void startup() {
        this.zookeeper = new Zookeeper(this.zkPort, this.zkMaxConnections);
        this.zookeeper.startup();
        try {
            this.logDir = Files.createTempDirectory("kafka").toFile();
            this.logDir.deleteOnExit();
            Runtime.getRuntime().addShutdownHook(new Thread(getDeleteLogDirectoryAction()));

            this.kafkaBrokerConfig.setProperty("zookeeper.connect", this.zookeeperString);
            this.kafkaBrokerConfig.setProperty("broker.id", "1");
            this.kafkaBrokerConfig.setProperty("host.name", LOCALHOST);
            this.kafkaBrokerConfig.setProperty("port", Integer.toString(this.brokerPort));
            this.kafkaBrokerConfig.setProperty("log.dir", this.logDir.getAbsolutePath());
            this.kafkaBrokerConfig.setProperty("log.flush.interval.messages", "1");
            this.kafkaBrokerConfig.setProperty("delete.topic.enable", "true");
            this.kafkaBrokerConfig.setProperty("offsets.topic.replication.factor", "1");
            this.kafkaBrokerConfig.setProperty("auto.create.topics.enable", "false");

            this.broker = new KafkaServerStartable(new KafkaConfig(this.kafkaBrokerConfig));
            this.broker.startup();
        } catch (IOException e) {
            throw new RuntimeException("Unable to start Kafka", e);
        }
    }

    /** Returns the task run by the shutdown hook: recursively delete the broker log directory. */
    private Runnable getDeleteLogDirectoryAction() {
        return new Runnable() {
            @Override
            public void run() {
                if (KafkaUnit.this.logDir == null) {
                    return;
                }
                try {
                    FileUtils.deleteDirectory(KafkaUnit.this.logDir);
                } catch (IOException e) {
                    KafkaUnit.LOGGER.warn("Problems deleting temporary directory " + KafkaUnit.this.logDir.getAbsolutePath(), e);
                }
            }
        };
    }

    /** @return the broker address in {@code localhost:port} form */
    public String getKafkaConnect() {
        return this.brokerString;
    }

    /** @return the Zookeeper port this instance was configured with */
    public int getZkPort() {
        return this.zkPort;
    }

    /** @return the Kafka broker port this instance was configured with */
    public int getBrokerPort() {
        return this.brokerPort;
    }

    /**
     * Creates a topic with a single partition.
     *
     * @param str topic name
     */
    public void createTopic(String str) {
        createTopic(str, 1);
    }

    /**
     * Creates a topic with replication factor 1 through Kafka's {@code TopicCommand}.
     *
     * @param str topic name
     * @param i   number of partitions
     */
    public void createTopic(String str, int i) {
        String[] arguments = {"--create", "--zookeeper", this.zookeeperString, "--replication-factor", "1", "--partitions", String.valueOf(i), "--topic", str};
        TopicCommand.TopicCommandOptions options = new TopicCommand.TopicCommandOptions(arguments);
        ZkUtils zkUtils = connectToZookeeper(options);
        try {
            LOGGER.info("Executing: CreateTopic {}", Arrays.toString(arguments));
            TopicCommand.createTopic(zkUtils, options);
        } finally {
            zkUtils.close();
        }
    }

    /**
     * Lists topics by running Kafka's {@code TopicCommand} list action and capturing what it
     * prints to the Scala console.
     *
     * @return every string printed by the command, except those ending in
     *         {@code "marked for deletion"}
     */
    public List<String> listTopics() {
        String[] arguments = {"--zookeeper", this.zookeeperString, "--list"};
        TopicCommand.TopicCommandOptions options = new TopicCommand.TopicCommandOptions(arguments);
        ZkUtils zkUtils = connectToZookeeper(options);
        final List<String> topics = new ArrayList<>();
        try {
            LOGGER.info("Executing: ListTopics {}", Arrays.toString(arguments));
            PrintStream originalOut = Console.out();
            try {
                // The command only prints its result, so tee the console output into the list.
                Console.setOut(new PrintStream(originalOut) {
                    @Override
                    public void print(String line) {
                        super.print(line);
                        if (line != null && !line.endsWith("marked for deletion")) {
                            topics.add(line);
                        }
                    }
                });
                TopicCommand.listTopics(zkUtils, options);
            } finally {
                Console.setOut(originalOut);
            }
            return topics;
        } finally {
            zkUtils.close();
        }
    }

    /**
     * Attempts to delete every topic returned by {@link #listTopics()}. A failure to delete one
     * topic is logged and does not stop the remaining deletions.
     */
    public void deleteAllTopics() {
        for (String topic : listTopics()) {
            try {
                deleteTopic(topic);
            } catch (Exception e) {
                // Best effort by design: keep going, but leave a trace instead of hiding the failure.
                LOGGER.warn("Unable to delete topic " + topic, e);
            }
        }
    }

    /**
     * Deletes a topic through Kafka's {@code TopicCommand}.
     *
     * @param str topic name
     */
    public void deleteTopic(String str) {
        String[] arguments = {"--zookeeper", this.zookeeperString, "--delete", "--topic", str};
        TopicCommand.TopicCommandOptions options = new TopicCommand.TopicCommandOptions(arguments);
        ZkUtils zkUtils = connectToZookeeper(options);
        try {
            LOGGER.info("Executing: DeleteTopic {}", Arrays.toString(arguments));
            TopicCommand.deleteTopic(zkUtils, options);
        } finally {
            zkUtils.close();
        }
    }

    /** Opens a ZkUtils connection to the Zookeeper address carried by the parsed command options. */
    private static ZkUtils connectToZookeeper(TopicCommand.TopicCommandOptions options) {
        String zkConnect = (String) options.options().valueOf(options.zkConnectOpt());
        return ZkUtils.apply(zkConnect, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
    }

    /**
     * Shuts down the broker (waiting for it to finish) and then Zookeeper. Zookeeper is shut
     * down even when stopping the broker throws. Components that were never started are skipped.
     */
    public void shutdown() {
        try {
            if (this.broker != null) {
                this.broker.shutdown();
                this.broker.awaitShutdown();
            }
        } finally {
            if (this.zookeeper != null) {
                this.zookeeper.shutdown();
            }
        }
    }

    /**
     * Reads records from the beginning of partition 0 of a topic.
     *
     * @param str topic name
     * @param i   value used for the consumer's {@code max.poll.records}
     * @return the records returned by a single poll
     */
    public List<ConsumerRecord<String, String>> readRecords(String str, int i) {
        return readMessages(str, i, new PasstroughMessageExtractor());
    }

    /**
     * Reads message values from the beginning of partition 0 of a topic.
     *
     * @param str topic name
     * @param i   value used for the consumer's {@code max.poll.records}
     * @return the values of the records returned by a single poll
     */
    public List<String> readMessages(String str, int i) {
        return readMessages(str, i, new ValueMessageExtractor());
    }

    /**
     * Same as {@link #readMessages(String, int)} with {@code max.poll.records} set to
     * {@link Integer#MAX_VALUE}. Only a single poll is performed.
     *
     * @param str topic name
     * @return the values of the records returned by that poll
     */
    public List<String> readAllMessages(String str) {
        return readMessages(str, Integer.MAX_VALUE, new ValueMessageExtractor());
    }

    /**
     * Creates a short-lived consumer in group {@code "test"}, rewinds partition 0 of the topic
     * and converts the records of one {@code poll(1000)} call with the given extractor.
     *
     * <p>NOTE(review): only partition 0 is rewound; confirm this is intended for topics created
     * with more than one partition.
     *
     * @param topic      topic to subscribe to
     * @param maxRecords value used for {@code max.poll.records}
     * @param extractor  conversion applied to each record
     * @return the converted records, in iteration order
     */
    private <T> List<T> readMessages(String topic, int maxRecords, MessageExtractor<T> extractor) {
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", this.brokerString);
        consumerConfig.put("group.id", "test");
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.commit.interval.ms", "1000");
        consumerConfig.put("session.timeout.ms", "30000");
        consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("max.poll.records", String.valueOf(maxRecords));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
            consumer.subscribe(Collections.singletonList(topic));
            // First poll is issued before seeking, as seekToBeginning is then applied to partition 0.
            consumer.poll(0L);
            consumer.seekToBeginning(Collections.singletonList(new TopicPartition(topic, 0)));
            ConsumerRecords<String, String> polled = consumer.poll(1000L);

            List<T> extracted = new ArrayList<>();
            for (ConsumerRecord<String, String> consumerRecord : polled) {
                extracted.add(extractor.extract(consumerRecord));
            }
            return extracted;
        }
    }

    /**
     * Sends the given records one at a time, waiting for each send to complete and flushing the
     * producer after every record (also when the send failed).
     *
     * @param producerRecordArr records to send, in order
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *         {@link ExecutionException} raised while waiting for a send; on interruption the
     *         thread's interrupt flag is restored
     */
    @SafeVarargs
    public final void sendMessages(ProducerRecord<String, String>... producerRecordArr) {
        for (ProducerRecord<String, String> message : producerRecordArr) {
            boolean interrupted = false;
            try {
                this.producer.send(message).get();
            } catch (InterruptedException e) {
                interrupted = true;
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    this.producer.flush();
                } finally {
                    if (interrupted) {
                        // Restored only after flush() so the pending flush is not itself interrupted.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /**
     * Sets a broker configuration property that is applied when {@link #startup()} builds the
     * broker. Keys that {@code startup()} writes itself are overwritten there.
     *
     * @param str  property name
     * @param str2 property value
     */
    public final void setKafkaBrokerConfig(String str, String str2) {
        this.kafkaBrokerConfig.setProperty(str, str2);
    }
}
