package io.opentelemetry.instrumentation.reactor.v3_1;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.class */
public final class ContextPropagationOperator {
    // Strategy built in the constructor; registered in registerOnEachOperator() and
    // unregistered in resetOnEachOperator().
    private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;
    private static final Logger logger = Logger.getLogger(ContextPropagationOperator.class.getName());
    // Placeholder element that the ScalarPropagating* publishers pass to
    // subscribeInActiveSpan(), which hands it to Operators.scalarSubscription().
    private static final Object VALUE = new Object();

    // Handle for Mono's "contextWrite" method, or "subscriberContext" when the former cannot be
    // found; null when neither lookup succeeds (see getContextWriteMethod).
    @Nullable
    private static final MethodHandle MONO_CONTEXT_WRITE_METHOD = getContextWriteMethod(Mono.class);

    // Same lookup as above, performed against Flux.
    @Nullable
    private static final MethodHandle FLUX_CONTEXT_WRITE_METHOD = getContextWriteMethod(Flux.class);

    // Handle for the static Schedulers.onScheduleHook(String, Function) method; null when the
    // lookup fails, in which case registerScheduleHook() does nothing.
    @Nullable
    private static final MethodHandle SCHEDULERS_HOOK_METHOD = getSchedulersHookMethod();
    // Key under which the OpenTelemetry context is stored in the Reactor context. It is a unique
    // object instance; toString() only supplies a readable name.
    private static final Object TRACE_CONTEXT_KEY = new Object() { // from class: io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator.1
        public String toString() {
            return "otel-trace-context";
        }
    };
    // Monitor held while registerOnEachOperator()/resetOnEachOperator() change the hook state.
    private static final Object lock = new Object();
    // Static (shared by all instances): set to true by registerOnEachOperator() and to false by
    // resetOnEachOperator(); read without the lock by runWithContext(), hence volatile.
    private static volatile boolean enabled = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator$Lifter.class */
    public static class Lifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
        private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;

        public Lifter(ReactorAsyncOperationEndStrategy reactorAsyncOperationEndStrategy) {
            this.asyncOperationEndStrategy = reactorAsyncOperationEndStrategy;
        }

        @Override // java.util.function.BiFunction
        public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
            return new TracingSubscriber(coreSubscriber, coreSubscriber.currentContext());
        }
    }

    /* loaded from: input_file:io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator$RunnableWrapper.class */
    /**
     * Runnable decorator that captures the OpenTelemetry {@link Context} that is current when the
     * wrapper is constructed and makes it current again while the wrapped task runs.
     */
    private static class RunnableWrapper implements Runnable {
        private final Runnable delegate;
        // Evaluated in the field initializer, i.e. when the wrapper is created.
        private final Context context = Context.current();

        /**
         * @param runnable the task to run with the captured context
         */
        RunnableWrapper(Runnable runnable) {
            this.delegate = runnable;
        }

        /**
         * Runs the delegate with the captured context made current; the scope is closed when the
         * delegate returns or throws.
         */
        @Override // java.lang.Runnable
        public void run() {
            // try-with-resources closes the scope exactly once. The previous hand-expanded form
            // had close() inside the try covered by catch (Throwable), so a failing close() was
            // followed by a second close() attempt.
            try (Scope ignored = this.context.makeCurrent()) {
                this.delegate.run();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator$ScalarPropagatingFlux.class */
    /**
     * Flux that, on subscription, delegates to
     * {@link ContextPropagationOperator#subscribeInActiveSpan} with the shared placeholder value.
     * {@link #create(Flux)} chains a {@code flatMap} onto it that returns the wrapped flux, so the
     * wrapped flux is only reached through that {@code flatMap} callback.
     */
    public static class ScalarPropagatingFlux extends Flux<Object> implements Scannable {
        private final Flux<?> source;

        private ScalarPropagatingFlux(Flux<?> flux) {
            source = flux;
        }

        /**
         * Builds the propagating wrapper for the given flux.
         *
         * @param flux the flux to wrap
         * @return a flux that flat-maps the placeholder element to {@code flux}
         */
        static <T> Flux<T> create(Flux<T> flux) {
            Flux<Object> trigger = new ScalarPropagatingFlux(flux);
            return trigger.flatMap(ignored -> flux);
        }

        public void subscribe(CoreSubscriber<? super Object> coreSubscriber) {
            ContextPropagationOperator.subscribeInActiveSpan(coreSubscriber, ContextPropagationOperator.VALUE);
        }

        /**
         * Reports the wrapped flux as {@code PARENT}; every other attribute yields {@code null}.
         */
        @Nullable
        public Object scanUnsafe(Scannable.Attr attr) {
            return attr == Scannable.Attr.PARENT ? source : null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator$ScalarPropagatingMono.class */
    /**
     * Mono that, on subscription, delegates to
     * {@link ContextPropagationOperator#subscribeInActiveSpan} with the shared placeholder value.
     * {@link #create(Mono)} chains a {@code flatMap} onto it that returns the wrapped mono, so the
     * wrapped mono is only reached through that {@code flatMap} callback.
     */
    public static class ScalarPropagatingMono extends Mono<Object> implements Scannable {
        private final Mono<?> source;

        private ScalarPropagatingMono(Mono<?> mono) {
            source = mono;
        }

        /**
         * Builds the propagating wrapper for the given mono.
         *
         * @param mono the mono to wrap
         * @return a mono that flat-maps the placeholder element to {@code mono}
         */
        static <T> Mono<T> create(Mono<T> mono) {
            Mono<Object> trigger = new ScalarPropagatingMono(mono);
            return trigger.flatMap(ignored -> mono);
        }

        public void subscribe(CoreSubscriber<? super Object> coreSubscriber) {
            ContextPropagationOperator.subscribeInActiveSpan(coreSubscriber, ContextPropagationOperator.VALUE);
        }

        /**
         * Reports the wrapped mono as {@code PARENT}; every other attribute yields {@code null}.
         */
        @Nullable
        public Object scanUnsafe(Scannable.Attr attr) {
            return attr == Scannable.Attr.PARENT ? source : null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator$StoreOpenTelemetryContext.class */
    /**
     * Function over a Reactor context that stores one fixed OpenTelemetry context in it by
     * delegating to {@link ContextPropagationOperator#storeOpenTelemetryContext}.
     */
    public static class StoreOpenTelemetryContext implements Function<reactor.util.context.Context, reactor.util.context.Context> {
        private final Context tracingContext;

        private StoreOpenTelemetryContext(Context otelContext) {
            tracingContext = otelContext;
        }

        /**
         * @param context the Reactor context to store into
         * @return the Reactor context returned by the store operation
         */
        @Override // java.util.function.Function
        public reactor.util.context.Context apply(reactor.util.context.Context context) {
            reactor.util.context.Context updated =
                    ContextPropagationOperator.storeOpenTelemetryContext(context, tracingContext);
            return updated;
        }
    }

    /**
     * Looks up the public virtual method that takes a {@link Function} and returns {@code cls},
     * trying the name {@code contextWrite} first and {@code subscriberContext} second.
     *
     * @param cls the publisher class to search (also the expected return type)
     * @return a handle for the first name that resolves, or {@code null} if neither does
     */
    @Nullable
    private static MethodHandle getContextWriteMethod(Class<?> cls) {
        MethodHandles.Lookup lookup = MethodHandles.publicLookup();
        MethodType signature = MethodType.methodType(cls, (Class<?>) Function.class);
        for (String methodName : new String[] {"contextWrite", "subscriberContext"}) {
            try {
                return lookup.findVirtual(cls, methodName, signature);
            } catch (IllegalAccessException | NoSuchMethodException ignored) {
                // Not available under this name; try the next candidate.
            }
        }
        return null;
    }

    /**
     * Looks up the public static {@code Schedulers.onScheduleHook(String, Function)} method.
     *
     * @return a handle for the method, or {@code null} if it cannot be found or accessed
     */
    @Nullable
    private static MethodHandle getSchedulersHookMethod() {
        MethodType hookSignature = MethodType.methodType(void.class, String.class, Function.class);
        MethodHandle hook;
        try {
            hook = MethodHandles.publicLookup().findStatic(Schedulers.class, "onScheduleHook", hookSignature);
        } catch (IllegalAccessException | NoSuchMethodException ignored) {
            hook = null;
        }
        return hook;
    }

    /**
     * Creates an operator from a freshly created builder without changing any builder setting.
     *
     * @return the operator produced by {@code builder().build()}
     */
    public static ContextPropagationOperator create() {
        ContextPropagationOperatorBuilder operatorBuilder = builder();
        return operatorBuilder.build();
    }

    /**
     * Starts building a {@link ContextPropagationOperator}.
     *
     * @return a new builder instance
     */
    public static ContextPropagationOperatorBuilder builder() {
        ContextPropagationOperatorBuilder freshBuilder = new ContextPropagationOperatorBuilder();
        return freshBuilder;
    }

    /**
     * Stores an OpenTelemetry context in a Reactor context under this class's private key.
     *
     * @param context the Reactor context to put the value into
     * @param context2 the OpenTelemetry context to store
     * @return the Reactor context returned by {@code put}
     */
    public static reactor.util.context.Context storeOpenTelemetryContext(reactor.util.context.Context context, Context context2) {
        reactor.util.context.Context withTracing = context.put(TRACE_CONTEXT_KEY, context2);
        return withTracing;
    }

    /**
     * Reads the OpenTelemetry context stored in a Reactor context under this class's private key.
     *
     * @param context the Reactor context to read from
     * @param context2 the value to return when nothing is stored under the key
     * @return the stored OpenTelemetry context, or {@code context2} as the default
     */
    public static Context getOpenTelemetryContext(reactor.util.context.Context context, Context context2) {
        Object stored = context.getOrDefault(TRACE_CONTEXT_KEY, context2);
        return (Context) stored;
    }

    /**
     * Reads the OpenTelemetry context stored in a Reactor {@code ContextView} under this class's
     * private key.
     *
     * @param contextView the view to read from
     * @param context the value to return when nothing is stored under the key
     * @return the stored OpenTelemetry context, or {@code context} as the default
     */
    @NoMuzzle
    public static Context getOpenTelemetryContextFromContextView(ContextView contextView, Context context) {
        Object stored = contextView.getOrDefault(TRACE_CONTEXT_KEY, context);
        return (Context) stored;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the operator and builds its async operation end strategy.
     *
     * @param z value passed to the strategy builder's
     *     {@code setCaptureExperimentalSpanAttributes}
     */
    public ContextPropagationOperator(boolean z) {
        asyncOperationEndStrategy =
                ReactorAsyncOperationEndStrategy.builder()
                        .setCaptureExperimentalSpanAttributes(z)
                        .build();
    }

    /**
     * Installs the tracing hook on every Reactor operator, registers this instance's async
     * operation end strategy and installs the scheduler hook. Does nothing when the hooks are
     * already enabled; the whole operation runs while holding the class-wide lock.
     */
    public void registerOnEachOperator() {
        synchronized (lock) {
            if (!enabled) {
                String operatorHookKey = TracingSubscriber.class.getName();
                Hooks.onEachOperator(operatorHookKey, tracingLift(this.asyncOperationEndStrategy));
                AsyncOperationEndStrategies.instance().registerStrategy(this.asyncOperationEndStrategy);
                String scheduleHookKey = RunnableWrapper.class.getName();
                registerScheduleHook(scheduleHookKey, RunnableWrapper::new);
                enabled = true;
            }
        }
    }

    private static void registerScheduleHook(String str, Function<Runnable, Runnable> function) {
        if (SCHEDULERS_HOOK_METHOD == null) {
            return;
        }
        try {
            (void) SCHEDULERS_HOOK_METHOD.invoke(str, function);
        } catch (Throwable th) {
            logger.log(Level.WARNING, "Failed to install scheduler hook", th);
        }
    }

    /**
     * Removes the per-operator tracing hook and unregisters this instance's async operation end
     * strategy. Does nothing when the hooks are not enabled; runs while holding the class-wide
     * lock. The scheduler hook installed by {@link #registerOnEachOperator()} is not touched here.
     */
    public void resetOnEachOperator() {
        synchronized (lock) {
            if (!enabled) {
                return;
            }
            String operatorHookKey = TracingSubscriber.class.getName();
            Hooks.resetOnEachOperator(operatorHookKey);
            AsyncOperationEndStrategies.instance().unregisterStrategy(this.asyncOperationEndStrategy);
            enabled = false;
        }
    }

    /**
     * Builds the operator-lifting function: publishers accepted by {@link #shouldInstrument} have
     * their subscribers wrapped by a {@link Lifter}.
     *
     * @param reactorAsyncOperationEndStrategy strategy handed to the {@link Lifter}
     * @return the function produced by {@code Operators.lift}
     */
    private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(ReactorAsyncOperationEndStrategy reactorAsyncOperationEndStrategy) {
        // Diamond instead of the raw Lifter type, so the lifter's element type is checked.
        return Operators.lift(ContextPropagationOperator::shouldInstrument, new Lifter<>(reactorAsyncOperationEndStrategy));
    }

    /**
     * Returns a mono that runs {@code mono} with the given OpenTelemetry context stored in the
     * Reactor subscriber context.
     *
     * <p>When the hooks are not enabled, or no context-write method could be resolved on
     * {@code Mono}, the input is returned unchanged. Otherwise the mono is wrapped with
     * {@code ScalarPropagatingMono.create} and the context-write handle is invoked with a
     * function that stores {@code context}.
     *
     * @param mono the publisher to run with the context
     * @param context the OpenTelemetry context to store
     * @return the wrapped mono, or {@code mono} itself when propagation is unavailable
     */
    @SuppressWarnings("unchecked") // the handle returns Mono; the element type comes from create(mono)
    public static <T> Mono<T> runWithContext(Mono<T> mono, Context context) {
        if (!enabled || MONO_CONTEXT_WRITE_METHOD == null) {
            return mono;
        }
        try {
            return (Mono<T>) MONO_CONTEXT_WRITE_METHOD.invoke(ScalarPropagatingMono.create(mono), new StoreOpenTelemetryContext(context));
        } catch (Throwable th) {
            // Rethrown as-is, without wrapping.
            throw ((RuntimeException) sneakyThrow(th));
        }
    }

    /**
     * Returns a flux that runs {@code flux} with the given OpenTelemetry context stored in the
     * Reactor subscriber context.
     *
     * <p>When the hooks are not enabled, or no context-write method could be resolved on
     * {@code Flux}, the input is returned unchanged. Otherwise the flux is wrapped with
     * {@code ScalarPropagatingFlux.create} and the context-write handle is invoked with a
     * function that stores {@code context}.
     *
     * @param flux the publisher to run with the context
     * @param context the OpenTelemetry context to store
     * @return the wrapped flux, or {@code flux} itself when propagation is unavailable
     */
    @SuppressWarnings("unchecked") // the handle returns Flux; the element type comes from create(flux)
    public static <T> Flux<T> runWithContext(Flux<T> flux, Context context) {
        if (!enabled || FLUX_CONTEXT_WRITE_METHOD == null) {
            return flux;
        }
        try {
            return (Flux<T>) FLUX_CONTEXT_WRITE_METHOD.invoke(ScalarPropagatingFlux.create(flux), new StoreOpenTelemetryContext(context));
        } catch (Throwable th) {
            // Rethrown as-is, without wrapping.
            throw ((RuntimeException) sneakyThrow(th));
        }
    }

    /**
     * Rethrows the given throwable unchanged without forcing callers to declare it.
     *
     * <p>The method is declared {@code throws T} rather than {@code throws Throwable}: at a call
     * site with no other constraint on {@code T} the compiler infers an unchecked exception type,
     * so callers that declare no checked exceptions still compile. The cast is erased at runtime,
     * so any throwable, checked or not, is thrown as-is.
     *
     * @param th the throwable to rethrow
     * @return never returns normally; the return type only lets callers write {@code throw sneakyThrow(t)}
     * @throws T always, carrying {@code th} itself
     */
    @SuppressWarnings("unchecked")
    private static <T extends Throwable> T sneakyThrow(Throwable th) throws T {
        throw (T) th;
    }

    /**
     * Filter used when lifting operators: everything is instrumented except publishers that
     * implement {@code Fuseable.ScalarCallable}.
     *
     * @param scannable the publisher being considered
     * @return {@code false} for {@code Fuseable.ScalarCallable} instances, {@code true} otherwise
     */
    private static boolean shouldInstrument(Scannable scannable) {
        boolean scalarCallable = scannable instanceof Fuseable.ScalarCallable;
        return !scalarCallable;
    }

    /**
     * Hands the subscriber a scalar subscription for {@code obj}, making the OpenTelemetry context
     * stored in the subscriber's Reactor context current while {@code onSubscribe} runs.
     *
     * <p>When no OpenTelemetry context is stored, or the stored one is already the current
     * context, {@code onSubscribe} is called directly without opening a scope.
     *
     * @param coreSubscriber the subscriber to signal
     * @param obj the single element for the scalar subscription
     */
    static void subscribeInActiveSpan(CoreSubscriber<? super Object> coreSubscriber, Object obj) {
        Context openTelemetryContext = getOpenTelemetryContext(coreSubscriber.currentContext(), null);
        if (openTelemetryContext == null || openTelemetryContext == Context.current()) {
            coreSubscriber.onSubscribe(Operators.scalarSubscription(coreSubscriber, obj));
            return;
        }
        // try-with-resources closes the scope exactly once. The previous hand-expanded form had
        // close() inside the try covered by catch (Throwable), so a failing close() was followed
        // by a second close() attempt.
        try (Scope ignored = openTelemetryContext.makeCurrent()) {
            coreSubscriber.onSubscribe(Operators.scalarSubscription(coreSubscriber, obj));
        }
    }
}
