package org.springframework.cloud.sleuth.instrument.messaging;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.autoconfig.instrument.messaging.SleuthMessagingProperties;
import org.springframework.cloud.sleuth.test.TestSpanHandler;
import org.springframework.cloud.sleuth.test.TestTracingAwareSupplier;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/sleuth/instrument/messaging/TracingChannelInterceptorTest.class */
/**
 * Tracer-agnostic tests for {@link TracingChannelInterceptor}.
 *
 * <p>Concrete subclasses provide the tracer under test through
 * {@link TestTracingAwareSupplier#tracerTest()}. Each test installs the interceptor
 * (or a view of it restricted to one side of the channel) on a Spring messaging
 * channel, pushes a message through it and then inspects both the message headers
 * and the spans collected by {@link #spans}.
 */
public abstract class TracingChannelInterceptorTest implements TestTracingAwareSupplier {

    /** Name of the trace propagation header asserted throughout these tests. */
    private static final String B3 = "b3";

    /** Name of the message header that holds the nested "native" header map. */
    private static final String NATIVE_HEADERS = "nativeHeaders";

    /** Incoming {@code b3} header value used by the error-message tests. */
    private static final String INCOMING_B3 = "000000000000000a-000000000000000a";

    /** Id shared by the trace and span parts of {@link #INCOMING_B3}. */
    private static final String INCOMING_ID = "000000000000000a";

    /**
     * Last message seen by {@link #handler} or polled by a test. Left as a raw type so that
     * the field's declared type stays exactly what subclasses already compile against.
     */
    protected Message message;

    protected ChannelInterceptor interceptor = new TracingChannelInterceptor(
            tracerTest().tracing().tracer(),
            tracerTest().tracing().propagator(),
            new MessageHeaderPropagatorSetter(),
            new MessageHeaderPropagatorGetter(),
            remoteServiceNameMapper(new SleuthMessagingProperties()),
            new DefaultMessageSpanCustomizer());

    protected TestSpanHandler spans = tracerTest().handler();

    protected QueueChannel channel = new QueueChannel();

    protected DirectChannel directChannel = new DirectChannel();

    /** Handler that simply remembers the message it was given. */
    protected MessageHandler handler = msg -> this.message = msg;

    /**
     * Builds the function the interceptor uses to turn a header name into a remote
     * service name.
     *
     * @param sleuthMessagingProperties source of the configured Rabbit and Kafka service names
     * @return a function yielding the Rabbit name for keys starting with {@code amqp} or
     * {@code rabbit}, the Kafka name for keys starting with {@code kafka}, and
     * {@code null} for blank or unrecognised keys
     */
    static Function<String, String> remoteServiceNameMapper(SleuthMessagingProperties sleuthMessagingProperties) {
        return headerName -> {
            if (!StringUtils.hasText(headerName)) {
                return null;
            }
            if (headerName.startsWith("amqp") || headerName.startsWith("rabbit")) {
                return sleuthMessagingProperties.getRabbit().getRemoteServiceName();
            }
            if (headerName.startsWith("kafka")) {
                return sleuthMessagingProperties.getKafka().getRemoteServiceName();
            }
            return null;
        };
    }

    /** Creates the plain {@code "foo"} message most tests send. */
    private static Message<String> fooMessage() {
        return MessageBuilder.withPayload("foo").build();
    }

    /**
     * Sends {@code outbound} through a fresh {@link ExecutorSubscribableChannel} that has
     * {@code installed} as its only interceptor, and returns every message handed to the
     * subscriber.
     */
    private static List<Message<?>> deliverViaExecutorChannel(ChannelInterceptor installed, Message<?> outbound) {
        ExecutorSubscribableChannel executorChannel = new ExecutorSubscribableChannel();
        executorChannel.addInterceptor(installed);
        List<Message<?>> delivered = new ArrayList<>();
        executorChannel.subscribe(delivered::add);
        executorChannel.send(outbound);
        return delivered;
    }

    /** Reads the nested native-header map of {@code msg}; {@code null} when the header is absent. */
    @SuppressWarnings("unchecked") // the header is only inspected through key-based assertions
    private static Map<String, Object> nativeHeadersOf(Message<?> msg) {
        return (Map<String, Object>) msg.getHeaders().get(NATIVE_HEADERS);
    }

    @AfterEach
    public void close() {
        tracerTest().close();
    }

    @Test
    public void pollingReceive_emptyQueue() {
        this.channel.addInterceptor(consumerSideOnly(this.interceptor));

        Assertions.assertThat(this.channel.receive(0L)).isNull();
        Assertions.assertThat(this.spans).isEmpty();
    }

    @Test
    public void injectsProducerSpan() {
        this.channel.addInterceptor(producerSideOnly(this.interceptor));

        this.channel.send(fooMessage());

        Assertions.assertThat(this.channel.receive().getHeaders()).containsKey(B3);
        Assertions.assertThat(this.spans).hasSize(1)
                .extracting(span -> span.getKind())
                .containsExactly(Span.Kind.PRODUCER);
    }

    @Test
    public void injectsProducerAndConsumerSpan() {
        this.directChannel.addInterceptor(this.interceptor);
        this.directChannel.subscribe(this.handler);

        this.directChannel.send(fooMessage());

        Assertions.assertThat(this.message).isNotNull();
        Assertions.assertThat(this.message.getHeaders()).containsKeys(B3, NATIVE_HEADERS);
        Assertions.assertThat(this.spans)
                .extracting(span -> span.getKind())
                .contains(Span.Kind.CONSUMER, Span.Kind.PRODUCER);
    }

    @Test
    public void allowsSpanCustomization() {
        this.interceptor = new TracingChannelInterceptor(
                tracerTest().tracing().tracer(),
                tracerTest().tracing().propagator(),
                new MessageHeaderPropagatorSetter(),
                new MessageHeaderPropagatorGetter(),
                remoteServiceNameMapper(new SleuthMessagingProperties()),
                new MyMessageSpanCustomizer());
        this.directChannel.addInterceptor(this.interceptor);
        this.directChannel.subscribe(this.handler);

        this.directChannel.send(fooMessage());

        Assertions.assertThat(this.spans.reportedSpans().stream()
                .filter(span -> "changedHandle".equals(span.getName()))
                .findFirst()
                .map(span -> (String) span.getTags().get("handleKey")))
                .isPresent().get().isEqualTo("handleValue");
        Assertions.assertThat(this.spans.reportedSpans().stream()
                .filter(span -> "changedSend".equals(span.getName()))
                .findFirst()
                .map(span -> (String) span.getTags().get("sendKey")))
                .isPresent().get().isEqualTo("sendValue");
    }

    @Test
    public void injectsProducerSpan_nativeHeaders() {
        this.channel.addInterceptor(producerSideOnly(this.interceptor));

        this.channel.send(fooMessage());

        Assertions.assertThat(nativeHeadersOf(this.channel.receive())).containsOnlyKeys(B3);
    }

    @Test
    public void pollingReceive_injectsConsumerSpan() {
        this.channel.addInterceptor(consumerSideOnly(this.interceptor));

        this.channel.send(fooMessage());

        Assertions.assertThat(this.channel.receive().getHeaders()).containsKeys(B3, NATIVE_HEADERS);
        Assertions.assertThat(this.spans).hasSize(1)
                .extracting(span -> span.getKind())
                .containsExactly(Span.Kind.CONSUMER);
    }

    @Test
    public void pollingReceive_injectsConsumerSpan_nativeHeaders() {
        this.channel.addInterceptor(consumerSideOnly(this.interceptor));

        this.channel.send(fooMessage());

        Assertions.assertThat(nativeHeadersOf(this.channel.receive())).containsOnlyKeys(B3);
    }

    @Test
    public void subscriber_startsAndStopsConsumerAndProcessingSpan() {
        List<Message<?>> delivered = deliverViaExecutorChannel(executorSideOnly(this.interceptor), fooMessage());

        Assertions.assertThat(delivered.get(0).getHeaders()).doesNotContainKeys(B3, NATIVE_HEADERS);
        // The second reported span carries no kind.
        Assertions.assertThat(this.spans)
                .extracting(span -> span.getKind())
                .containsExactly(Span.Kind.CONSUMER, null);
    }

    @Test
    public void subscriber_removesTraceIdsFromMessage() {
        List<Message<?>> delivered = deliverViaExecutorChannel(this.interceptor, fooMessage());

        Assertions.assertThat(delivered.get(0).getHeaders()).doesNotContainKeys(B3);
    }

    @Test
    public void subscriber_removesTraceIdsFromMessage_nativeHeaders() {
        List<Message<?>> delivered = deliverViaExecutorChannel(this.interceptor, fooMessage());

        Assertions.assertThat(nativeHeadersOf(delivered.get(0))).doesNotContainKeys(B3);
    }

    @Test
    public void integrated_sendAndPoll() {
        this.channel.addInterceptor(this.interceptor);

        this.channel.send(fooMessage());
        this.channel.receive();

        Assertions.assertThat(this.spans)
                .extracting(span -> span.getKind())
                .containsExactlyInAnyOrder(Span.Kind.CONSUMER, Span.Kind.PRODUCER);
    }

    @Test
    public void integrated_sendAndSubscriber() {
        deliverViaExecutorChannel(this.interceptor, fooMessage());

        Assertions.assertThat(this.spans)
                .extracting(span -> span.getKind())
                .containsExactly(Span.Kind.CONSUMER, null, Span.Kind.PRODUCER);
    }

    @Test
    public void errorMessageHeadersRetained() {
        this.channel.addInterceptor(this.interceptor);
        // Channels set on the failed message differ from those set on the error message itself,
        // so the assertions below can tell which set of headers survived.
        QueueChannel failedMessageChannel = new QueueChannel();
        QueueChannel errorMessageChannel = new QueueChannel();
        Map<String, Object> errorHeaders = new HashMap<>();
        errorHeaders.put("replyChannel", errorMessageChannel);
        errorHeaders.put("errorChannel", errorMessageChannel);
        Message<String> failed = MessageBuilder.withPayload("hi")
                .setHeader(B3, INCOMING_B3)
                .setReplyChannel(failedMessageChannel)
                .setErrorChannel(failedMessageChannel)
                .build();

        this.channel.send(new ErrorMessage(new MessagingException(failed), errorHeaders));
        this.message = this.channel.receive();

        Assertions.assertThat(this.message).isNotNull();
        B3Context b3Context = new B3Context(this.message.getHeaders().get(B3, String.class));
        Assertions.assertThat(b3Context.traceId).endsWith(INCOMING_ID);
        Assertions.assertThat(b3Context.spanId).doesNotEndWith(INCOMING_ID);
        Assertions.assertThat(this.spans).hasSize(2);
        Assertions.assertThat(this.message.getHeaders().getReplyChannel()).isSameAs(errorMessageChannel);
        Assertions.assertThat(this.message.getHeaders().getErrorChannel()).isSameAs(errorMessageChannel);
    }

    @Test
    public void errorMessageOriginalMessageRetained() {
        this.channel.addInterceptor(this.interceptor);
        Message<String> original = MessageBuilder.withPayload("Hello").setHeader("header", "value").build();
        Message<String> failed = MessageBuilder.fromMessage(original).removeHeader("header").build();

        this.channel.send(new ErrorMessage(new MessagingException(failed), original.getHeaders(), original));
        this.message = this.channel.receive();

        Assertions.assertThat(this.message).isNotNull();
        Assertions.assertThat(this.message).isInstanceOfSatisfying(ErrorMessage.class, errorMessage -> {
            Assertions.assertThat(errorMessage.getOriginalMessage()).isSameAs(original);
            Assertions.assertThat(errorMessage.getHeaders().get("header")).isEqualTo("value");
        });
    }

    @Test
    public void errorMessageHeadersWithNullPayloadRetained() {
        this.channel.addInterceptor(this.interceptor);
        Map<String, Object> errorHeaders = new HashMap<>();
        errorHeaders.put(B3, INCOMING_B3);

        // The exception is built from a description only, i.e. without a failed message.
        this.channel.send(new ErrorMessage(new MessagingException("exception"), errorHeaders));
        this.message = this.channel.receive();

        B3Context b3Context = new B3Context(this.message.getHeaders().get(B3, String.class));
        Assertions.assertThat(b3Context.traceId).endsWith(INCOMING_ID);
        Assertions.assertThat(b3Context.spanId).doesNotEndWith(INCOMING_ID);
        Assertions.assertThat(this.spans).hasSize(2);
    }

    @Test
    public void should_store_kafka_as_remote_service_name_when_kafka_header_is_present() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_messageKey", "hello");

        deliverViaExecutorChannel(this.interceptor, MessageBuilder.createMessage("foo", new MessageHeaders(headers)));

        Assertions.assertThat(this.spans)
                .extracting(span -> span.getRemoteServiceName())
                .contains("kafka");
    }

    @Test
    public void should_store_rabbitmq_as_remote_service_name_when_rabbit_header_is_present() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("amqp_receivedRoutingKey", "hello");

        deliverViaExecutorChannel(this.interceptor, MessageBuilder.createMessage("foo", new MessageHeaders(headers)));

        Assertions.assertThat(this.spans)
                .extracting(span -> span.getRemoteServiceName())
                .contains("rabbitmq");
    }

    @Test
    public void should_store_broker_as_remote_service_name_when_no_special_headers_were_found() {
        Map<String, Object> noHeaders = new HashMap<>();

        deliverViaExecutorChannel(this.interceptor, MessageBuilder.createMessage("foo", new MessageHeaders(noHeaders)));

        Assertions.assertThat(this.spans)
                .extracting(span -> span.getRemoteServiceName())
                .containsOnly("broker", null);
    }

    @Test
    public void should_propagate_headers_case_insensitive() {
        this.channel.addInterceptor(this.interceptor);
        Map<String, Object> headers = new HashMap<>();
        headers.put("Foo-Id", "123");
        headers.put("baz-id", "456");

        this.channel.send(MessageBuilder.createMessage("foo", new MessageHeaders(headers)));
        Message<?> received = this.channel.receive();

        Assertions.assertThat(received.getHeaders()).isNotEmpty();
        Assertions.assertThat(received.getHeaders().get("not-propagated-header")).isNull();
        Assertions.assertThat(received.getHeaders().get("Foo-Id")).isEqualTo("123");
        Assertions.assertThat(received.getHeaders().get("baz-id")).isEqualTo("456");
    }

    @Test
    @SuppressWarnings("unchecked") // cast of the received native-header map to its generic form
    public void should_propagate_native_headers_case_insensitive() {
        this.channel.addInterceptor(this.interceptor);
        LinkedMultiValueMap<String, String> sentNativeHeaders = new LinkedMultiValueMap<>();
        sentNativeHeaders.put("Foo-Id", Collections.singletonList("123"));
        sentNativeHeaders.put("baz-id", Collections.singletonList("456"));
        Map<String, Object> headers = new HashMap<>();
        headers.put(NATIVE_HEADERS, sentNativeHeaders);

        this.channel.send(MessageBuilder.createMessage("foo", new MessageHeaders(headers)));
        Message<?> received = this.channel.receive();

        Assertions.assertThat(received.getHeaders()).isNotEmpty();
        LinkedMultiValueMap<String, String> receivedNativeHeaders =
                (LinkedMultiValueMap<String, String>) received.getHeaders().get(NATIVE_HEADERS);
        Assertions.assertThat(receivedNativeHeaders).isNotEmpty();
        Assertions.assertThat(receivedNativeHeaders.get("not-propagated-header")).isNull();
        Assertions.assertThat(receivedNativeHeaders.get("Foo-Id")).isEqualTo(Collections.singletonList("123"));
        Assertions.assertThat(receivedNativeHeaders.get("baz-id")).isEqualTo(Collections.singletonList("456"));
    }

    /**
     * Wraps {@code channelInterceptor} so that only its send-side callbacks
     * ({@code preSend} and {@code afterSendCompletion}) are forwarded.
     *
     * @param channelInterceptor interceptor to delegate to
     * @return an interceptor exposing only the producer side of the delegate
     */
    public ChannelInterceptor producerSideOnly(final ChannelInterceptor channelInterceptor) {
        return new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> msg, MessageChannel messageChannel) {
                return channelInterceptor.preSend(msg, messageChannel);
            }

            @Override
            public void afterSendCompletion(Message<?> msg, MessageChannel messageChannel, boolean sent,
                    Exception exc) {
                channelInterceptor.afterSendCompletion(msg, messageChannel, sent, exc);
            }
        };
    }

    /**
     * Wraps {@code channelInterceptor} so that only its receive-side callbacks
     * ({@code postReceive} and {@code afterReceiveCompletion}) are forwarded.
     *
     * @param channelInterceptor interceptor to delegate to
     * @return an interceptor exposing only the polling-consumer side of the delegate
     */
    ChannelInterceptor consumerSideOnly(final ChannelInterceptor channelInterceptor) {
        return new ChannelInterceptorAdapter() {
            @Override
            public Message<?> postReceive(Message<?> msg, MessageChannel messageChannel) {
                return channelInterceptor.postReceive(msg, messageChannel);
            }

            @Override
            public void afterReceiveCompletion(Message<?> msg, MessageChannel messageChannel, Exception exc) {
                channelInterceptor.afterReceiveCompletion(msg, messageChannel, exc);
            }
        };
    }

    /**
     * Wraps {@code channelInterceptor} so that only its handler-side callbacks
     * ({@code beforeHandle} and {@code afterMessageHandled}) are forwarded.
     *
     * @param channelInterceptor interceptor to delegate to; must also implement
     * {@link ExecutorChannelInterceptor}
     * @return an interceptor exposing only the executor side of the delegate
     * @throws ClassCastException if the delegate is not an {@link ExecutorChannelInterceptor}
     */
    ExecutorChannelInterceptor executorSideOnly(ChannelInterceptor channelInterceptor) {
        // The handler callbacks are declared on ExecutorChannelInterceptor, not on ChannelInterceptor.
        final ExecutorChannelInterceptor delegate = (ExecutorChannelInterceptor) channelInterceptor;

        // A named local class is required: an anonymous class cannot both extend the adapter
        // and implement ExecutorChannelInterceptor.
        class ExecutorSideOnly extends ChannelInterceptorAdapter implements ExecutorChannelInterceptor {
            @Override
            public Message<?> beforeHandle(Message<?> msg, MessageChannel messageChannel,
                    MessageHandler messageHandler) {
                return delegate.beforeHandle(msg, messageChannel, messageHandler);
            }

            @Override
            public void afterMessageHandled(Message<?> msg, MessageChannel messageChannel,
                    MessageHandler messageHandler, Exception exc) {
                delegate.afterMessageHandled(msg, messageChannel, messageHandler, exc);
            }
        }

        return new ExecutorSideOnly();
    }
}
