package co.elastic.otel;

import co.elastic.otel.common.AbstractChainingSpanProcessor;
import co.elastic.otel.common.LocalRootSpan;
import co.elastic.otel.common.util.ExecutorUtils;
import co.elastic.otel.common.util.HexUtils;
import co.elastic.otel.hostid.ProfilerProvidedHostId;
import co.elastic.otel.profiler.DecodeException;
import co.elastic.otel.profiler.ProfilerMessage;
import co.elastic.otel.profiler.ProfilerRegistrationMessage;
import co.elastic.otel.profiler.TraceCorrelationMessage;
import io.opentelemetry.javaagent.bootstrap.PatchLogger;
import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Span;
import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.ContextStorage;
import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Base64;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.logging.Level;
import javax.annotation.Nullable;

/* loaded from: input_file:inst/co/elastic/otel/UniversalProfilingProcessor.classdata */
public class UniversalProfilingProcessor extends AbstractChainingSpanProcessor {
    static final long POLL_FREQUENCY_MS = 20;
    final boolean tryEnableVirtualThreadSupport;
    final SpanProfilingSamplesCorrelator correlator;
    private final ScheduledExecutorService messagePollAndSpanFlushExecutor;
    String socketPath;
    volatile boolean tlsPropagationActive;
    private static final PatchLogger log = PatchLogger.getLogger(UniversalProfilingProcessor.class.getName());
    private static final long INITIAL_SPAN_DELAY_NANOS = Duration.ofSeconds(1).toNanos();
    private static boolean anyInstanceActive = false;

    /* loaded from: input_file:inst/co/elastic/otel/UniversalProfilingProcessor$ActivationListener.classdata */
    private static class ActivationListener implements ContextStorage {

        @Nullable
        private static volatile UniversalProfilingProcessor processor;
        private final ContextStorage delegate;

        private ActivationListener(ContextStorage contextStorage) {
            this.delegate = contextStorage;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static synchronized void setProcessor(@Nullable UniversalProfilingProcessor universalProfilingProcessor) {
            if (universalProfilingProcessor != null && processor != null) {
                throw new IllegalStateException("Only one processor can be registered at a time");
            }
            processor = universalProfilingProcessor;
        }

        @Override // io.opentelemetry.javaagent.shaded.io.opentelemetry.context.ContextStorage
        public Scope attach(Context context) {
            UniversalProfilingProcessor universalProfilingProcessor = processor;
            if (universalProfilingProcessor == null) {
                return this.delegate.attach(context);
            }
            Context current = this.delegate.current();
            universalProfilingProcessor.onContextChange(current, context);
            Scope attach = this.delegate.attach(context);
            return () -> {
                UniversalProfilingProcessor universalProfilingProcessor2 = processor;
                if (universalProfilingProcessor2 != null) {
                    universalProfilingProcessor2.onContextChange(context, current);
                }
                attach.close();
            };
        }

        @Override // io.opentelemetry.javaagent.shaded.io.opentelemetry.context.ContextStorage
        @Nullable
        public Context current() {
            return this.delegate.current();
        }

        static {
            ContextStorage.addWrapper(ActivationListener::new);
            processor = null;
        }
    }

    public static UniversalProfilingProcessorBuilder builder(SpanProcessor spanProcessor, Resource resource) {
        return new UniversalProfilingProcessorBuilder(spanProcessor, resource);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public UniversalProfilingProcessor(SpanProcessor spanProcessor, Resource resource, int i, boolean z, boolean z2, String str, LongSupplier longSupplier) {
        super(spanProcessor);
        long j;
        this.tlsPropagationActive = false;
        synchronized (UniversalProfilingProcessor.class) {
            this.tryEnableVirtualThreadSupport = z2;
            if (anyInstanceActive) {
                throw new IllegalStateException("Another instance has already been started and not stopped yet. There must be at most one processor of this type active at a time!");
            }
            if (z) {
                j = 0;
            } else {
                j = INITIAL_SPAN_DELAY_NANOS;
                enableTlsPropagation();
            }
            SpanProcessor spanProcessor2 = this.next;
            Objects.requireNonNull(spanProcessor2);
            this.correlator = new SpanProfilingSamplesCorrelator(i, longSupplier, j, spanProcessor2::onEnd);
            this.socketPath = openProfilerSocket(str);
            try {
                UniversalProfilingCorrelation.setProcessStorage(ProfilerSharedMemoryWriter.generateProcessCorrelationStorage(resource, this.socketPath));
                this.messagePollAndSpanFlushExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorUtils.threadFactory("elastic-profiler-correlation-", true));
                this.messagePollAndSpanFlushExecutor.scheduleWithFixedDelay(this::pollMessagesAndFlushPendingSpans, POLL_FREQUENCY_MS, POLL_FREQUENCY_MS, TimeUnit.MILLISECONDS);
                ActivationListener.setProcessor(this);
                anyInstanceActive = true;
            } catch (Exception e) {
                UniversalProfilingCorrelation.stopProfilerReturnChannel();
                throw e;
            }
        }
    }

    private synchronized void enableTlsPropagation() {
        if (this.tlsPropagationActive) {
            return;
        }
        try {
            log.log(Level.FINE, "Setting virtual thread support to {0}", new Object[]{Boolean.valueOf(this.tryEnableVirtualThreadSupport)});
            UniversalProfilingCorrelation.setVirtualThreadSupportEnabled(this.tryEnableVirtualThreadSupport);
        } catch (Exception e) {
            log.log(Level.SEVERE, "Could not enable virtual thread support, correlation will only work for platform threads", (Throwable) e);
        }
        this.tlsPropagationActive = true;
    }

    private String openProfilerSocket(String str) {
        Path resolve;
        Path path = Paths.get(str, new String[0]);
        if (!Files.exists(path, new LinkOption[0]) && !path.toFile().mkdirs()) {
            throw new IllegalArgumentException("Could not create directory '" + str + "'");
        }
        do {
            resolve = path.resolve(randomSocketFileName());
        } while (Files.exists(resolve, new LinkOption[0]));
        String path2 = resolve.toAbsolutePath().toString();
        log.log(Level.FINE, "Opening profiler correlation socket {0}", new Object[]{path2});
        UniversalProfilingCorrelation.startProfilerReturnChannel(path2);
        return path2;
    }

    private String randomSocketFileName() {
        StringBuilder sb = new StringBuilder("essock");
        Random random = new Random();
        for (int i = 0; i < 8; i++) {
            sb.append("abcdefghijklmonpqrstuvwxzyABCDEFGHIJKLMONPQRSTUVWXYZ0123456789".charAt(random.nextInt("abcdefghijklmonpqrstuvwxzyABCDEFGHIJKLMONPQRSTUVWXYZ0123456789".length())));
        }
        return sb.toString();
    }

    @Override // co.elastic.otel.common.AbstractChainingSpanProcessor
    protected void doOnStart(Context context, ReadWriteSpan readWriteSpan) {
        LocalRootSpan.onSpanStart(readWriteSpan, context);
        this.correlator.onSpanStart(readWriteSpan, context);
    }

    @Override // io.opentelemetry.sdk.trace.SpanProcessor
    public void onEnd(ReadableSpan readableSpan) {
        this.correlator.sendOrBufferSpan(readableSpan);
    }

    @Override // co.elastic.otel.common.AbstractChainingSpanProcessor
    protected CompletableResultCode doShutdown() {
        try {
            ActivationListener.setProcessor(null);
            UniversalProfilingCorrelation.reset();
            anyInstanceActive = false;
            this.messagePollAndSpanFlushExecutor.shutdown();
            try {
                this.messagePollAndSpanFlushExecutor.awaitTermination(10L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.log(Level.WARNING, "Could not wait for executor termination", (Throwable) e);
            }
            consumeProfilerMessages();
            this.correlator.shutdownAndFlushAll();
            CompletableResultCode ofSuccess = CompletableResultCode.ofSuccess();
            UniversalProfilingCorrelation.stopProfilerReturnChannel();
            return ofSuccess;
        } catch (Throwable th) {
            UniversalProfilingCorrelation.stopProfilerReturnChannel();
            throw th;
        }
    }

    @Override // co.elastic.otel.common.AbstractChainingSpanProcessor
    protected boolean requiresStart() {
        return true;
    }

    @Override // io.opentelemetry.sdk.trace.SpanProcessor
    public boolean isEndRequired() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Nullable
    public void onContextChange(@Nullable Context context, @Nullable Context context2) {
        if (this.tlsPropagationActive) {
            try {
                Span safeSpanFromContext = safeSpanFromContext(context);
                Span safeSpanFromContext2 = safeSpanFromContext(context2);
                if (safeSpanFromContext != safeSpanFromContext2 && !safeSpanFromContext.getSpanContext().equals(safeSpanFromContext2.getSpanContext())) {
                    ProfilerSharedMemoryWriter.updateThreadCorrelationStorage(safeSpanFromContext2);
                }
            } catch (Throwable th) {
                log.log(Level.SEVERE, "Error on context update", th);
            }
        }
    }

    private Span safeSpanFromContext(@Nullable Context context) {
        return context == null ? Span.getInvalid() : Span.fromContext(context);
    }

    synchronized void pollMessagesAndFlushPendingSpans() {
        consumeProfilerMessages();
        this.correlator.flushPendingBufferedSpans();
    }

    private void consumeProfilerMessages() {
        ProfilerMessage readProfilerReturnChannelMessage;
        StringBuilder sb = new StringBuilder();
        while (true) {
            try {
                try {
                    readProfilerReturnChannelMessage = UniversalProfilingCorrelation.readProfilerReturnChannelMessage();
                } catch (DecodeException e) {
                    log.log(Level.WARNING, "Failed to read profiler message", (Throwable) e);
                }
                if (readProfilerReturnChannelMessage == null) {
                    return;
                }
                if (readProfilerReturnChannelMessage instanceof TraceCorrelationMessage) {
                    handleMessage((TraceCorrelationMessage) readProfilerReturnChannelMessage, sb);
                } else if (readProfilerReturnChannelMessage instanceof ProfilerRegistrationMessage) {
                    handleMessage((ProfilerRegistrationMessage) readProfilerReturnChannelMessage);
                } else {
                    log.log(Level.FINE, "Received unknown message type from profiler: {0}", readProfilerReturnChannelMessage);
                }
            } catch (Exception e2) {
                log.log(Level.SEVERE, "Cannot read from profiler socket", (Throwable) e2);
                return;
            }
        }
    }

    private void handleMessage(ProfilerRegistrationMessage profilerRegistrationMessage) {
        log.log(Level.FINE, "Received profiler registration message! host.id is {0} and the span delay is {1} ms", new Object[]{profilerRegistrationMessage.getHostId(), Long.valueOf(profilerRegistrationMessage.getSamplesDelayMillis())});
        enableTlsPropagation();
        this.correlator.setSpanBufferDurationNanos(Duration.ofMillis(profilerRegistrationMessage.getSamplesDelayMillis() + POLL_FREQUENCY_MS).toNanos());
        ProfilerProvidedHostId.set(profilerRegistrationMessage.getHostId());
    }

    private void handleMessage(TraceCorrelationMessage traceCorrelationMessage, StringBuilder sb) {
        sb.setLength(0);
        HexUtils.appendAsHex(traceCorrelationMessage.getTraceId(), sb);
        String sb2 = sb.toString();
        sb.setLength(0);
        HexUtils.appendAsHex(traceCorrelationMessage.getLocalRootSpanId(), sb);
        this.correlator.correlate(sb2, sb.toString(), Base64.getUrlEncoder().withoutPadding().encodeToString(traceCorrelationMessage.getStackTraceId()), traceCorrelationMessage.getSampleCount());
    }
}
