package org.springframework.cloud.stream.binder.kafka;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import kafka.cluster.Broker;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils$;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Partition;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.class */
public class KafkaBinderHealthIndicator implements HealthIndicator {
    private final KafkaMessageChannelBinder binder;
    private final KafkaBinderConfigurationProperties configurationProperties;

    public KafkaBinderHealthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder, KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        this.binder = kafkaMessageChannelBinder;
        this.configurationProperties = kafkaBinderConfigurationProperties;
    }

    public Health health() {
        ZkClient zkClient = null;
        try {
            try {
                ZkClient zkClient2 = new ZkClient(this.configurationProperties.getZkConnectionString(), this.configurationProperties.getZkSessionTimeout(), this.configurationProperties.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$);
                HashSet hashSet = new HashSet();
                Iterator it = JavaConversions.asJavaCollection(ZkUtils$.MODULE$.getAllBrokersInCluster(zkClient2)).iterator();
                while (it.hasNext()) {
                    hashSet.add(((Broker) it.next()).connectionString());
                }
                HashSet hashSet2 = new HashSet();
                Iterator<Map.Entry<String, Collection<Partition>>> it2 = this.binder.getTopicsInUse().entrySet().iterator();
                while (it2.hasNext()) {
                    Iterator<Partition> it3 = it2.next().getValue().iterator();
                    while (it3.hasNext()) {
                        BrokerAddress leader = this.binder.getConnectionFactory().getLeader(it3.next());
                        if (!hashSet.contains(leader.toString())) {
                            hashSet2.add(leader.toString());
                        }
                    }
                }
                if (hashSet2.isEmpty()) {
                    Health build = Health.up().build();
                    if (zkClient2 != null) {
                        try {
                            zkClient2.close();
                        } catch (Exception e) {
                        }
                    }
                    return build;
                }
                Health build2 = Health.down().withDetail("Following brokers are down: ", hashSet2.toString()).build();
                if (zkClient2 != null) {
                    try {
                        zkClient2.close();
                    } catch (Exception e2) {
                    }
                }
                return build2;
            } catch (Exception e3) {
                Health build3 = Health.down(e3).build();
                if (0 != 0) {
                    try {
                        zkClient.close();
                    } catch (Exception e4) {
                    }
                }
                return build3;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    zkClient.close();
                } catch (Exception e5) {
                }
            }
            throw th;
        }
    }
}
