/*
 * Decompiled with CFR 0.152.
 */
package co.elastic.apm.agent.kafka;

import co.elastic.apm.agent.bci.ElasticApmInstrumentation;
import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.kafka.BaseKafkaHeadersInstrumentation;
import co.elastic.apm.agent.kafka.BaseKafkaInstrumentation;
import co.elastic.apm.agent.kafka.KafkaProducerInstrumentation;
import co.elastic.apm.agent.kafka.helper.KafkaInstrumentationHeadersHelper;
import co.elastic.apm.agent.kafka.helper.KafkaInstrumentationHelper;
import co.elastic.apm.agent.shaded.bytebuddy.asm.Advice;
import co.elastic.apm.agent.shaded.bytebuddy.description.method.MethodDescription;
import co.elastic.apm.agent.shaded.bytebuddy.description.type.TypeDescription;
import co.elastic.apm.agent.shaded.bytebuddy.matcher.ElementMatcher;
import co.elastic.apm.agent.shaded.bytebuddy.matcher.ElementMatchers;
import co.elastic.apm.agent.shaded.slf4j.Logger;
import co.elastic.apm.agent.shaded.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerHeadersInstrumentation
extends BaseKafkaHeadersInstrumentation {
    public static final Logger logger = LoggerFactory.getLogger(KafkaProducerInstrumentation.class);
    public static boolean headersSupported = true;

    public KafkaProducerHeadersInstrumentation(ElasticApmTracer tracer) {
        super(tracer);
    }

    @Override
    public ElementMatcher<? super TypeDescription> getTypeMatcher() {
        return ElementMatchers.named("org.apache.kafka.clients.producer.KafkaProducer");
    }

    @Override
    public ElementMatcher<? super MethodDescription> getMethodMatcher() {
        return ElementMatchers.named("doSend").and(ElementMatchers.takesArgument(0, ElementMatchers.named("org.apache.kafka.clients.producer.ProducerRecord")));
    }

    @Override
    public Class<?> getAdviceClass() {
        return KafkaProducerHeadersAdvice.class;
    }

    public static class KafkaProducerHeadersAdvice {
        @Advice.OnMethodEnter(suppress=Throwable.class)
        @Nullable
        public static Span beforeSend(@Advice.FieldValue(value="apiVersions") ApiVersions apiVersions, @Advice.Argument(value=0) ProducerRecord record, @Advice.Local(value="helper") @Nullable KafkaInstrumentationHelper<Callback, ProducerRecord, KafkaProducer> helper, @Nullable @Advice.Argument(value=1, readOnly=false) Callback callback) {
            if (ElasticApmInstrumentation.tracer == null) {
                return null;
            }
            Span span = null;
            helper = BaseKafkaInstrumentation.kafkaInstrHelperManager.getForClassLoaderOfClass(KafkaProducer.class);
            if (helper != null) {
                span = helper.onSendStart(record);
            }
            if (span == null) {
                return null;
            }
            if (apiVersions.maxUsableProduceMagic() >= 2 && headersSupported) {
                try {
                    KafkaInstrumentationHeadersHelper<ConsumerRecord, ProducerRecord> kafkaInstrumentationHelper = BaseKafkaHeadersInstrumentation.kafkaInstrHeadersHelperManager.getForClassLoaderOfClass(KafkaProducer.class);
                    if (kafkaInstrumentationHelper != null) {
                        kafkaInstrumentationHelper.setOutgoingTraceContextHeaders(span, record);
                    }
                }
                catch (IllegalStateException e) {
                    logger.debug("Failed to add header to Kafka record {}, probably to headers' read-only state.", (Object)record);
                }
            }
            callback = helper.wrapCallback(callback, span);
            return span;
        }

        @Advice.OnMethodExit(onThrowable=Throwable.class, suppress=Throwable.class, repeatOn=Advice.OnNonDefaultValue.class)
        public static boolean afterSend(@Advice.Enter(readOnly=false) @Nullable Span span, @Advice.Argument(value=0, readOnly=false) ProducerRecord record, @Advice.This KafkaProducer thiz, @Advice.Local(value="helper") @Nullable KafkaInstrumentationHelper<Callback, ProducerRecord, KafkaProducer> helper, @Advice.Thrown @Nullable Throwable throwable) {
            if (throwable != null && throwable.getMessage().contains("Magic v1 does not support record headers") && span != null) {
                logger.info("Adding header to Kafka record is not allowed with the used broker, attempting to resend record");
                record = new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), record.value(), (Iterable)record.headers());
                KafkaInstrumentationHeadersHelper<ConsumerRecord, ProducerRecord> kafkaInstrumentationHelper = BaseKafkaHeadersInstrumentation.kafkaInstrHeadersHelperManager.getForClassLoaderOfClass(KafkaProducer.class);
                if (kafkaInstrumentationHelper != null) {
                    kafkaInstrumentationHelper.removeTraceContextHeader(record);
                }
                span.deactivate();
                span = null;
                headersSupported = false;
                return true;
            }
            if (helper != null && span != null) {
                helper.onSendEnd(span, record, thiz, throwable);
            }
            return false;
        }
    }
}

