package io.smallrye.reactive.messaging.kafka.health;

import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.KafkaAdmin;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.impl.KafkaAdminHelper;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/health/KafkaSinkHealth.class */
/**
 * Health checks for an outgoing Kafka channel.
 *
 * <p>Two strategies are implemented on top of {@link BaseHealth}:
 * <ul>
 *   <li><em>metrics based</em> checks, which read a producer metric selected by
 *       {@code BaseHealth#getMetric(Map)} and report the channel as healthy when its
 *       value is at least {@code 1.0};</li>
 *   <li><em>client based</em> checks, which query the broker through a {@link KafkaAdmin}
 *       client (topic description for startup, cluster description for readiness).</li>
 * </ul>
 *
 * <p>The admin client is only created when topic verification is enabled in the
 * channel configuration; otherwise {@link #getAdmin()} returns {@code null}.
 */
public class KafkaSinkHealth extends BaseHealth {
    private final KafkaAdmin admin;
    private final String topic;
    private final ReactiveKafkaProducer<?, ?> client;
    private final Duration adminClientTimeout;
    /** Lazily resolved producer metric; guarded by the monitor of {@code this}. */
    private Metric metric;

    /**
     * Creates the health checker for one outgoing channel.
     *
     * @param config the outgoing channel configuration (channel name, topic, health settings)
     * @param kafkaConfiguration the Kafka client configuration; it is copied before being
     *        handed to the admin client factory, so the caller's map is not modified here
     * @param producer the producer whose metrics are inspected by the metrics based checks
     */
    public KafkaSinkHealth(KafkaConnectorOutgoingConfiguration config, Map<String, ?> kafkaConfiguration, ReactiveKafkaProducer<?, ?> producer) {
        super(config.getChannel(),
                isTopicVerificationEnabled(config),
                config.getHealthTopicVerificationStartupDisabled().booleanValue(),
                config.getHealthTopicVerificationReadinessDisabled().booleanValue());
        // The topic defaults to the channel name when not configured explicitly.
        this.topic = config.getTopic().orElse(config.getChannel());
        this.adminClientTimeout = Duration.ofMillis(
                config.getHealthReadinessTimeout().orElse(config.getHealthTopicVerificationTimeout()).longValue());
        this.client = producer;
        if (isTopicVerificationEnabled(config)) {
            Map<String, Object> adminConfiguration = new HashMap<>(kafkaConfiguration);
            this.admin = KafkaAdminHelper.createAdminClient(adminConfiguration, config.getChannel(), true);
        } else {
            this.admin = null;
        }
    }

    /**
     * Resolves whether topic verification is enabled: the readiness specific setting wins
     * when present, otherwise the general topic verification setting is used.
     */
    private static boolean isTopicVerificationEnabled(KafkaConnectorOutgoingConfiguration config) {
        return config.getHealthReadinessTopicVerification()
                .orElse(config.getHealthTopicVerificationEnabled())
                .booleanValue();
    }

    /**
     * Returns the admin client timeout in milliseconds as an {@code int}, clamped to
     * {@link Integer#MAX_VALUE} so that very large configured values do not wrap around
     * when narrowed for the Kafka admin options.
     */
    private int adminClientTimeoutMillis() {
        return (int) Math.min((long) Integer.MAX_VALUE, this.adminClientTimeout.toMillis());
    }

    /**
     * Returns the producer metric used by the metrics based checks, resolving and caching
     * it on first successful lookup.
     *
     * @return the metric, or {@code null} if the underlying producer is not available yet
     *         or the lookup did not yield a metric
     */
    protected synchronized Metric getMetric() {
        if (this.metric == null) {
            Producer<?, ?> producer = this.client.unwrap();
            if (producer != null) {
                this.metric = getMetric(producer.metrics());
            }
        }
        return this.metric;
    }

    /**
     * @return the admin client, or {@code null} when topic verification is disabled
     */
    @Override
    public KafkaAdmin getAdmin() {
        return this.admin;
    }

    /**
     * Reports the channel as healthy when the metric value is at least {@code 1.0}, or
     * when no metric is available.
     *
     * @param healthReportBuilder the builder receiving the channel status
     */
    @Override
    protected void metricsBasedStartupCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        Metric current = getMetric();
        if (current == null) {
            // No metric to inspect: report healthy. The builder is finalised by the caller,
            // so it must not be built (and the result dropped) here.
            healthReportBuilder.add(this.channel, true);
            return;
        }
        double value = ((Double) current.metricValue()).doubleValue();
        healthReportBuilder.add(this.channel, value >= 1.0d);
    }

    /**
     * Readiness uses the same metric criterion as startup.
     *
     * @param healthReportBuilder the builder receiving the channel status
     */
    @Override
    protected void metricsBasedReadinessCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        metricsBasedStartupCheck(healthReportBuilder);
    }

    /**
     * Describes the configured topic through the admin client and reports the channel as
     * healthy only if the topic is found and every partition has a leader. Any exception
     * (including a timeout) is reported as an unhealthy channel.
     *
     * @param healthReportBuilder the builder receiving the channel status
     */
    @Override
    protected void clientBasedStartupCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        try {
            DescribeTopicsOptions options = new DescribeTopicsOptions()
                    .timeoutMs(adminClientTimeoutMillis())
                    .includeAuthorizedOperations(false);
            Map<String, TopicDescription> descriptions = this.admin
                    .describeTopics(Collections.singleton(this.topic), options)
                    .await().atMost(this.adminClientTimeout);
            TopicDescription description = descriptions.get(this.topic);
            if (description == null) {
                healthReportBuilder.add(this.channel, false, "Unable to find topic " + this.topic);
                return;
            }
            boolean allPartitionsHaveLeader = description.partitions().stream()
                    .allMatch(partition -> partition.leader() != null);
            if (allPartitionsHaveLeader) {
                healthReportBuilder.add(this.channel, true);
            } else {
                healthReportBuilder.add(this.channel, false, "Unable to find leaders for all partitions of topic " + this.topic);
            }
        } catch (Exception e) {
            healthReportBuilder.add(this.channel, false, "No response from broker for topic " + this.topic + " : " + e);
        }
    }

    /**
     * Describes the cluster through the admin client and reports the channel as healthy
     * when the call completes within the timeout; any exception marks it unhealthy.
     *
     * @param healthReportBuilder the builder receiving the channel status
     */
    @Override
    protected void clientBasedReadinessCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        try {
            DescribeClusterOptions options = new DescribeClusterOptions()
                    .timeoutMs(adminClientTimeoutMillis())
                    .includeAuthorizedOperations(false);
            this.admin.describeCluster(options).await().atMost(this.adminClientTimeout);
            healthReportBuilder.add(this.channel, true);
        } catch (Exception e) {
            healthReportBuilder.add(this.channel, false, "Failed to get response from broker for channel " + this.channel + " : " + e);
        }
    }
}
