package io.debezium.kafka;

import io.debezium.annotation.ThreadSafe;
import io.debezium.document.Document;
import io.debezium.document.DocumentSerdes;
import io.debezium.util.IoUtil;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/debezium/kafka/KafkaCluster.class */
public class KafkaCluster {
    /** Default value of the delete-data-upon-shutdown flag. */
    public static final boolean DEFAULT_DELETE_DATA_UPON_SHUTDOWN = true;
    /** Default value of the delete-data-prior-to-startup flag. */
    public static final boolean DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP = false;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCluster.class);
    /** Brokers of this cluster, keyed by broker id. */
    private final ConcurrentMap<Integer, KafkaServer> kafkaServers = new ConcurrentHashMap<>();
    private final ZookeeperServer zkServer = new ZookeeperServer();
    /** Root directory for cluster state; {@code null} until configured or until startup picks one. */
    private volatile File dataDir = null;
    private volatile boolean deleteDataUponShutdown = DEFAULT_DELETE_DATA_UPON_SHUTDOWN;
    private volatile boolean deleteDataPriorToStartup = DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP;
    private volatile boolean running = false;
    /** Optional broker configuration applied to every broker; {@code null} when none was supplied. */
    private volatile Properties kafkaConfig = null;
    /** First broker port; a negative value means ports are not assigned by this class. */
    private volatile int startingKafkaPort = -1;
    /** Next port handed to a broker when {@link #startingKafkaPort} is non-negative. */
    private final AtomicLong nextKafkaPort = new AtomicLong(this.startingKafkaPort);

    /* loaded from: input_file:io/debezium/kafka/KafkaCluster$InteractiveConsumer.class */
    public interface InteractiveConsumer<K, V> extends Closeable {
        /**
         * Returns the next record obtained from this consumer.
         *
         * @return the next record
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        ConsumerRecord<K, V> nextRecord() throws InterruptedException;

        /**
         * Returns the next record, giving up after the supplied timeout.
         *
         * @param j the maximum time to wait
         * @param timeUnit the unit of {@code j}
         * @return the next record; may be {@code null} (see {@link #nextValue(long, TimeUnit)})
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        ConsumerRecord<K, V> nextRecord(long j, TimeUnit timeUnit) throws InterruptedException;

        /**
         * Returns the value of the record produced by {@link #nextRecord()}.
         *
         * @return the value of the next record
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        default V nextValue() throws InterruptedException {
            ConsumerRecord<K, V> next = nextRecord();
            return next.value();
        }

        /**
         * Returns the value of the record produced by {@link #nextRecord(long, TimeUnit)}.
         *
         * @param j the maximum time to wait
         * @param timeUnit the unit of {@code j}
         * @return the value of the next record, or {@code null} when no record was returned
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        default V nextValue(long j, TimeUnit timeUnit) throws InterruptedException {
            ConsumerRecord<K, V> next = nextRecord(j, timeUnit);
            return next == null ? null : next.value();
        }

        /** @return a stream over records held by this consumer */
        Stream<ConsumerRecord<K, V>> stream();

        /** @return a stream over all records tracked by this consumer */
        Stream<ConsumerRecord<K, V>> streamAll();

        /** Closes this consumer; unlike {@link Closeable#close()} no checked exception is declared. */
        @Override
        void close();
    }

    /* loaded from: input_file:io/debezium/kafka/KafkaCluster$InteractiveProducer.class */
    public interface InteractiveProducer<K, V> extends Closeable {
        /**
         * Writes the supplied record.
         *
         * @param producerRecord the record to write
         * @return a producer on which further writes can be chained
         */
        InteractiveProducer<K, V> write(ProducerRecord<K, V> producerRecord);

        /**
         * Wraps the topic, key and value in a {@link ProducerRecord} and writes it.
         *
         * @param str the topic name
         * @param k the record key
         * @param v the record value
         * @return the result of {@link #write(ProducerRecord)}
         */
        default InteractiveProducer<K, V> write(String str, K k, V v) {
            ProducerRecord<K, V> message = new ProducerRecord<>(str, k, v);
            return write(message);
        }

        /** Closes this producer; unlike {@link Closeable#close()} no checked exception is declared. */
        @Override
        void close();
    }

    /* loaded from: input_file:io/debezium/kafka/KafkaCluster$Usage.class */
    public class Usage {
        /** Creates a helper tied to the enclosing {@link KafkaCluster} instance. */
        public Usage() {
        }

        /**
         * Builds the client properties for a consumer of this cluster. Auto-commit is always disabled.
         *
         * @param str the consumer group id; required
         * @param str2 the client id, or {@code null} to leave {@code client.id} unset
         * @param offsetResetStrategy the reset strategy, or {@code null} to leave {@code auto.offset.reset} unset
         * @return a new, mutable set of consumer properties
         * @throws IllegalArgumentException if the group id is {@code null}
         */
        public Properties getConsumerProperties(String str, String str2, OffsetResetStrategy offsetResetStrategy) {
            if (str == null) {
                throw new IllegalArgumentException("The groupId is required");
            }
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", KafkaCluster.this.brokerList());
            properties.setProperty("group.id", str);
            properties.setProperty("enable.auto.commit", Boolean.FALSE.toString());
            if (offsetResetStrategy != null) {
                // Locale.ROOT keeps the value independent of the JVM default locale; with a Turkish
                // default locale a plain toLowerCase() maps 'I' to a dotless 'ı'.
                properties.setProperty("auto.offset.reset", offsetResetStrategy.toString().toLowerCase(Locale.ROOT));
            }
            if (str2 != null) {
                properties.setProperty("client.id", str2);
            }
            return properties;
        }

        /**
         * Builds the client properties for a producer of this cluster, with {@code acks} set to 1.
         *
         * @param str the client id, or {@code null} to leave {@code client.id} unset
         * @return a new, mutable set of producer properties
         */
        public Properties getProducerProperties(String str) {
            final Properties config = new Properties();
            final String brokers = KafkaCluster.this.brokerList();
            config.setProperty("bootstrap.servers", brokers);
            config.setProperty("acks", String.valueOf(1));
            if (str == null) {
                return config;
            }
            config.setProperty("client.id", str);
            return config;
        }

        /**
         * Creates a producer whose every write is sent and then flushed before returning.
         *
         * @param str the client id used for the producer properties; may be {@code null}
         * @param serializer the key serializer
         * @param serializer2 the value serializer
         * @return an interactive producer wrapping a new {@link KafkaProducer}; the caller must close it
         */
        public <K, V> InteractiveProducer<K, V> createProducer(String str, Serializer<K> serializer, Serializer<V> serializer2) {
            final Properties config = getProducerProperties(str);
            final KafkaProducer<K, V> delegate = new KafkaProducer<>(config, serializer, serializer2);
            return new InteractiveProducer<K, V>() {
                @Override
                public InteractiveProducer<K, V> write(ProducerRecord<K, V> message) {
                    delegate.send(message);
                    // Flush after each send so the write is pushed out before the call returns.
                    delegate.flush();
                    return this;
                }

                @Override
                public void close() {
                    delegate.close();
                }
            };
        }

        /**
         * Creates a producer with String keys and {@link Document} values.
         *
         * @param str the client id used for the producer properties; may be {@code null}
         * @return an interactive producer; the caller must close it
         */
        public InteractiveProducer<String, Document> createProducer(String str) {
            final Serializer<String> keySerializer = new StringSerializer();
            final Serializer<Document> valueSerializer = new DocumentSerdes();
            return createProducer(str, keySerializer, valueSerializer);
        }

        /**
         * Creates a consumer for a single topic by delegating to the topic-set overload.
         *
         * @param str the consumer group id
         * @param str2 the client id
         * @param str3 the topic to read
         * @param deserializer the key deserializer
         * @param deserializer2 the value deserializer
         * @param runnable passed through as the completion callback; may be {@code null}
         * @return the interactive consumer
         */
        public <K, V> InteractiveConsumer<K, V> createConsumer(String str, String str2, String str3, Deserializer<K> deserializer, Deserializer<V> deserializer2, Runnable runnable) {
            final Set<String> singleTopic = Collections.singleton(str3);
            return createConsumer(str, str2, singleTopic, deserializer, deserializer2, runnable);
        }

        /**
         * Starts a background consumer (reading from the earliest offset) and returns a handle
         * through which the consumed records can be taken interactively.
         *
         * @param str the consumer group id
         * @param str2 the client id
         * @param set the topics to subscribe to
         * @param deserializer the key deserializer
         * @param deserializer2 the value deserializer
         * @param runnable run when the background consumer stops; may be {@code null}
         * @return the interactive consumer; closing it asks the background consumer to stop
         */
        public <K, V> InteractiveConsumer<K, V> createConsumer(String str, String str2, Set<String> set, Deserializer<K> deserializer, Deserializer<V> deserializer2, Runnable runnable) {
            final LinkedBlockingQueue<ConsumerRecord<K, V>> pending = new LinkedBlockingQueue<>();
            // Written by the consumer thread and read by callers of streamAll(); guarded by its own monitor.
            final LinkedList<ConsumerRecord<K, V>> history = new LinkedList<>();
            // Must start out true: the polling loop in consume(..) only runs while this supplier
            // returns true, so an initial false would stop the consumer before it reads anything.
            final AtomicBoolean keepReading = new AtomicBoolean(true);
            consume(str, str2, OffsetResetStrategy.EARLIEST, deserializer, deserializer2, keepReading::get, null, runnable, set, consumed -> {
                pending.add(consumed);
                synchronized (history) {
                    history.add(consumed);
                }
            });
            return new InteractiveConsumer<K, V>() {
                @Override
                public ConsumerRecord<K, V> nextRecord() throws InterruptedException {
                    return pending.take();
                }

                @Override
                public ConsumerRecord<K, V> nextRecord(long j, TimeUnit timeUnit) throws InterruptedException {
                    return pending.poll(j, timeUnit);
                }

                @Override
                public void close() {
                    keepReading.set(false);
                }

                @Override
                public Stream<ConsumerRecord<K, V>> stream() {
                    return pending.stream();
                }

                @Override
                public Stream<ConsumerRecord<K, V>> streamAll() {
                    // Stream over a snapshot so the consumer thread can keep appending safely.
                    synchronized (history) {
                        return new ArrayList<>(history).stream();
                    }
                }
            };
        }

        /**
         * Creates a consumer of String keys and {@link Document} values for a single topic.
         *
         * @param str the consumer group id
         * @param str2 the client id
         * @param str3 the topic to read
         * @param runnable passed through as the completion callback; may be {@code null}
         * @return the interactive consumer
         */
        public InteractiveConsumer<String, Document> createConsumer(String str, String str2, String str3, Runnable runnable) {
            final Set<String> singleTopic = Collections.singleton(str3);
            final Deserializer<String> keyDeserializer = new StringDeserializer();
            final Deserializer<Document> valueDeserializer = new DocumentSerdes();
            return createConsumer(str, str2, singleTopic, keyDeserializer, valueDeserializer, runnable);
        }

        /**
         * Creates a consumer of String keys and {@link Document} values for a set of topics.
         *
         * @param str the consumer group id
         * @param str2 the client id
         * @param set the topics to read
         * @param runnable passed through as the completion callback; may be {@code null}
         * @return the interactive consumer
         */
        public InteractiveConsumer<String, Document> createConsumer(String str, String str2, Set<String> set, Runnable runnable) {
            final Deserializer<String> keyDeserializer = new StringDeserializer();
            final Deserializer<Document> valueDeserializer = new DocumentSerdes();
            return createConsumer(str, str2, set, keyDeserializer, valueDeserializer, runnable);
        }

        /**
         * Runs the supplied function against a producer of String keys and {@link Document} values.
         *
         * @param str the producer name, forwarded to the serializer-based overload
         * @param consumer the function that uses the producer
         */
        public <K, V> void produce(String str, Consumer<InteractiveProducer<String, Document>> consumer) {
            final Serializer<String> keySerializer = new StringSerializer();
            final Serializer<Document> valueSerializer = new DocumentSerdes();
            produce(str, keySerializer, valueSerializer, consumer);
        }

        /**
         * Creates a producer on the calling thread, then hands it to the supplied function on a
         * new thread named {@code <str>-thread}. The producer is closed when the function returns
         * or throws.
         *
         * @param str the client id for the producer and prefix of the thread name
         * @param serializer the key serializer
         * @param serializer2 the value serializer
         * @param consumer the function that uses the producer
         */
        public <K, V> void produce(String str, Serializer<K> serializer, Serializer<V> serializer2, Consumer<InteractiveProducer<K, V>> consumer) {
            final KafkaProducer<K, V> delegate = new KafkaProducer<>(getProducerProperties(str), serializer, serializer2);
            final InteractiveProducer<K, V> handle = new InteractiveProducer<K, V>() {
                @Override
                public InteractiveProducer<K, V> write(ProducerRecord<K, V> message) {
                    delegate.send(message);
                    delegate.flush();
                    return this;
                }

                @Override
                public void close() {
                    delegate.close();
                }
            };
            final Runnable task = () -> {
                try {
                    consumer.accept(handle);
                } finally {
                    handle.close();
                }
            };
            final Thread worker = new Thread(task);
            worker.setName(str + "-thread");
            worker.start();
        }

        /**
         * Starts a thread named {@code <str>-thread} that sends a fixed number of records, flushing
         * after each one. The producer is always closed, also when the supplier or a send throws.
         *
         * @param str the client id for the producer and prefix of the thread name
         * @param i the number of records to send; zero or a negative value sends nothing
         * @param serializer the key serializer
         * @param serializer2 the value serializer
         * @param runnable run on the producer thread once producing has ended; may be {@code null}
         * @param supplier supplies each record to send
         */
        public <K, V> void produce(String str, int i, Serializer<K> serializer, Serializer<V> serializer2, Runnable runnable, Supplier<ProducerRecord<K, V>> supplier) {
            Properties producerProperties = getProducerProperties(str);
            Thread thread = new Thread(() -> {
                KafkaCluster.LOGGER.debug("Starting producer {} to write {} messages", str, Integer.valueOf(i));
                // try-with-resources guarantees the producer is released even if a send fails.
                try (KafkaProducer<K, V> kafkaProducer = new KafkaProducer<>(producerProperties, serializer, serializer2)) {
                    // '<' rather than '!=' so a negative count cannot keep the loop running.
                    for (int sent = 0; sent < i; sent++) {
                        ProducerRecord<K, V> producerRecord = supplier.get();
                        kafkaProducer.send(producerRecord);
                        kafkaProducer.flush();
                        KafkaCluster.LOGGER.debug("Producer {}: sent message {}", str, producerRecord);
                    }
                } finally {
                    if (runnable != null) {
                        runnable.run();
                    }
                    KafkaCluster.LOGGER.debug("Stopping producer {}", str);
                }
            });
            thread.setName(str + "-thread");
            thread.start();
        }

        /**
         * Sends records with String keys and String values under a random producer name.
         *
         * @param i the number of records to send
         * @param runnable run once producing has ended; may be {@code null}
         * @param supplier supplies each record to send
         */
        public void produceStrings(int i, Runnable runnable, Supplier<ProducerRecord<String, String>> supplier) {
            // One serializer instance is shared between keys and values.
            final StringSerializer sharedSerializer = new StringSerializer();
            final String producerName = UUID.randomUUID().toString();
            produce(producerName, i, sharedSerializer, sharedSerializer, runnable, supplier);
        }

        /**
         * Sends records with String keys and {@link Document} values under a random producer name.
         *
         * @param i the number of records to send
         * @param runnable run once producing has ended; may be {@code null}
         * @param supplier supplies each record to send
         */
        public void produceDocuments(int i, Runnable runnable, Supplier<ProducerRecord<String, Document>> supplier) {
            final String producerName = UUID.randomUUID().toString();
            final Serializer<String> keySerializer = new StringSerializer();
            final Serializer<Document> valueSerializer = new DocumentSerdes();
            produce(producerName, i, keySerializer, valueSerializer, runnable, supplier);
        }

        /**
         * Sends records with String keys and Integer values under a random producer name.
         *
         * @param i the number of records to send
         * @param runnable run once producing has ended; may be {@code null}
         * @param supplier supplies each record to send
         */
        public void produceIntegers(int i, Runnable runnable, Supplier<ProducerRecord<String, Integer>> supplier) {
            final String producerName = UUID.randomUUID().toString();
            final Serializer<String> keySerializer = new StringSerializer();
            final Serializer<Integer> valueSerializer = new IntegerSerializer();
            produce(producerName, i, keySerializer, valueSerializer, runnable, supplier);
        }

        /**
         * Sends a run of increasing integers to a topic. The first value sent is {@code i2 + 1};
         * each record's key is the decimal string of its value.
         *
         * @param str the topic name
         * @param i the number of records to send
         * @param i2 the value preceding the first one sent
         * @param runnable run once producing has ended; may be {@code null}
         */
        public void produceIntegers(String str, int i, int i2, Runnable runnable) {
            AtomicLong counter = new AtomicLong(i2);
            produceIntegers(i, runnable, () -> {
                long current = counter.incrementAndGet();
                // Integer.valueOf replaces the deprecated Integer constructor; the narrowing cast is unchanged.
                return new ProducerRecord<>(str, Long.toString(current), Integer.valueOf((int) current));
            });
        }

        /**
         * Sends String values to a topic; keys are the decimal strings "1", "2", ... in send order.
         *
         * @param str the topic name
         * @param i the number of records to send
         * @param runnable run once producing has ended; may be {@code null}
         * @param supplier supplies each value
         */
        public void produceStrings(String str, int i, Runnable runnable, Supplier<String> supplier) {
            final AtomicLong sequence = new AtomicLong(0L);
            final Supplier<ProducerRecord<String, String>> records = () -> {
                String key = Long.toString(sequence.incrementAndGet());
                String value = supplier.get();
                return new ProducerRecord<>(str, key, value);
            };
            produceStrings(i, runnable, records);
        }

        /**
         * Sends {@link Document} values to a topic; keys are the decimal strings "1", "2", ... in send order.
         *
         * @param str the topic name
         * @param i the number of records to send
         * @param runnable run once producing has ended; may be {@code null}
         * @param supplier supplies each value
         */
        public void produceDocuments(String str, int i, Runnable runnable, Supplier<Document> supplier) {
            final AtomicLong sequence = new AtomicLong(0L);
            final Supplier<ProducerRecord<String, Document>> records = () -> {
                String key = Long.toString(sequence.incrementAndGet());
                Document value = supplier.get();
                return new ProducerRecord<>(str, key, value);
            };
            produceDocuments(i, runnable, records);
        }

        /**
         * Starts a thread named {@code <str2>-thread} that subscribes to the given topics and keeps
         * polling while the supplied condition holds, passing every record to the handler. The
         * consumer is always closed, also when polling or the handler throws.
         *
         * @param str the consumer group id; required
         * @param str2 the client id and prefix of the thread name
         * @param offsetResetStrategy the offset reset strategy; may be {@code null}
         * @param deserializer the key deserializer
         * @param deserializer2 the value deserializer
         * @param booleanSupplier polling continues while this returns {@code true}
         * @param offsetCommitCallback when non-null, offsets are committed asynchronously after each record
         * @param runnable run on the consumer thread once consuming has ended; may be {@code null}
         * @param collection the topics to subscribe to
         * @param consumer the handler invoked for every record
         * @throws IllegalArgumentException if the group id is {@code null}
         */
        public <K, V> void consume(String str, String str2, OffsetResetStrategy offsetResetStrategy, Deserializer<K> deserializer, Deserializer<V> deserializer2, BooleanSupplier booleanSupplier, OffsetCommitCallback offsetCommitCallback, Runnable runnable, Collection<String> collection, Consumer<ConsumerRecord<K, V>> consumer) {
            Properties consumerProperties = getConsumerProperties(str, str2, offsetResetStrategy);
            Thread thread = new Thread(() -> {
                KafkaCluster.LOGGER.debug("Starting consumer {} to read messages", str2);
                // try-with-resources guarantees the consumer is released even if the loop fails.
                try (KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(consumerProperties, deserializer, deserializer2)) {
                    kafkaConsumer.subscribe(new ArrayList<>(collection));
                    while (booleanSupplier.getAsBoolean()) {
                        kafkaConsumer.poll(10L).forEach(consumerRecord -> {
                            KafkaCluster.LOGGER.debug("Consumer {}: consuming message {}", str2, consumerRecord);
                            consumer.accept(consumerRecord);
                            if (offsetCommitCallback != null) {
                                kafkaConsumer.commitAsync(offsetCommitCallback);
                            }
                        });
                    }
                } finally {
                    if (runnable != null) {
                        runnable.run();
                    }
                    KafkaCluster.LOGGER.debug("Stopping consumer {}", str2);
                }
            });
            thread.setName(str2 + "-thread");
            thread.start();
        }

        /**
         * Consumes records with String keys and {@link Document} values from the earliest offset,
         * using one random UUID as both group id and client id.
         *
         * @param booleanSupplier consuming continues while this returns {@code true}
         * @param runnable run once consuming has ended; may be {@code null}
         * @param collection the topics to read
         * @param consumer the handler invoked for every record
         */
        public void consumeDocuments(BooleanSupplier booleanSupplier, Runnable runnable, Collection<String> collection, Consumer<ConsumerRecord<String, Document>> consumer) {
            final Deserializer<String> keyDeserializer = new StringDeserializer();
            final Deserializer<Document> valueDeserializer = new DocumentSerdes();
            final String randomId = UUID.randomUUID().toString();
            consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer,
                    booleanSupplier, null, runnable, collection, consumer);
        }

        /**
         * Consumes records with String keys and String values from the earliest offset, using one
         * random UUID as both group id and client id.
         *
         * @param booleanSupplier consuming continues while this returns {@code true}
         * @param runnable run once consuming has ended; may be {@code null}
         * @param collection the topics to read
         * @param consumer the handler invoked for every record
         */
        public void consumeStrings(BooleanSupplier booleanSupplier, Runnable runnable, Collection<String> collection, Consumer<ConsumerRecord<String, String>> consumer) {
            // One deserializer instance is shared between keys and values.
            final StringDeserializer sharedDeserializer = new StringDeserializer();
            final String randomId = UUID.randomUUID().toString();
            consume(randomId, randomId, OffsetResetStrategy.EARLIEST, sharedDeserializer, sharedDeserializer,
                    booleanSupplier, null, runnable, collection, consumer);
        }

        /**
         * Consumes records with String keys and Integer values from the earliest offset, using one
         * random UUID as both group id and client id.
         *
         * @param booleanSupplier consuming continues while this returns {@code true}
         * @param runnable run once consuming has ended; may be {@code null}
         * @param collection the topics to read
         * @param consumer the handler invoked for every record
         */
        public void consumeIntegers(BooleanSupplier booleanSupplier, Runnable runnable, Collection<String> collection, Consumer<ConsumerRecord<String, Integer>> consumer) {
            final Deserializer<String> keyDeserializer = new StringDeserializer();
            final Deserializer<Integer> valueDeserializer = new IntegerDeserializer();
            final String randomId = UUID.randomUUID().toString();
            consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer,
                    booleanSupplier, null, runnable, collection, consumer);
        }

        /**
         * Consumes String records from one topic until the predicate has accepted {@code i}
         * records or the timeout has elapsed.
         *
         * @param str the topic to read
         * @param i the number of accepted records after which consuming stops
         * @param j the timeout
         * @param timeUnit the unit of {@code j}
         * @param runnable run once consuming has ended; may be {@code null}
         * @param biPredicate tested with each record's key and value; only accepted records are counted
         */
        public void consumeStrings(String str, int i, long j, TimeUnit timeUnit, Runnable runnable, BiPredicate<String, String> biPredicate) {
            final AtomicLong accepted = new AtomicLong();
            final BooleanSupplier moreWanted = () -> accepted.get() < ((long) i);
            consumeStrings(continueIfNotExpired(moreWanted, j, timeUnit), runnable, Collections.singleton(str), message -> {
                boolean matches = biPredicate.test(message.key(), message.value());
                if (matches) {
                    accepted.incrementAndGet();
                }
            });
        }

        /**
         * Consumes {@link Document} records from one topic until the predicate has accepted
         * {@code i} records or the timeout has elapsed.
         *
         * @param str the topic to read
         * @param i the number of accepted records after which consuming stops
         * @param j the timeout
         * @param timeUnit the unit of {@code j}
         * @param runnable run once consuming has ended; may be {@code null}
         * @param biPredicate tested with each record's key and value; only accepted records are counted
         */
        public void consumeDocuments(String str, int i, long j, TimeUnit timeUnit, Runnable runnable, BiPredicate<String, Document> biPredicate) {
            final AtomicLong accepted = new AtomicLong();
            final BooleanSupplier moreWanted = () -> accepted.get() < ((long) i);
            consumeDocuments(continueIfNotExpired(moreWanted, j, timeUnit), runnable, Collections.singleton(str), message -> {
                boolean matches = biPredicate.test(message.key(), message.value());
                if (matches) {
                    accepted.incrementAndGet();
                }
            });
        }

        /**
         * Consumes Integer records from one topic until the predicate has accepted {@code i}
         * records or the timeout has elapsed.
         *
         * @param str the topic to read
         * @param i the number of accepted records after which consuming stops
         * @param j the timeout
         * @param timeUnit the unit of {@code j}
         * @param runnable run once consuming has ended; may be {@code null}
         * @param biPredicate tested with each record's key and value; only accepted records are counted
         */
        public void consumeIntegers(String str, int i, long j, TimeUnit timeUnit, Runnable runnable, BiPredicate<String, Integer> biPredicate) {
            final AtomicLong accepted = new AtomicLong();
            final BooleanSupplier moreWanted = () -> accepted.get() < ((long) i);
            consumeIntegers(continueIfNotExpired(moreWanted, j, timeUnit), runnable, Collections.singleton(str), message -> {
                boolean matches = biPredicate.test(message.key(), message.value());
                if (matches) {
                    accepted.incrementAndGet();
                }
            });
        }

        /**
         * Consumes String records from one topic until {@code i} records were read or the
         * timeout has elapsed; every record counts.
         *
         * @param str the topic to read
         * @param i the number of records after which consuming stops
         * @param j the timeout
         * @param timeUnit the unit of {@code j}
         * @param runnable run once consuming has ended; may be {@code null}
         */
        public void consumeStrings(String str, int i, long j, TimeUnit timeUnit, Runnable runnable) {
            final BiPredicate<String, String> acceptAll = (key, value) -> true;
            consumeStrings(str, i, j, timeUnit, runnable, acceptAll);
        }

        /**
         * Consumes {@link Document} records from one topic until {@code i} records were read or
         * the timeout has elapsed; every record counts.
         *
         * @param str the topic to read
         * @param i the number of records after which consuming stops
         * @param j the timeout
         * @param timeUnit the unit of {@code j}
         * @param runnable run once consuming has ended; may be {@code null}
         */
        public void consumeDocuments(String str, int i, long j, TimeUnit timeUnit, Runnable runnable) {
            final BiPredicate<String, Document> acceptAll = (key, value) -> true;
            consumeDocuments(str, i, j, timeUnit, runnable, acceptAll);
        }

        /**
         * Consumes Integer records from one topic until {@code i} records were read or the
         * timeout has elapsed; every record counts.
         *
         * @param str the topic to read
         * @param i the number of records after which consuming stops
         * @param j the timeout
         * @param timeUnit the unit of {@code j}
         * @param runnable run once consuming has ended; may be {@code null}
         */
        public void consumeIntegers(String str, int i, long j, TimeUnit timeUnit, Runnable runnable) {
            final BiPredicate<String, Integer> acceptAll = (key, value) -> true;
            consumeIntegers(str, i, j, timeUnit, runnable, acceptAll);
        }

        /**
         * Wraps a condition so that it also turns {@code false} once a timeout has elapsed. The
         * clock starts on the first call to the returned supplier, not when this method is called.
         *
         * @param booleanSupplier the underlying condition
         * @param j the timeout
         * @param timeUnit the unit of {@code j}
         * @return a supplier that is {@code true} while the condition holds and the deadline has not passed
         */
        protected BooleanSupplier continueIfNotExpired(final BooleanSupplier booleanSupplier, final long j, final TimeUnit timeUnit) {
            return new BooleanSupplier() {
                // 0 means "deadline not computed yet".
                long stopTime = 0;

                @Override
                public boolean getAsBoolean() {
                    if (this.stopTime == 0) {
                        long now = System.currentTimeMillis();
                        long timeoutMillis = timeUnit.toMillis(j);
                        // Saturate instead of overflowing: a very large timeout would otherwise
                        // wrap to a negative deadline and expire immediately.
                        this.stopTime = timeoutMillis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMillis;
                    }
                    return booleanSupplier.getAsBoolean() && System.currentTimeMillis() <= this.stopTime;
                }
            };
        }
    }

    /**
     * Sets whether cluster data is deleted when the cluster shuts down.
     *
     * @param z {@code true} to delete data upon shutdown
     * @return this cluster, for chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster deleteDataUponShutdown(boolean z) {
        if (!this.running) {
            this.deleteDataUponShutdown = z;
            return this;
        }
        throw new IllegalStateException("Unable to change cluster settings when running");
    }

    /**
     * Sets whether an already configured data directory is wiped when the cluster starts.
     *
     * @param z {@code true} to delete data prior to startup
     * @return this cluster, for chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster deleteDataPriorToStartup(boolean z) {
        if (!this.running) {
            this.deleteDataPriorToStartup = z;
            return this;
        }
        throw new IllegalStateException("Unable to change cluster settings when running");
    }

    /**
     * Adds the given number of new brokers, assigning each the lowest broker id (starting at 1)
     * that is not yet in use. Each new broker inherits the configured data directory, broker
     * configuration and, when a starting port was set, the next free port.
     *
     * @param i the number of brokers to add; zero or a negative value adds nothing
     * @return this cluster, for chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster addBrokers(int i) {
        if (this.running) {
            throw new IllegalStateException("Unable to add a broker when the cluster is already running");
        }
        AtomicLong added = new AtomicLong();
        // The candidate id advances on every pass, so ids that are already taken are skipped.
        // Deriving the id from the added-count would retry the same taken id forever.
        for (int brokerId = 1; added.get() < i; brokerId++) {
            this.kafkaServers.computeIfAbsent(Integer.valueOf(brokerId), id -> {
                added.incrementAndGet();
                KafkaServer kafkaServer = new KafkaServer(this.zkServer::getConnection, id.intValue());
                if (this.dataDir != null) {
                    kafkaServer.setStateDirectory(this.dataDir);
                }
                if (this.kafkaConfig != null) {
                    kafkaServer.setProperties(this.kafkaConfig);
                }
                if (this.startingKafkaPort >= 0) {
                    kafkaServer.setPort((int) this.nextKafkaPort.getAndIncrement());
                }
                return kafkaServer;
            });
        }
        return this;
    }

    /**
     * Sets the root directory under which the cluster keeps its data.
     *
     * @param file the data directory; {@code null} or a not-yet-existing path is accepted as is
     * @return this cluster, for chaining
     * @throws IllegalStateException if the cluster is running
     * @throws IllegalArgumentException if the path exists but is not a readable, writable directory
     */
    public KafkaCluster usingDirectory(File file) {
        if (this.running) {
            throw new IllegalStateException("Unable to change cluster settings when running");
        }
        // Reject the path when ANY requirement fails; requiring all three to fail at once
        // would let regular files and read-only directories through.
        if (file != null && file.exists() && (!file.isDirectory() || !file.canWrite() || !file.canRead())) {
            throw new IllegalArgumentException("The directory must be readable and writable");
        }
        this.dataDir = file;
        return this;
    }

    /**
     * Sets the broker configuration, applying a private copy of it to every existing broker.
     * A {@code null} or empty argument leaves the current configuration untouched.
     *
     * @param properties the broker configuration
     * @return this cluster, for chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster withKafkaConfiguration(Properties properties) {
        if (this.running) {
            throw new IllegalStateException("Unable to change cluster settings when running");
        }
        if (properties != null && !properties.isEmpty()) {
            // Fill the copy before publishing it through the volatile field, so no reader
            // can observe a half-populated configuration.
            Properties copy = new Properties();
            copy.putAll(properties);
            this.kafkaConfig = copy;
            this.kafkaServers.values().forEach(kafkaServer -> {
                kafkaServer.setProperties(copy);
            });
        }
        return this;
    }

    /**
     * Sets the ZooKeeper port and the first broker port. When the broker port is non-negative,
     * existing brokers are assigned consecutive ports starting from it.
     *
     * @param i the ZooKeeper port
     * @param i2 the first broker port; a negative value leaves broker ports untouched
     * @return this cluster, for chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster withPorts(int i, int i2) {
        if (this.running) {
            throw new IllegalStateException("Unable to change cluster settings when running");
        }
        this.zkServer.setPort(i);
        this.startingKafkaPort = i2;
        if (i2 >= 0) {
            this.nextKafkaPort.set(i2);
            this.kafkaServers.values().forEach(kafkaServer -> {
                kafkaServer.setPort((int) this.nextKafkaPort.getAndIncrement());
            });
        }
        return this;
    }

    /**
     * Reports the current value of the running flag.
     *
     * @return {@code true} if the cluster is running
     */
    public boolean isRunning() {
        return running;
    }

    /**
     * Starts ZooKeeper and then every broker; does nothing if the cluster is already running.
     * Without a configured data directory, a directory named {@code cluster} inside the system
     * temp directory is used. ZooKeeper state goes to {@code <dataDir>/zk} and each broker's to
     * {@code <dataDir>/kafka/broker<id>}.
     *
     * @return this cluster, for chaining
     * @throws IOException declared for failures while preparing or starting the cluster
     * @throws RuntimeException if the temporary directory cannot be created
     */
    public synchronized KafkaCluster startup() throws IOException {
        if (this.running) {
            return this;
        }
        // Work on a local copy so the volatile field is read once and never re-assigned to itself.
        File clusterDir = this.dataDir;
        if (clusterDir == null) {
            try {
                // The temp file is only used to locate the system temp directory.
                File createTempFile = File.createTempFile("kafka", "suffix");
                clusterDir = new File(createTempFile.getParentFile(), "cluster");
                this.dataDir = clusterDir;
                clusterDir.mkdirs();
                createTempFile.delete();
            } catch (IOException e) {
                throw new RuntimeException("Unable to create temporary directory", e);
            }
        } else if (this.deleteDataPriorToStartup) {
            IoUtil.delete(clusterDir);
            clusterDir.mkdirs();
        }
        this.zkServer.setStateDirectory(new File(clusterDir, "zk"));
        File kafkaDir = new File(clusterDir, "kafka");
        this.kafkaServers.values().forEach(kafkaServer -> {
            kafkaServer.setStateDirectory(new File(kafkaDir, "broker" + kafkaServer.brokerId()));
        });
        this.zkServer.startup();
        LOGGER.debug("Starting {} brokers", Integer.valueOf(this.kafkaServers.size()));
        this.kafkaServers.values().forEach(kafkaServer -> {
            kafkaServer.startup();
        });
        this.running = true;
        return this;
    }

    /**
     * Stops every broker and then ZooKeeper; does nothing if the cluster is not running. When
     * data deletion upon shutdown is enabled, broker data and the data directory are removed
     * afterwards. Each cleanup step runs exactly once, and the cluster is marked as stopped
     * even if a step fails.
     *
     * @return this cluster, for chaining
     */
    public synchronized KafkaCluster shutdown() {
        if (!this.running) {
            return this;
        }
        try {
            this.kafkaServers.values().forEach(this::shutdownReliably);
        } finally {
            try {
                this.zkServer.shutdown(this.deleteDataUponShutdown);
            } catch (Throwable th) {
                // Best effort: a ZooKeeper failure must not prevent the data cleanup below.
                LOGGER.error("Error while shutting down {}", this.zkServer, th);
            } finally {
                try {
                    if (this.deleteDataUponShutdown) {
                        deleteClusterData();
                    }
                } finally {
                    this.running = false;
                }
            }
        }
        return this;
    }

    /** Deletes each broker's data and then the data directory; a directory failure is only logged. */
    private void deleteClusterData() {
        try {
            this.kafkaServers.values().forEach(kafkaServer -> {
                kafkaServer.deleteData();
            });
        } finally {
            try {
                IoUtil.delete(this.dataDir);
            } catch (IOException e) {
                LOGGER.error("Error while deleting cluster data", e);
            }
        }
    }

    /**
     * Creates the named topics through the first broker, if the cluster has any broker.
     *
     * @param strArr the topic names
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopics(String... strArr) {
        final String topicList = Arrays.toString(strArr);
        LOGGER.debug("Creating topics: {}", topicList);
        if (this.running) {
            this.kafkaServers.values().stream()
                    .findFirst()
                    .ifPresent(firstBroker -> firstBroker.createTopics(strArr));
            return;
        }
        throw new IllegalStateException("The cluster must be running to create topics");
    }

    /**
     * Creates the named topics; see {@link #createTopics(String...)}.
     *
     * @param set the topic names
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopics(Set<String> set) {
        final String[] topicNames = set.toArray(new String[0]);
        createTopics(topicNames);
    }

    /**
     * Creates the named topics with the given layout through the first broker, if the cluster
     * has any broker.
     *
     * @param i the number of partitions per topic
     * @param i2 the number of replicas per topic
     * @param strArr the topic names
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopics(int i, int i2, String... strArr) {
        LOGGER.debug("Creating topics with {} partitions and {} replicas each: {}",
                Integer.valueOf(i), Integer.valueOf(i2), Arrays.toString(strArr));
        if (this.running) {
            this.kafkaServers.values().stream()
                    .findFirst()
                    .ifPresent(firstBroker -> firstBroker.createTopics(i, i2, strArr));
            return;
        }
        throw new IllegalStateException("The cluster must be running to create topics");
    }

    /**
     * Creates the named topics with the given layout; see {@link #createTopics(int, int, String...)}.
     *
     * @param i the number of partitions per topic
     * @param i2 the number of replicas per topic
     * @param set the topic names
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopics(int i, int i2, Set<String> set) {
        final String[] topicNames = set.toArray(new String[0]);
        createTopics(i, i2, topicNames);
    }

    /**
     * Creates one topic with the given layout through the first broker, if the cluster has any broker.
     *
     * @param str the topic name
     * @param i the number of partitions
     * @param i2 the number of replicas
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopic(String str, int i, int i2) {
        LOGGER.debug("Creating topic '{}' with {} partitions and {} replicas",
                str, Integer.valueOf(i), Integer.valueOf(i2));
        if (this.running) {
            this.kafkaServers.values().stream()
                    .findFirst()
                    .ifPresent(firstBroker -> firstBroker.createTopic(str, i, i2));
            return;
        }
        throw new IllegalStateException("The cluster must be running to create topics");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Passes the ZooKeeper snapshot directory, the ZooKeeper log directory and then each broker's
     * state directory to the supplied function.
     *
     * @param consumer the function invoked once per directory
     */
    public void onEachDirectory(Consumer<File> consumer) {
        final File zkSnapshots = this.zkServer.getSnapshotDirectory();
        consumer.accept(zkSnapshots);
        final File zkLogs = this.zkServer.getLogDirectory();
        consumer.accept(zkLogs);
        for (KafkaServer broker : this.kafkaServers.values()) {
            consumer.accept(broker.getStateDirectory());
        }
    }

    /**
     * Joins the connection strings of all brokers with commas.
     *
     * @return the comma-separated broker connections; empty when the cluster has no brokers
     */
    public String brokerList() {
        final StringJoiner connections = new StringJoiner(",");
        for (KafkaServer broker : this.kafkaServers.values()) {
            connections.add(broker.getConnection());
        }
        return connections.toString();
    }

    /**
     * Shuts down one broker, logging any failure instead of propagating it so that the
     * remaining shutdown steps still run.
     *
     * @param kafkaServer the broker to stop
     */
    private void shutdownReliably(KafkaServer kafkaServer) {
        final boolean deleteData = this.deleteDataUponShutdown;
        try {
            kafkaServer.shutdown(deleteData);
        } catch (Throwable failure) {
            LOGGER.error("Error while shutting down {}", kafkaServer, failure);
        }
    }

    /**
     * Returns a new {@link Usage} helper for producing to and consuming from this cluster.
     *
     * @return a new helper bound to this cluster
     * @throws IllegalStateException if the cluster is not running
     */
    public Usage useTo() {
        if (!this.running) {
            throw new IllegalStateException("Unable to use the cluster it is not running");
        }
        return new Usage();
    }
}
