package com.azure.messaging.eventhubs.implementation.instrumentation;

import com.azure.core.amqp.implementation.MessageFlux;
import java.util.Objects;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

/* loaded from: input_file:com/azure/messaging/eventhubs/implementation/instrumentation/InstrumentedMessageFlux.class */
public final class InstrumentedMessageFlux extends FluxOperator<Message, Message> {
    private final EventHubsConsumerInstrumentation instrumentation;
    private final String partitionId;

    /* loaded from: input_file:com/azure/messaging/eventhubs/implementation/instrumentation/InstrumentedMessageFlux$TracingSubscriber.class */
    private static class TracingSubscriber extends BaseSubscriber<Message> {
        private final CoreSubscriber<? super Message> downstream;
        private final EventHubsConsumerInstrumentation instrumentation;
        private final String partitionId;

        TracingSubscriber(CoreSubscriber<? super Message> coreSubscriber, String str, EventHubsConsumerInstrumentation eventHubsConsumerInstrumentation) {
            this.downstream = coreSubscriber;
            this.instrumentation = eventHubsConsumerInstrumentation;
            this.partitionId = str;
        }

        public Context currentContext() {
            return this.downstream.currentContext();
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.downstream.onSubscribe(this);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(Message message) {
            InstrumentationScope startAsyncConsume = this.instrumentation.startAsyncConsume(message, this.partitionId);
            try {
                try {
                    this.downstream.onNext(message);
                    startAsyncConsume.close();
                } catch (Exception e) {
                    startAsyncConsume.setError(e);
                    throw e;
                }
            } catch (Throwable th) {
                startAsyncConsume.close();
                throw th;
            }
        }

        protected void hookOnError(Throwable th) {
            this.downstream.onError(th);
        }

        protected void hookOnComplete() {
            this.downstream.onComplete();
        }
    }

    private InstrumentedMessageFlux(MessageFlux messageFlux, String str, EventHubsConsumerInstrumentation eventHubsConsumerInstrumentation) {
        super(messageFlux);
        this.instrumentation = eventHubsConsumerInstrumentation;
        this.partitionId = str;
    }

    public static Flux<Message> instrument(MessageFlux messageFlux, String str, EventHubsConsumerInstrumentation eventHubsConsumerInstrumentation) {
        return eventHubsConsumerInstrumentation.isEnabled() ? new InstrumentedMessageFlux(messageFlux, str, eventHubsConsumerInstrumentation) : messageFlux;
    }

    public void subscribe(CoreSubscriber<? super Message> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");
        this.source.subscribe(new TracingSubscriber(coreSubscriber, this.partitionId, this.instrumentation));
    }
}
