package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.api.metrics.ObservableMeasurement;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;

/* loaded from: input_file:org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.class */
public class OpenTelemetryConsumerStats implements AutoCloseable {
    public static final String MESSAGE_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.count";
    private final ObservableLongMeasurement messageOutCounter;
    public static final String BYTES_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.size";
    private final ObservableLongMeasurement bytesOutCounter;
    public static final String MESSAGE_ACK_COUNTER = "pulsar.broker.consumer.message.ack.count";
    private final ObservableLongMeasurement messageAckCounter;
    public static final String MESSAGE_REDELIVER_COUNTER = "pulsar.broker.consumer.message.redeliver.count";
    private final ObservableLongMeasurement messageRedeliverCounter;
    public static final String MESSAGE_UNACKNOWLEDGED_COUNTER = "pulsar.broker.consumer.message.unack.count";
    private final ObservableLongMeasurement messageUnacknowledgedCounter;
    public static final String CONSUMER_BLOCKED_COUNTER = "pulsar.broker.consumer.blocked";
    private final ObservableLongMeasurement consumerBlockedCounter;
    public static final String MESSAGE_PERMITS_COUNTER = "pulsar.broker.consumer.permit.count";
    private final ObservableLongMeasurement messagePermitsCounter;
    private final BatchCallback batchCallback;

    public OpenTelemetryConsumerStats(PulsarService pulsarService) {
        Meter meter = pulsarService.getOpenTelemetry().getMeter();
        this.messageOutCounter = meter.counterBuilder(MESSAGE_OUT_COUNTER).setUnit("{message}").setDescription("The total number of messages dispatched to this consumer.").buildObserver();
        this.bytesOutCounter = meter.counterBuilder(BYTES_OUT_COUNTER).setUnit("By").setDescription("The total number of messages bytes dispatched to this consumer.").buildObserver();
        this.messageAckCounter = meter.counterBuilder(MESSAGE_ACK_COUNTER).setUnit("{ack}").setDescription("The total number of message acknowledgments received from this consumer.").buildObserver();
        this.messageRedeliverCounter = meter.counterBuilder(MESSAGE_REDELIVER_COUNTER).setUnit("{message}").setDescription("The total number of messages that have been redelivered to this consumer.").buildObserver();
        this.messageUnacknowledgedCounter = meter.upDownCounterBuilder(MESSAGE_UNACKNOWLEDGED_COUNTER).setUnit("{message}").setDescription("The total number of messages unacknowledged by this consumer.").buildObserver();
        this.consumerBlockedCounter = meter.upDownCounterBuilder(CONSUMER_BLOCKED_COUNTER).setUnit("1").setDescription("Indicates whether the consumer is currently blocked due to unacknowledged messages.").buildObserver();
        this.messagePermitsCounter = meter.upDownCounterBuilder(MESSAGE_PERMITS_COUNTER).setUnit("{permit}").setDescription("The number of permits currently available for this consumer.").buildObserver();
        this.batchCallback = meter.batchCallback(() -> {
            pulsarService.getBrokerService().getTopics().values().stream().map(completableFuture -> {
                return (Optional) completableFuture.getNow(Optional.empty());
            }).filter((v0) -> {
                return v0.isPresent();
            }).map((v0) -> {
                return v0.get();
            }).map((v0) -> {
                return v0.getSubscriptions();
            }).flatMap(concurrentOpenHashMap -> {
                return concurrentOpenHashMap.values().stream();
            }).map((v0) -> {
                return v0.getConsumers();
            }).flatMap((v0) -> {
                return v0.stream();
            }).forEach(this::recordMetricsForConsumer);
        }, this.messageOutCounter, new ObservableMeasurement[]{this.bytesOutCounter, this.messageAckCounter, this.messageRedeliverCounter, this.messageUnacknowledgedCounter, this.consumerBlockedCounter, this.messagePermitsCounter});
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.batchCallback.close();
    }

    private void recordMetricsForConsumer(Consumer consumer) {
        Attributes openTelemetryAttributes = consumer.getOpenTelemetryAttributes();
        this.messageOutCounter.record(consumer.getMsgOutCounter(), openTelemetryAttributes);
        this.bytesOutCounter.record(consumer.getBytesOutCounter(), openTelemetryAttributes);
        this.messageAckCounter.record(consumer.getMessageAckCounter(), openTelemetryAttributes);
        this.messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(), openTelemetryAttributes);
        this.messageUnacknowledgedCounter.record(consumer.getUnackedMessages(), openTelemetryAttributes);
        this.consumerBlockedCounter.record(consumer.isBlocked() ? 1L : 0L, openTelemetryAttributes);
        this.messagePermitsCounter.record(consumer.getAvailablePermits(), openTelemetryAttributes);
    }
}
