package com.azure.messaging.eventhubs.implementation.instrumentation;

import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.util.Context;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.tracing.SpanKind;
import com.azure.core.util.tracing.StartSpanOptions;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.implementation.MessageUtils;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.time.Instant;
import java.util.Objects;
import java.util.function.BiConsumer;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/messaging/eventhubs/implementation/instrumentation/EventHubsConsumerInstrumentation.class */
/**
 * Tracing and metrics instrumentation for consumer-side Event Hubs operations.
 *
 * <p>Wraps an {@link EventHubsTracer} and an {@link EventHubsMetricsProvider} and hands out
 * {@link InstrumentationScope} instances for receive and process operations. When neither the tracer nor the
 * meter is enabled, every entry point returns its input unchanged or a shared no-op scope.</p>
 */
public final class EventHubsConsumerInstrumentation {
    private static final Symbol ENQUEUED_TIME_UTC_ANNOTATION_NAME_SYMBOL = Symbol.valueOf(AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue());

    /** Scope returned whenever instrumentation is disabled; built with no tracer, meter or callback. */
    private static final InstrumentationScope NOOP_SCOPE = new InstrumentationScope(null, null, null);

    private final EventHubsTracer tracer;
    private final EventHubsMetricsProvider meter;
    private final boolean isSync;

    /**
     * Creates the instrumentation.
     *
     * @param tracer tracer wrapped by the internal {@link EventHubsTracer}.
     * @param meter meter wrapped by the internal {@link EventHubsMetricsProvider}.
     * @param str first identifying string, forwarded to both the tracer and the metrics provider
     *     (NOTE(review): presumably the fully qualified namespace - confirm against callers).
     * @param str2 second identifying string, forwarded to both (presumably the entity name - confirm).
     * @param str3 third identifying string, forwarded to both (presumably the consumer group - confirm).
     * @param z whether this instrumentation serves a synchronous consumer; when {@code true},
     *     {@link #startAsyncConsume(Message, String)} starts no process span and reports no process metric.
     */
    public EventHubsConsumerInstrumentation(Tracer tracer, Meter meter, String str, String str2, String str3, boolean z) {
        this.tracer = new EventHubsTracer(tracer, str, str2, str3);
        this.meter = new EventHubsMetricsProvider(meter, str, str2, str3);
        this.isSync = z;
    }

    /**
     * Gets the tracer wrapper used by this instrumentation.
     *
     * @return the {@link EventHubsTracer} created in the constructor.
     */
    public EventHubsTracer getTracer() {
        return this.tracer;
    }

    /**
     * Creates an instrumentation scope bound to this instance's tracer and metrics provider.
     *
     * @param biConsumer callback handed to the new scope; it receives the metrics provider and the scope
     *     (presumably invoked when the scope is closed - see {@link InstrumentationScope}).
     * @return a new scope, or the shared no-op scope when neither tracing nor metrics is enabled.
     */
    public InstrumentationScope createScope(BiConsumer<EventHubsMetricsProvider, InstrumentationScope> biConsumer) {
        if (!isEnabled()) {
            return NOOP_SCOPE;
        }
        return new InstrumentationScope(this.tracer, this.meter, biConsumer);
    }

    /**
     * Starts instrumentation for a single message delivered to an asynchronous consumer.
     *
     * <p>For a non-sync consumer a process span is started and made current, and the scope's callback reports
     * one processed event. Consumer lag is reported whenever the enqueued time can be read from the message,
     * regardless of the sync flag.</p>
     *
     * @param message the received AMQP message.
     * @param str the partition id the message was received from.
     * @return the scope tracking this message, or the no-op scope when instrumentation is disabled.
     */
    public InstrumentationScope startAsyncConsume(Message message, String str) {
        if (!isEnabled()) {
            return NOOP_SCOPE;
        }
        InstrumentationScope scope = createScope((metrics, reportingScope) -> {
            if (!this.isSync) {
                metrics.reportProcess(1, str, reportingScope);
            }
        });
        Instant enqueuedTime = readEnqueuedTime(message);
        if (!this.isSync) {
            ApplicationProperties applicationProperties = message.getApplicationProperties();
            scope.setSpan(this.tracer.startProcessSpan(
                    applicationProperties == null ? null : applicationProperties.getValue(), enqueuedTime, str))
                .makeSpanCurrent();
        }
        if (enqueuedTime != null) {
            this.meter.reportLag(enqueuedTime, str, scope);
        }
        return scope;
    }

    /**
     * Reads the enqueued time from the message annotations.
     *
     * @return the enqueued time, or {@code null} when the message carries no annotations section
     *     (or when {@link MessageUtils#getEnqueuedTime} itself yields {@code null}).
     */
    private static Instant readEnqueuedTime(Message message) {
        // The annotations section is optional on an AMQP message; guard it the same way the
        // application properties are guarded so instrumentation never fails message handling.
        if (message.getMessageAnnotations() == null) {
            return null;
        }
        return MessageUtils.getEnqueuedTime(message.getMessageAnnotations().getValue(), ENQUEUED_TIME_UTC_ANNOTATION_NAME_SYMBOL);
    }

    /**
     * Instruments a synchronous receive represented by the given event stream.
     *
     * <p>The span start timestamp is captured on subscription, a link is added per received event, and the
     * receive span itself is started only when the stream terminates, once the event count is known. The
     * scope's callback reports the number of received events to the metrics provider.</p>
     *
     * @param flux the stream of received events.
     * @param str the partition id being received from.
     * @return the instrumented stream, or {@code flux} itself when instrumentation is disabled.
     */
    public Flux<PartitionEvent> syncReceive(Flux<PartitionEvent> flux, String str) {
        if (!isEnabled()) {
            return flux;
        }
        // NOTE(review): the options and the counter are created once per call, not per subscription,
        // so subscribing to the returned Flux more than once would accumulate links and counts.
        final StartSpanOptions startOptions = this.tracer.isEnabled()
            ? this.tracer.createStartOptions(SpanKind.CLIENT, OperationName.RECEIVE, str)
            : null;
        final int[] receivedCount = {0};
        return Flux.using(
            () -> {
                if (startOptions != null) {
                    startOptions.setStartTimestamp(Instant.now());
                }
                return createScope((metrics, reportingScope) ->
                    this.meter.reportReceive(receivedCount[0], str, reportingScope));
            },
            scope -> flux
                .doOnNext(event -> {
                    // Count every event, even when tracing is off: the metrics callback above
                    // reports this count and must not depend on the tracer being enabled.
                    receivedCount[0]++;
                    if (startOptions != null) {
                        EventData data = event.getData();
                        startOptions.addLink(this.tracer.createLink(data.getProperties(), data.getEnqueuedTime()));
                    }
                })
                .doOnError(scope::setError)
                .doOnCancel(scope::setCancelled),
            scope -> {
                if (startOptions != null) {
                    startOptions.setAttribute(InstrumentationUtils.MESSAGING_BATCH_MESSAGE_COUNT, receivedCount[0]);
                    startOptions.setAttribute(InstrumentationUtils.MESSAGING_DESTINATION_PARTITION_ID, str);
                    scope.setSpan(this.tracer.startSpan(OperationName.RECEIVE, startOptions, Context.NONE));
                }
                scope.close();
            });
    }

    /**
     * Starts instrumentation for processing a batch of events.
     *
     * @param eventBatchContext the batch about to be processed.
     * @return a scope carrying a process span that has been made current, or the no-op scope when the batch
     *     is empty or instrumentation is disabled. The scope's callback reports the batch size for the
     *     batch's partition.
     */
    public InstrumentationScope startProcess(EventBatchContext eventBatchContext) {
        if (eventBatchContext.getEvents().isEmpty() || !isEnabled()) {
            return NOOP_SCOPE;
        }
        return createScope((metrics, reportingScope) -> metrics.reportProcess(
                eventBatchContext.getEvents().size(),
                eventBatchContext.getPartitionContext().getPartitionId(),
                reportingScope))
            .setSpan(this.tracer.startProcessSpan(eventBatchContext))
            .makeSpanCurrent();
    }

    /**
     * Starts instrumentation for processing a single event.
     *
     * @param eventContext the event about to be processed.
     * @return a scope carrying a process span that has been made current, or the no-op scope when the context
     *     holds no event data or instrumentation is disabled. The scope's callback reports one processed
     *     event for the context's partition.
     */
    public InstrumentationScope startProcess(EventContext eventContext) {
        EventData eventData = eventContext.getEventData();
        if (eventData == null || !isEnabled()) {
            return NOOP_SCOPE;
        }
        String partitionId = eventContext.getPartitionContext().getPartitionId();
        return createScope((metrics, reportingScope) -> metrics.reportProcess(
                1, eventContext.getPartitionContext().getPartitionId(), reportingScope))
            .setSpan(this.tracer.startProcessSpan(eventData.getProperties(), eventData.getEnqueuedTime(), partitionId))
            .makeSpanCurrent();
    }

    /**
     * Wraps a {@link Mono} with a span and a duration metric for a generic operation.
     *
     * <p>The scope is created per subscription. Errors and cancellation are recorded on it, its span is
     * written into the Reactor context under the {@code "trace-context"} key, and it is closed when the
     * {@code Mono} terminates.</p>
     *
     * @param mono the operation to instrument.
     * @param operationName the operation being performed.
     * @param str the partition id the operation targets.
     * @param <T> the type emitted by the operation.
     * @return the instrumented operation, or {@code mono} itself when instrumentation is disabled.
     */
    public <T> Mono<T> instrumentMono(Mono<T> mono, OperationName operationName, String str) {
        if (!isEnabled()) {
            return mono;
        }
        return Mono.using(
            () -> createScope((metrics, reportingScope) ->
                    metrics.reportGenericOperationDuration(operationName, str, reportingScope))
                .setSpan(this.tracer.startGenericOperationSpan(operationName, str, Context.NONE)),
            scope -> mono
                .doOnError(scope::setError)
                .doOnCancel(scope::setCancelled)
                .contextWrite(reactorContext -> reactorContext.put("trace-context", scope.getSpan())),
            InstrumentationScope::close);
    }

    /**
     * Indicates whether any instrumentation is active.
     *
     * @return {@code true} when the tracer or the metrics provider is enabled.
     */
    public boolean isEnabled() {
        return this.tracer.isEnabled() || this.meter.isEnabled();
    }
}
