package com.azure.messaging.servicebus;

import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import java.util.Objects;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

/* loaded from: input_file:com/azure/messaging/servicebus/FluxTrace.class */
final class FluxTrace extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
    private final ServiceBusReceiverInstrumentation instrumentation;

    /* loaded from: input_file:com/azure/messaging/servicebus/FluxTrace$TracingSubscriber.class */
    private static class TracingSubscriber extends BaseSubscriber<ServiceBusMessageContext> {
        private final CoreSubscriber<? super ServiceBusMessageContext> downstream;
        private final ServiceBusReceiverInstrumentation instrumentation;
        private final ServiceBusTracer tracer;

        TracingSubscriber(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber, ServiceBusReceiverInstrumentation serviceBusReceiverInstrumentation) {
            this.downstream = coreSubscriber;
            this.instrumentation = serviceBusReceiverInstrumentation;
            this.tracer = serviceBusReceiverInstrumentation.getTracer();
        }

        public Context currentContext() {
            return this.downstream.currentContext();
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.downstream.onSubscribe(this);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(ServiceBusMessageContext serviceBusMessageContext) {
            if (serviceBusMessageContext == null || serviceBusMessageContext.getMessage() == null) {
                this.downstream.onNext(serviceBusMessageContext);
                return;
            }
            com.azure.core.util.Context instrumentProcess = this.instrumentation.instrumentProcess("ServiceBus.process", serviceBusMessageContext.getMessage(), com.azure.core.util.Context.NONE);
            serviceBusMessageContext.getMessage().setContext(instrumentProcess);
            AutoCloseable makeSpanCurrent = this.tracer.makeSpanCurrent(instrumentProcess);
            try {
                try {
                    this.downstream.onNext(serviceBusMessageContext);
                    if (!this.instrumentation.isProcessorInstrumentation()) {
                        this.tracer.endSpan(null, instrumentProcess, makeSpanCurrent);
                    }
                } catch (Throwable th) {
                    this.tracer.endSpan(th, instrumentProcess, makeSpanCurrent);
                    throw th;
                }
            } finally {
                this.tracer.closeScope(makeSpanCurrent);
            }
        }

        protected void hookOnError(Throwable th) {
            this.downstream.onError(th);
        }

        protected void hookOnComplete() {
            this.downstream.onComplete();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FluxTrace(Flux<? extends ServiceBusMessageContext> flux, ServiceBusReceiverInstrumentation serviceBusReceiverInstrumentation) {
        super(flux);
        this.instrumentation = serviceBusReceiverInstrumentation;
    }

    public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");
        this.source.subscribe(new TracingSubscriber(coreSubscriber, this.instrumentation));
    }
}
