/*
 * Decompiled with CFR 0.152.
 */
package io.strimzi.test.container;

import io.strimzi.test.container.StrimziKafkaContainer;
import io.strimzi.test.container.StrimziZookeeperContainer;
import io.strimzi.test.container.Utils;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;

public class StrimziKafkaCluster
implements Startable {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaCluster.class);
    private final int brokersNum;
    private final Network network;
    private final StrimziZookeeperContainer zookeeper;
    private final Collection<StrimziKafkaContainer> brokers;

    public StrimziKafkaCluster(int brokersNum, int internalTopicReplicationFactor, Map<String, String> additionalKafkaConfiguration) {
        if (brokersNum < 0) {
            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
        }
        if (internalTopicReplicationFactor < 0 || internalTopicReplicationFactor > brokersNum) {
            throw new IllegalArgumentException("internalTopicReplicationFactor '" + internalTopicReplicationFactor + "' must be less than brokersNum and greater than 0");
        }
        this.brokersNum = brokersNum;
        this.network = Network.newNetwork();
        this.zookeeper = (StrimziZookeeperContainer)new StrimziZookeeperContainer().withNetwork(this.network);
        HashMap<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<String, String>();
        defaultKafkaConfigurationForMultiNode.put("offsets.topic.replication.factor", String.valueOf(internalTopicReplicationFactor));
        defaultKafkaConfigurationForMultiNode.put("num.partitions", String.valueOf(internalTopicReplicationFactor));
        defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(internalTopicReplicationFactor));
        defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(internalTopicReplicationFactor));
        additionalKafkaConfiguration.putAll(defaultKafkaConfigurationForMultiNode);
        this.brokers = IntStream.range(0, this.brokersNum).mapToObj(brokerId -> {
            LOGGER.info("Starting broker with id {}", (Object)brokerId);
            StrimziKafkaContainer kafkaContainer = (StrimziKafkaContainer)((StrimziKafkaContainer)((StrimziKafkaContainer)new StrimziKafkaContainer().withBrokerId(brokerId).withKafkaConfigurationMap(additionalKafkaConfiguration).withExternalZookeeperConnect("zookeeper:2181").withNetwork(this.network)).withNetworkAliases(new String[]{"broker-" + brokerId})).dependsOn(new Startable[]{this.zookeeper});
            LOGGER.info("Started broker with id: {}", (Object)kafkaContainer);
            return kafkaContainer;
        }).collect(Collectors.toList());
    }

    public Collection<StrimziKafkaContainer> getBrokers() {
        return this.brokers;
    }

    public String getBootstrapServers() {
        return this.brokers.stream().map(StrimziKafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
    }

    public void start() {
        Stream<StrimziKafkaContainer> startables = this.brokers.stream();
        try {
            Startables.deepStart(startables).get(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        Utils.waitFor("Broker node", Duration.ofSeconds(1L).toMillis(), Duration.ofSeconds(30L).toMillis(), () -> {
            try {
                Container.ExecResult result = this.zookeeper.execInContainer(new String[]{"sh", "-c", "bin/zookeeper-shell.sh zookeeper:2181 ls /brokers/ids | tail -n 1"});
                String brokers = result.getStdout();
                LOGGER.info("Stdout from zookeeper container....{}", (Object)result.getStdout());
                return brokers != null && brokers.split(",").length == this.brokersNum;
            }
            catch (IOException | InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return false;
            }
        });
    }

    public void stop() {
        this.zookeeper.stop();
        ((Stream)this.brokers.stream().parallel()).forEach(GenericContainer::stop);
    }

    public StrimziZookeeperContainer getZookeeper() {
        return this.zookeeper;
    }
}

