package org.springframework.kafka.test.rule;

import com.gs.collections.api.block.function.Function;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.utility.ListIterate;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import javax.net.ServerSocketFactory;
import kafka.admin.AdminUtils;
import kafka.admin.AdminUtils$;
import kafka.api.PartitionMetadata;
import kafka.api.TopicMetadata;
import kafka.cluster.BrokerEndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.NotRunning;
import kafka.utils.CoreUtils;
import kafka.utils.SystemTime$;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.kafka.common.protocol.Errors;
import org.junit.rules.ExternalResource;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import scala.Option;
import scala.collection.JavaConversions;

/**
 * JUnit rule that starts an embedded Zookeeper instance and a configurable number of
 * Kafka brokers before a test, optionally creates topics, and shuts everything down
 * afterwards.
 *
 * <p>Each broker is bound to a port that was free at the time it was probed. Besides the
 * {@link KafkaRule} accessors, the rule offers helpers to stop ({@code bounce}) and
 * restart individual brokers and to wait for ISR metadata stored in Zookeeper to reflect
 * those changes.
 */
public class KafkaEmbedded extends ExternalResource implements KafkaRule {

    /** Upper bound, in milliseconds, for the metadata polling loops in this class. */
    public static final long METADATA_PROPAGATION_TIMEOUT = 10000;

    /** Pause, in milliseconds, between two metadata polls. */
    private static final long METADATA_POLL_INTERVAL = 100L;

    private final int count;
    private final boolean controlledShutdown;
    private final String[] topics;
    private final int partitionsPerTopic;
    private List<KafkaServer> kafkaServers;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zookeeperClient;
    private String zkConnect;

    /**
     * Creates a rule with {@code i} brokers, controlled shutdown disabled and no
     * pre-created topics.
     *
     * @param i the number of brokers to start
     */
    public KafkaEmbedded(int i) {
        this(i, false, new String[0]);
    }

    /**
     * Creates a rule whose topics get two partitions each.
     *
     * @param i the number of brokers to start
     * @param z the controlled-shutdown flag passed to the broker configuration
     * @param strArr names of the topics to create on startup; may be {@code null}
     */
    public KafkaEmbedded(int i, boolean z, String... strArr) {
        this(i, z, 2, strArr);
    }

    /**
     * Creates a fully specified rule.
     *
     * @param i the number of brokers to start
     * @param z the controlled-shutdown flag passed to the broker configuration
     * @param i2 the number of partitions for every topic created on startup
     * @param strArr names of the topics to create on startup; {@code null} is treated as
     *     "no topics"
     */
    public KafkaEmbedded(int i, boolean z, int i2, String... strArr) {
        this.count = i;
        this.controlledShutdown = z;
        // Copy so that later changes to the caller's array cannot affect this rule.
        this.topics = strArr != null ? strArr.clone() : new String[0];
        this.partitionsPerTopic = i2;
    }

    /**
     * Starts Zookeeper, connects a {@link ZkClient}, starts {@code count} brokers and
     * creates the configured topics (the broker count is passed to
     * {@code AdminUtils.createTopic} as the replication argument).
     *
     * <p>JUnit's {@link ExternalResource} does not invoke {@link #after()} when this
     * method throws, so on failure everything started so far is torn down here before
     * the exception propagates.
     *
     * @throws Exception if any part of the startup fails
     */
    @Override
    protected void before() throws Exception {
        boolean started = false;
        try {
            startZookeeper();
            this.zkConnect = "127.0.0.1:" + this.zookeeper.port();
            this.zookeeperClient = new ZkClient(this.zkConnect, 6000, 6000, ZKStringSerializer$.MODULE$);
            this.kafkaServers = new ArrayList<KafkaServer>();
            for (int brokerId = 0; brokerId < this.count; brokerId++) {
                this.kafkaServers.add(startBroker(brokerId, findFreePort()));
            }
            ZkUtils zkUtils = newZkUtils();
            Properties topicConfig = new Properties();
            for (String topic : this.topics) {
                AdminUtils.createTopic(zkUtils, topic, this.partitionsPerTopic, this.count, topicConfig);
            }
            started = true;
        } finally {
            if (!started) {
                after();
            }
        }
    }

    /**
     * Stops all running brokers, removes their log directories, closes the Zookeeper
     * client and shuts down Zookeeper. Every step is best effort: a failure in one step
     * does not prevent the remaining ones. Safe to call after a partial startup.
     */
    @Override
    protected void after() {
        if (this.kafkaServers != null) {
            for (KafkaServer kafkaServer : this.kafkaServers) {
                try {
                    if (kafkaServer.brokerState().currentState() != NotRunning.state()) {
                        kafkaServer.shutdown();
                        kafkaServer.awaitShutdown();
                    }
                } catch (Exception ignored) {
                    // Best-effort teardown: keep going so the other resources are released.
                }
                try {
                    CoreUtils.rm(kafkaServer.config().logDirs());
                } catch (Exception ignored) {
                    // Best-effort teardown: leftover log directories must not fail the test.
                }
            }
        }
        if (this.zookeeperClient != null) {
            try {
                this.zookeeperClient.close();
            } catch (ZkInterruptedException ignored) {
                // Best-effort teardown: still try to stop Zookeeper below.
            }
        }
        if (this.zookeeper != null) {
            try {
                this.zookeeper.shutdown();
            } catch (Exception ignored) {
                // Best-effort teardown: nothing left to release after this point.
            }
        }
    }

    /**
     * @return the live, mutable list of brokers managed by this rule; {@code null} before
     *     the rule has been started
     */
    @Override
    public List<KafkaServer> getKafkaServers() {
        return this.kafkaServers;
    }

    /**
     * @param i the index of the broker in start order
     * @return the broker at that index
     */
    public KafkaServer getKafkaServer(int i) {
        return this.kafkaServers.get(i);
    }

    /**
     * @return the embedded Zookeeper instance, or {@code null} if it was never started
     */
    public EmbeddedZookeeper getZookeeper() {
        return this.zookeeper;
    }

    /**
     * @return the Zookeeper client created on startup
     */
    @Override
    public ZkClient getZkClient() {
        return this.zookeeperClient;
    }

    /**
     * @return the {@code 127.0.0.1:<port>} connection string of the embedded Zookeeper
     */
    @Override
    public String getZookeeperConnectionString() {
        return this.zkConnect;
    }

    /**
     * Returns the address of one broker, built from the host name and port in its
     * configuration.
     *
     * <p>NOTE(review): unlike {@link #getBrokerAddresses()}, which always reports
     * {@code 127.0.0.1}, this uses the configured host name; the two may differ.
     *
     * @param i the index of the broker in start order
     * @return the configured host/port of that broker
     */
    public BrokerAddress getBrokerAddress(int i) {
        return configuredAddress(this.kafkaServers.get(i));
    }

    /**
     * @return one address per broker, each using host {@code 127.0.0.1} and the broker's
     *     configured port
     */
    @Override
    public BrokerAddress[] getBrokerAddresses() {
        return ListIterate.collect(this.kafkaServers, new Function<KafkaServer, BrokerAddress>() {
            @Override
            public BrokerAddress valueOf(KafkaServer kafkaServer) {
                return new BrokerAddress("127.0.0.1", kafkaServer.config().port().intValue());
            }
        }).toArray(new BrokerAddress[this.kafkaServers.size()]);
    }

    /**
     * @return the number of partitions given to every topic created on startup
     */
    @Override
    public int getPartitionsPerTopic() {
        return this.partitionsPerTopic;
    }

    /**
     * Shuts down every broker whose configured host/port equals the given address and
     * waits for the shutdown to complete.
     *
     * <p>NOTE(review): the comparison uses the configured host name, so an address taken
     * from {@link #getBrokerAddresses()} (always {@code 127.0.0.1}) may not match any
     * broker — confirm against {@code BrokerAddress.equals} and the broker host setting.
     *
     * @param brokerAddress the address of the broker(s) to stop
     */
    public void bounce(BrokerAddress brokerAddress) {
        for (KafkaServer kafkaServer : getKafkaServers()) {
            if (brokerAddress.equals(configuredAddress(kafkaServer))) {
                kafkaServer.shutdown();
                kafkaServer.awaitShutdown();
            }
        }
    }

    /**
     * Creates the embedded Zookeeper instance used by this rule.
     */
    public void startZookeeper() {
        this.zookeeper = new EmbeddedZookeeper();
    }

    /**
     * Shuts down the broker at the given index and optionally waits until it no longer
     * appears in the ISR of any partition according to the metadata in Zookeeper.
     *
     * <p>Waiting polls every 100 ms and gives up silently after
     * {@link #METADATA_PROPAGATION_TIMEOUT} ms. If the calling thread is interrupted the
     * wait ends early and the thread's interrupt flag is restored.
     *
     * @param i the index of the broker, which is also used as the broker id when
     *     inspecting the ISR
     * @param z {@code true} to wait for the metadata change to propagate
     */
    public void bounce(int i, boolean z) {
        this.kafkaServers.get(i).shutdown();
        if (!z) {
            return;
        }
        ZkUtils zkUtils = newZkUtils();
        long start = System.currentTimeMillis();
        do {
            if (!pauseBeforePoll()) {
                return;
            }
            if (!isBrokerInAnyIsr(zkUtils, i)) {
                return;
            }
        } while (System.currentTimeMillis() - start < METADATA_PROPAGATION_TIMEOUT);
    }

    /**
     * Shuts down the broker at the given index and waits for the metadata change to
     * propagate; equivalent to {@code bounce(i, true)}.
     *
     * @param i the index of the broker to stop
     */
    public void bounce(int i) {
        bounce(i, true);
    }

    /**
     * Starts the broker at the given index again, retrying on any {@link Exception} up to
     * 10 attempts with exponential back-off (100 ms initial, factor 2, capped at 1 s).
     *
     * @param i the index of the broker to start
     * @throws Exception the last startup failure once the retries are exhausted
     */
    public void restart(final int i) throws Exception {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(10,
                Collections.<Class<? extends Throwable>, Boolean>singletonMap(Exception.class, true));
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100L);
        backOffPolicy.setMaxInterval(1000L);
        backOffPolicy.setMultiplier(2.0d);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.execute(new RetryCallback<Void, Exception>() {
            @Override
            public Void doWithRetry(RetryContext retryContext) throws Exception {
                KafkaEmbedded.this.kafkaServers.get(i).startup();
                return null;
            }
        });
    }

    /**
     * Waits until the given broker id is listed in the ISR of every partition of the
     * topic, according to the metadata in Zookeeper.
     *
     * <p>Polls every 100 ms and gives up silently after
     * {@link #METADATA_PROPAGATION_TIMEOUT} ms. If the calling thread is interrupted the
     * wait ends early and the thread's interrupt flag is restored.
     *
     * <p>NOTE(review): when the fetched topic metadata carries an error, the topic is
     * treated as synced and the method returns immediately — confirm this is intended.
     *
     * @param str the topic name
     * @param i the broker id expected in every ISR
     */
    public void waitUntilSynced(String str, int i) {
        ZkUtils zkUtils = newZkUtils();
        long start = System.currentTimeMillis();
        do {
            if (!pauseBeforePoll()) {
                return;
            }
            if (isBrokerInEveryIsr(zkUtils, str, i)) {
                return;
            }
        } while (System.currentTimeMillis() - start < METADATA_PROPAGATION_TIMEOUT);
    }

    /**
     * @return the broker addresses from {@link #getBrokerAddresses()} formatted as a
     *     comma-separated list of {@code host:port} entries
     */
    @Override
    public String getBrokersAsString() {
        return FastList.newList(Arrays.asList(getBrokerAddresses())).collect(new Function<BrokerAddress, String>() {
            @Override
            public String valueOf(BrokerAddress brokerAddress) {
                return brokerAddress.getHost() + ":" + brokerAddress.getPort();
            }
        }).makeString(",");
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isEmbedded() {
        return true;
    }

    /** Asks the OS for an ephemeral port by briefly binding a server socket to port 0. */
    private static int findFreePort() throws Exception {
        ServerSocket probe = ServerSocketFactory.getDefault().createServerSocket(0);
        try {
            return probe.getLocalPort();
        } finally {
            probe.close();
        }
    }

    /** Builds the broker configuration for the given id/port and starts the broker. */
    private KafkaServer startBroker(int brokerId, int port) {
        Properties brokerConfig = TestUtils.createBrokerConfig(brokerId, this.zkConnect, this.controlledShutdown,
                true, port,
                Option.<org.apache.kafka.common.protocol.SecurityProtocol>apply(null),
                Option.<java.io.File>apply(null),
                true, false, 0, false, 0, false, 0);
        brokerConfig.setProperty("replica.socket.timeout.ms", "1000");
        brokerConfig.setProperty("controller.socket.timeout.ms", "1000");
        brokerConfig.setProperty("offsets.topic.replication.factor", "1");
        return TestUtils.createServer(new KafkaConfig(brokerConfig), SystemTime$.MODULE$);
    }

    /** Wraps the rule's Zookeeper client in a {@link ZkUtils} without a connection object. */
    private ZkUtils newZkUtils() {
        return new ZkUtils(getZkClient(), (ZkConnection) null, false);
    }

    /** Builds a broker address from the host name and port in the broker's configuration. */
    private static BrokerAddress configuredAddress(KafkaServer kafkaServer) {
        return new BrokerAddress(kafkaServer.config().hostName(), kafkaServer.config().port().intValue());
    }

    /** Sleeps one poll interval; returns {@code false} (interrupt flag restored) if interrupted. */
    private static boolean pauseBeforePoll() {
        try {
            Thread.sleep(METADATA_POLL_INTERVAL);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Tells whether the broker id is in the ISR of any partition of any error-free topic. */
    private static boolean isBrokerInAnyIsr(ZkUtils zkUtils, int brokerId) {
        for (TopicMetadata topicMetadata : JavaConversions.asJavaCollection(AdminUtils$.MODULE$.fetchTopicMetadataFromZk(AdminUtils$.MODULE$.fetchAllTopicConfigs(zkUtils).keySet(), zkUtils))) {
            if (Errors.forCode(topicMetadata.errorCode()).exception() != null) {
                continue;
            }
            for (Object partition : JavaConversions.asJavaCollection(topicMetadata.partitionsMetadata())) {
                if (isrContains((PartitionMetadata) partition, brokerId)) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Tells whether the broker id is in every partition's ISR; an errored topic counts as synced. */
    private static boolean isBrokerInEveryIsr(ZkUtils zkUtils, String topic, int brokerId) {
        TopicMetadata topicMetadata = AdminUtils$.MODULE$.fetchTopicMetadataFromZk(topic, zkUtils);
        if (Errors.forCode(topicMetadata.errorCode()).exception() != null) {
            return true;
        }
        for (Object partition : JavaConversions.asJavaCollection(topicMetadata.partitionsMetadata())) {
            if (!isrContains((PartitionMetadata) partition, brokerId)) {
                return false;
            }
        }
        return true;
    }

    /** Tells whether the partition's ISR lists a broker with the given id. */
    private static boolean isrContains(PartitionMetadata partition, int brokerId) {
        for (Object replica : JavaConversions.asJavaCollection(partition.isr())) {
            if (((BrokerEndPoint) replica).id() == brokerId) {
                return true;
            }
        }
        return false;
    }
}
