package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import io.opentelemetry.javaagent.shaded.instrumentation.kafka.internal.KafkaConsumerContext;
import io.opentelemetry.javaagent.shaded.instrumentation.kafka.internal.KafkaConsumerContextUtil;
import io.opentelemetry.javaagent.shaded.instrumentation.kafka.internal.KafkaProcessRequest;
import io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.ContextPropagationOperator;
import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.Scope;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:applicationinsights-agent-3.4.18.jar:inst/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaFlux.classdata */
public final class InstrumentedKafkaFlux<R extends ConsumerRecord<?, ?>> extends FluxOperator<R, R> {

    /* loaded from: input_file:applicationinsights-agent-3.4.18.jar:inst/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaFlux$InstrumentedSubscriber.classdata */
    static final class InstrumentedSubscriber implements CoreSubscriber<ConsumerRecord<?, ?>>, Subscription, Scannable {
        private final CoreSubscriber<ConsumerRecord<?, ?>> actual;
        private final Context currentContext;
        private Subscription subscription;

        InstrumentedSubscriber(CoreSubscriber<ConsumerRecord<?, ?>> coreSubscriber) {
            this.actual = coreSubscriber;
            this.currentContext = ContextPropagationOperator.getOpenTelemetryContext(coreSubscriber.currentContext(), Context.current());
        }

        @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (Operators.validate(this.subscription, subscription)) {
                this.subscription = subscription;
                this.actual.onSubscribe(this);
            }
        }

        @Override // reactor.core.CoreSubscriber
        public reactor.util.context.Context currentContext() {
            return this.actual.currentContext();
        }

        /* JADX WARN: Finally extract failed */
        @Override // org.reactivestreams.Subscriber
        public void onNext(ConsumerRecord<?, ?> consumerRecord) {
            KafkaConsumerContext kafkaConsumerContext = KafkaConsumerContextUtil.get(consumerRecord);
            Context context = kafkaConsumerContext.getContext();
            Context context2 = context != null ? context : this.currentContext;
            KafkaProcessRequest create = KafkaProcessRequest.create(kafkaConsumerContext, consumerRecord);
            if (!ReactorKafkaSingletons.processInstrumenter().shouldStart(context2, create)) {
                this.actual.onNext(consumerRecord);
                return;
            }
            Context start = ReactorKafkaSingletons.processInstrumenter().start(context2, create);
            try {
                Scope makeCurrent = start.makeCurrent();
                try {
                    this.actual.onNext(consumerRecord);
                    if (makeCurrent != null) {
                        makeCurrent.close();
                    }
                    ReactorKafkaSingletons.processInstrumenter().end(start, create, null, null);
                } catch (Throwable th) {
                    if (makeCurrent != null) {
                        try {
                            makeCurrent.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                ReactorKafkaSingletons.processInstrumenter().end(start, create, null, null);
                throw th3;
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            Scope makeCurrent = this.currentContext.makeCurrent();
            try {
                this.actual.onError(th);
                if (makeCurrent != null) {
                    makeCurrent.close();
                }
            } catch (Throwable th2) {
                if (makeCurrent != null) {
                    try {
                        makeCurrent.close();
                    } catch (Throwable th3) {
                        th2.addSuppressed(th3);
                    }
                }
                throw th2;
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            Scope makeCurrent = this.currentContext.makeCurrent();
            try {
                this.actual.onComplete();
                if (makeCurrent != null) {
                    makeCurrent.close();
                }
            } catch (Throwable th) {
                if (makeCurrent != null) {
                    try {
                        makeCurrent.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            this.subscription.request(j);
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            this.subscription.cancel();
        }

        @Override // reactor.core.Scannable
        public Object scanUnsafe(Scannable.Attr attr) {
            if (attr == Scannable.Attr.ACTUAL) {
                return this.actual;
            }
            if (attr == Scannable.Attr.PARENT) {
                return this.subscription;
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public InstrumentedKafkaFlux(Flux<R> flux) {
        super(flux);
    }

    @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
    public void subscribe(CoreSubscriber<? super R> coreSubscriber) {
        this.source.subscribe((CoreSubscriber) new InstrumentedSubscriber(coreSubscriber));
    }
}
