/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.sleuth.instrument.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.BDDAssertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.BDDMockito;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.exporter.FinishedSpan;
import org.springframework.cloud.sleuth.instrument.kafka.KafkaTestUtils;
import org.springframework.cloud.sleuth.instrument.kafka.TracingKafkaConsumerFactory;
import org.springframework.cloud.sleuth.instrument.kafka.TracingKafkaPropagatorGetter;
import org.springframework.cloud.sleuth.propagation.Propagator;
import org.springframework.cloud.sleuth.test.TestSpanHandler;
import org.springframework.cloud.sleuth.test.TestTracingAwareSupplier;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import reactor.core.Disposable;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.internals.ConsumerFactory;

@Testcontainers
@ExtendWith(value={MockitoExtension.class})
@Tag(value="DockerRequired")
public abstract class KafkaReceiverTest
implements TestTracingAwareSupplier {
    protected String testTopic;
    protected Tracer tracer = this.tracerTest().tracing().tracer();
    protected Propagator propagator = this.tracerTest().tracing().propagator();
    protected TestSpanHandler spans = this.tracerTest().handler();
    private Disposable consumerSubscription;
    protected final AtomicInteger receivedCounter = new AtomicInteger(0);
    @Mock(answer=Answers.RETURNS_DEEP_STUBS)
    BeanFactory beanFactory;
    @Container
    protected static final KafkaContainer kafkaContainer = (KafkaContainer)((KafkaContainer)new KafkaContainer(DockerImageName.parse((String)"confluentinc/cp-kafka:6.1.1")).withExposedPorts(new Integer[]{9093})).waitingFor((WaitStrategy)Wait.forListeningPort());

    @BeforeAll
    static void setupAll() {
        kafkaContainer.start();
    }

    @AfterAll
    static void destroyAll() {
        kafkaContainer.stop();
    }

    @BeforeEach
    void setup() {
        BDDMockito.given((Object)this.beanFactory.getBean(Propagator.class)).willReturn((Object)this.propagator);
        BDDMockito.given((Object)this.beanFactory.getBeanProvider(ResolvableType.forClassWithGenerics(Propagator.Getter.class, (ResolvableType[])new ResolvableType[]{ResolvableType.forType((ParameterizedTypeReference)new ParameterizedTypeReference<ConsumerRecord<?, ?>>(){})})).getIfAvailable()).willReturn((Object)new TracingKafkaPropagatorGetter());
        this.testTopic = UUID.randomUUID().toString();
        HashMap<String, Object> consumerProperties = new HashMap<String, Object>();
        consumerProperties.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        consumerProperties.put("group.id", "test-consumer-group");
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        consumerProperties.put("auto.offset.reset", "earliest");
        ReceiverOptions options = ReceiverOptions.create(consumerProperties);
        options = options.withKeyDeserializer((Deserializer)new StringDeserializer()).withValueDeserializer((Deserializer)new StringDeserializer()).subscription(Collections.singletonList(this.testTopic));
        KafkaReceiver kafkaReceiver = KafkaReceiver.create((ConsumerFactory)new TracingKafkaConsumerFactory(this.beanFactory), (ReceiverOptions)options);
        this.consumerSubscription = kafkaReceiver.receive().subscribeOn(Schedulers.single()).subscribe(record -> this.receivedCounter.incrementAndGet());
        this.receivedCounter.set(0);
    }

    @AfterEach
    void destroy() {
        this.consumerSubscription.dispose();
    }

    @Test
    public void should_create_and_finish_consumer_span() {
        KafkaProducer<String, String> kafkaProducer = KafkaTestUtils.buildTestKafkaProducer(kafkaContainer.getBootstrapServers());
        ProducerRecord producerRecord = new ProducerRecord(this.testTopic, (Object)"test", (Object)"test");
        kafkaProducer.send(producerRecord);
        Awaitility.await().atMost(Duration.ofSeconds(15L)).until(() -> this.receivedCounter.intValue() == 1);
        BDDAssertions.then((Object)this.tracer.currentSpan()).isNull();
        BDDAssertions.then((Iterable)this.spans).hasSize(1);
        FinishedSpan span = this.spans.get(0);
        BDDAssertions.then((Comparable)span.getKind()).isEqualTo((Object)Span.Kind.CONSUMER);
        BDDAssertions.then((Map)span.getTags()).isNotEmpty();
        BDDAssertions.then((String)((String)span.getTags().get("kafka.topic"))).isEqualTo(this.testTopic);
        BDDAssertions.then((String)((String)span.getTags().get("kafka.offset"))).isEqualTo("0");
        BDDAssertions.then((String)((String)span.getTags().get("kafka.partition"))).isEqualTo("0");
    }

    @Override
    public void cleanUpTracing() {
        this.spans.clear();
    }
}

