package org.springframework.cloud.stream.binder;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.binding.StreamListenerMessageHandler;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.converter.JavaSerializationMessageConverter;
import org.springframework.cloud.stream.converter.KryoMessageConverter;
import org.springframework.cloud.stream.converter.MessageConverterUtils;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/AbstractBinderTests.class */
public abstract class AbstractBinderTests<B extends AbstractTestBinder<? extends AbstractBinder<MessageChannel, CP, PP>, CP, PP>, CP extends ConsumerProperties, PP extends ProducerProperties> {
    protected B testBinder;
    protected SmartMessageConverter messageConverter;
    protected final Log logger = LogFactory.getLog(getClass());
    protected double timeoutMultiplier = 1.0d;

    /* loaded from: input_file:org/springframework/cloud/stream/binder/AbstractBinderTests$Foo.class */
    private class Foo {
        private String name;

        private Foo() {
        }

        public String getName() {
            return this.name;
        }

        public void setName(String str) {
            this.name = str;
        }
    }

    /* loaded from: input_file:org/springframework/cloud/stream/binder/AbstractBinderTests$Station.class */
    public static class Station {
        List<Readings> readings = new ArrayList();

        /* loaded from: input_file:org/springframework/cloud/stream/binder/AbstractBinderTests$Station$Readings.class */
        public static class Readings implements Serializable {
            public String stationid;
            public String customerid;
            public String timestamp;

            public String getStationid() {
                return this.stationid;
            }

            public void setStationid(String str) {
                this.stationid = str;
            }

            public String getCustomerid() {
                return this.customerid;
            }

            public void setCustomerid(String str) {
                this.customerid = str;
            }

            public String getTimestamp() {
                return this.timestamp;
            }

            public void setTimestamp(String str) {
                this.timestamp = str;
            }
        }

        public List<Readings> getReadings() {
            return this.readings;
        }

        public void setReadings(List<Readings> list) {
            this.readings = list;
        }
    }

    @Before
    public void before() {
        this.messageConverter = new CompositeMessageConverterFactory().getMessageConverterForAllRegistered();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Message<?> receive(PollableChannel pollableChannel) {
        return receive(pollableChannel, 1);
    }

    protected Message<?> receive(PollableChannel pollableChannel, int i) {
        long currentTimeMillis = System.currentTimeMillis();
        Message<?> receive = pollableChannel.receive((int) (1000.0d * this.timeoutMultiplier * i));
        this.logger.debug("receive() took " + ((System.currentTimeMillis() - currentTimeMillis) / 1000) + " seconds");
        return receive;
    }

    @Test
    public void testClean() throws Exception {
        B binder = getBinder();
        Binding bindProducer = binder.bindProducer(String.format("foo%s0", getDestinationNameDelimiter()), createBindableChannel("output", new BindingProperties()), createProducerProperties());
        Binding bindConsumer = binder.bindConsumer(String.format("foo%s0", getDestinationNameDelimiter()), "testClean", createBindableChannel("input", new BindingProperties()), createConsumerProperties());
        Binding bindProducer2 = binder.bindProducer(String.format("foo%s1", getDestinationNameDelimiter()), createBindableChannel("output", new BindingProperties()), createProducerProperties());
        Binding bindConsumer2 = binder.bindConsumer(String.format("foo%s1", getDestinationNameDelimiter()), "testClean", createBindableChannel("input", new BindingProperties()), createConsumerProperties());
        Binding bindProducer3 = binder.bindProducer(String.format("foo%s2", getDestinationNameDelimiter()), createBindableChannel("output", new BindingProperties()), createProducerProperties());
        bindProducer.unbind();
        Assertions.assertThat(((Lifecycle) TestUtils.getPropertyValue(bindProducer, "lifecycle", Lifecycle.class)).isRunning()).isFalse();
        bindConsumer.unbind();
        bindProducer2.unbind();
        Assertions.assertThat(((Lifecycle) TestUtils.getPropertyValue(bindConsumer, "lifecycle", Lifecycle.class)).isRunning()).isFalse();
        Assertions.assertThat(((Lifecycle) TestUtils.getPropertyValue(bindProducer2, "lifecycle", Lifecycle.class)).isRunning()).isFalse();
        bindConsumer2.unbind();
        bindProducer3.unbind();
        Assertions.assertThat(((Lifecycle) TestUtils.getPropertyValue(bindConsumer2, "lifecycle", Lifecycle.class)).isRunning()).isFalse();
        Assertions.assertThat(((Lifecycle) TestUtils.getPropertyValue(bindProducer3, "lifecycle", Lifecycle.class)).isRunning()).isFalse();
    }

    @Test
    public void testSendAndReceive() throws Exception {
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        BindingProperties createConsumerBindingProperties = createConsumerBindingProperties(createConsumerProperties());
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties);
        Binding bindProducer = binder.bindProducer(String.format("foo%s0", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("foo%s0", getDestinationNameDelimiter()), "testSendAndReceive", createBindableChannel2, createConsumerBindingProperties.getConsumer());
        Message build = MessageBuilder.withPayload("foo").setHeader("contentType", "text/plain").build();
        binderBindUnbindLatency();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        createBindableChannel2.subscribe(message -> {
            try {
                atomicReference.set(message);
            } finally {
                countDownLatch.countDown();
            }
        });
        createBindableChannel.send(build);
        Assert.isTrue(countDownLatch.await(5L, TimeUnit.SECONDS), "Failed to receive message");
        Assertions.assertThat((byte[]) ((Message) atomicReference.get()).getPayload()).isEqualTo("foo".getBytes());
        Assertions.assertThat(((Message) atomicReference.get()).getHeaders().get("originalContentType")).isNull();
        Assertions.assertThat(((Message) atomicReference.get()).getHeaders().get("contentType").toString()).isEqualTo("text/plain");
        bindProducer.unbind();
        bindConsumer.unbind();
    }

    @Test
    public void testSendAndReceiveKryo() throws Exception {
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        BindingProperties createConsumerBindingProperties = createConsumerBindingProperties(createConsumerProperties());
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties);
        Binding bindProducer = binder.bindProducer(String.format("foo%s0x", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("foo%s0x", getDestinationNameDelimiter()), "testSendAndReceiveKryo", createBindableChannel2, createConsumerBindingProperties.getConsumer());
        Foo foo = new Foo();
        foo.setName("Bill");
        Message build = MessageBuilder.withPayload(foo).setHeader("contentType", MessageConverterUtils.X_JAVA_OBJECT).build();
        binderBindUnbindLatency();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        createBindableChannel2.subscribe(message -> {
            try {
                atomicReference.set(message);
            } finally {
                countDownLatch.countDown();
            }
        });
        createBindableChannel.send(build);
        Assert.isTrue(countDownLatch.await(5L, TimeUnit.SECONDS), "Failed to receive message");
        Assertions.assertThat((Foo) new KryoMessageConverter((List) null, true).fromMessage((Message) atomicReference.get(), Foo.class)).isNotNull();
        Assertions.assertThat(((Message) atomicReference.get()).getHeaders().get("originalContentType")).isNull();
        bindProducer.unbind();
        bindConsumer.unbind();
    }

    @Test
    public void testSendAndReceiveJavaSerialization() throws Exception {
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        BindingProperties createConsumerBindingProperties = createConsumerBindingProperties(createConsumerProperties());
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties);
        Binding bindProducer = binder.bindProducer(String.format("foo%s0y", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("foo%s0y", getDestinationNameDelimiter()), "testSendAndReceiveJavaSerialization", createBindableChannel2, createConsumerBindingProperties.getConsumer());
        Message build = MessageBuilder.withPayload(new SerializableFoo()).setHeader("contentType", MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT).build();
        binderBindUnbindLatency();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        createBindableChannel2.subscribe(message -> {
            try {
                atomicReference.set(message);
            } finally {
                countDownLatch.countDown();
            }
        });
        createBindableChannel.send(build);
        Assert.isTrue(countDownLatch.await(5L, TimeUnit.SECONDS), "Failed to receive message");
        Assertions.assertThat((SerializableFoo) new JavaSerializationMessageConverter().convertFromInternal((Message) atomicReference.get(), SerializableFoo.class, (Object) null)).isNotNull();
        Assertions.assertThat(((Message) atomicReference.get()).getHeaders().get("originalContentType")).isNull();
        Assertions.assertThat(((Message) atomicReference.get()).getHeaders().get("contentType")).isEqualTo(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT);
        bindProducer.unbind();
        bindConsumer.unbind();
    }

    @Test
    public void testSendAndReceiveMultipleTopics() throws Exception {
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output1", createProducerBindingProperties);
        DirectChannel createBindableChannel2 = createBindableChannel("output2", createProducerBindingProperties);
        QueueChannel queueChannel = new QueueChannel();
        Binding bindProducer = binder.bindProducer(String.format("foo%sxy", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindProducer2 = binder.bindProducer(String.format("foo%syz", getDestinationNameDelimiter()), createBindableChannel2, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("foo%sxy", getDestinationNameDelimiter()), "testSendAndReceiveMultipleTopics", queueChannel, createConsumerProperties());
        Binding bindConsumer2 = binder.bindConsumer(String.format("foo%syz", getDestinationNameDelimiter()), "testSendAndReceiveMultipleTopics", queueChannel, createConsumerProperties());
        String str = "foo" + UUID.randomUUID().toString();
        Message build = MessageBuilder.withPayload(str.getBytes()).setHeader("contentType", MimeTypeUtils.APPLICATION_OCTET_STREAM).build();
        String str2 = "foo" + UUID.randomUUID().toString();
        Message build2 = MessageBuilder.withPayload(str2.getBytes()).setHeader("contentType", MimeTypeUtils.APPLICATION_OCTET_STREAM).build();
        binderBindUnbindLatency();
        createBindableChannel.send(build);
        createBindableChannel2.send(build2);
        Message[] messageArr = {receive(queueChannel), receive(queueChannel)};
        Assertions.assertThat(messageArr[0]).isNotNull();
        Assertions.assertThat(messageArr[1]).isNotNull();
        Assertions.assertThat(messageArr).extracting("payload").containsExactlyInAnyOrder(new Object[]{str.getBytes(), str2.getBytes()});
        bindProducer.unbind();
        bindProducer2.unbind();
        bindConsumer.unbind();
        bindConsumer2.unbind();
    }

    @Test
    public void testSendAndReceiveNoOriginalContentType() throws Exception {
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties(createConsumerProperties()));
        Binding bindProducer = binder.bindProducer(String.format("bar%s0", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("bar%s0", getDestinationNameDelimiter()), "testSendAndReceiveNoOriginalContentType", createBindableChannel2, createConsumerProperties());
        binderBindUnbindLatency();
        Message build = MessageBuilder.withPayload("foo").setHeader("contentType", MimeTypeUtils.TEXT_PLAIN).build();
        createBindableChannel.send(build);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        createBindableChannel2.subscribe(message -> {
            try {
                atomicReference.set(message);
            } finally {
                countDownLatch.countDown();
            }
        });
        createBindableChannel.send(build);
        Assert.isTrue(countDownLatch.await(5L, TimeUnit.SECONDS), "Failed to receive message");
        Assertions.assertThat(atomicReference.get()).isNotNull();
        Assertions.assertThat((byte[]) ((Message) atomicReference.get()).getPayload()).isEqualTo("foo".getBytes());
        Assertions.assertThat(((Message) atomicReference.get()).getHeaders().get("contentType").toString()).isEqualTo("text/plain");
        bindProducer.unbind();
        bindConsumer.unbind();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract B getBinder() throws Exception;

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract CP createConsumerProperties();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract PP createProducerProperties();

    protected final BindingProperties createConsumerBindingProperties(CP cp) {
        BindingProperties bindingProperties = new BindingProperties();
        bindingProperties.setConsumer(cp);
        return bindingProperties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BindingProperties createProducerBindingProperties(PP pp) {
        BindingProperties bindingProperties = new BindingProperties();
        bindingProperties.setProducer(pp);
        return bindingProperties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DirectChannel createBindableChannel(String str, BindingProperties bindingProperties) throws Exception {
        return createBindableChannel(str, bindingProperties, str.contains("input"));
    }

    protected DirectChannel createBindableChannel(String str, BindingProperties bindingProperties, boolean z) throws Exception {
        MessageConverterConfigurer createConverterConfigurer = createConverterConfigurer(str, bindingProperties);
        DirectChannel directChannel = new DirectChannel();
        directChannel.setBeanName(str);
        if (z) {
            createConverterConfigurer.configureInputChannel(directChannel, str);
        } else {
            createConverterConfigurer.configureOutputChannel(directChannel, str);
        }
        return directChannel;
    }

    protected DefaultPollableMessageSource createBindableMessageSource(String str, BindingProperties bindingProperties) throws Exception {
        DefaultPollableMessageSource defaultPollableMessageSource = new DefaultPollableMessageSource(new CompositeMessageConverterFactory().getMessageConverterForAllRegistered());
        createConverterConfigurer(str, bindingProperties).configurePolledMessageSource(defaultPollableMessageSource, str);
        return defaultPollableMessageSource;
    }

    private MessageConverterConfigurer createConverterConfigurer(String str, BindingProperties bindingProperties) throws Exception {
        BindingServiceProperties bindingServiceProperties = new BindingServiceProperties();
        bindingServiceProperties.getBindings().put(str, bindingProperties);
        GenericApplicationContext genericApplicationContext = new GenericApplicationContext();
        genericApplicationContext.refresh();
        bindingServiceProperties.setApplicationContext(genericApplicationContext);
        bindingServiceProperties.setConversionService(new DefaultConversionService());
        bindingServiceProperties.afterPropertiesSet();
        MessageConverterConfigurer messageConverterConfigurer = new MessageConverterConfigurer(bindingServiceProperties, new CompositeMessageConverterFactory((List) null, (ObjectMapper) null));
        messageConverterConfigurer.setBeanFactory(genericApplicationContext.getBeanFactory());
        return messageConverterConfigurer;
    }

    @After
    public void cleanup() {
        if (this.testBinder != null) {
            this.testBinder.cleanup();
        }
    }

    protected void binderBindUnbindLatency() throws InterruptedException {
    }

    public abstract Spy spyOn(String str);

    /* JADX INFO: Access modifiers changed from: protected */
    public String getDestinationNameDelimiter() {
        return ".";
    }

    @Test
    public void testSendPojoReceivePojoWithStreamListenerDefaultContentType() throws Exception {
        StreamListenerMessageHandler buildStreamListener = buildStreamListener(AbstractBinderTests.class, "echoStation", Station.class);
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        BindingProperties createConsumerBindingProperties = createConsumerBindingProperties(createConsumerProperties());
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties);
        Binding bindProducer = binder.bindProducer(String.format("bad%s0a", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("bad%s0a", getDestinationNameDelimiter()), "test-1", createBindableChannel2, createConsumerBindingProperties.getConsumer());
        Message build = MessageBuilder.withPayload(new Station()).build();
        createBindableChannel2.subscribe(buildStreamListener);
        createBindableChannel.send(build);
        Assertions.assertThat(buildStreamListener.getOutputChannel().receive(5000L).getPayload() instanceof Station).isTrue();
        bindProducer.unbind();
        bindConsumer.unbind();
    }

    @Test
    public void testSendPojoReceivePojoKryoWithStreamListener() throws Exception {
        StreamListenerMessageHandler buildStreamListener = buildStreamListener(AbstractBinderTests.class, "echoStation", Station.class);
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        BindingProperties createConsumerBindingProperties = createConsumerBindingProperties(createConsumerProperties());
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties);
        Binding bindProducer = binder.bindProducer(String.format("bad%s0b", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("bad%s0b", getDestinationNameDelimiter()), "test-2", createBindableChannel2, createConsumerBindingProperties.getConsumer());
        Message build = MessageBuilder.withPayload(new Station()).setHeader("contentType", MessageConverterUtils.X_JAVA_OBJECT).build();
        createBindableChannel2.subscribe(buildStreamListener);
        createBindableChannel.send(build);
        Assertions.assertThat(buildStreamListener.getOutputChannel().receive(5000L).getPayload() instanceof Station).isTrue();
        bindProducer.unbind();
        bindConsumer.unbind();
    }

    @Test(expected = MessageDeliveryException.class)
    public void testStreamListenerJavaSerializationNonSerializable() throws Exception {
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        BindingProperties createConsumerBindingProperties = createConsumerBindingProperties(createConsumerProperties());
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties);
        Binding bindProducer = binder.bindProducer(String.format("bad%s0c", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("bad%s0c", getDestinationNameDelimiter()), "test-3", createBindableChannel2, createConsumerBindingProperties.getConsumer());
        try {
            createBindableChannel.send(MessageBuilder.withPayload(new Station()).setHeader("contentType", MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT).build());
            bindProducer.unbind();
            bindConsumer.unbind();
        } catch (Throwable th) {
            bindProducer.unbind();
            bindConsumer.unbind();
            throw th;
        }
    }

    @Test
    public void testSendJsonReceivePojoWithStreamListener() throws Exception {
        StreamListenerMessageHandler buildStreamListener = buildStreamListener(AbstractBinderTests.class, "echoStation", Station.class);
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        BindingProperties createConsumerBindingProperties = createConsumerBindingProperties(createConsumerProperties());
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties);
        Binding bindProducer = binder.bindProducer(String.format("bad%s0d", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("bad%s0d", getDestinationNameDelimiter()), "test-4", createBindableChannel2, createConsumerBindingProperties.getConsumer());
        Message build = MessageBuilder.withPayload("{\"readings\":[{\"stationid\":\"fgh\",\"customerid\":\"12345\",\"timestamp\":null},{\"stationid\":\"hjk\",\"customerid\":\"222\",\"timestamp\":null}]}").setHeader("contentType", MimeTypeUtils.APPLICATION_JSON).build();
        createBindableChannel2.subscribe(buildStreamListener);
        createBindableChannel.send(build);
        Message receive = buildStreamListener.getOutputChannel().receive(5000L);
        Assertions.assertThat(receive).isNotNull();
        Assertions.assertThat(receive.getPayload() instanceof Station).isTrue();
        bindProducer.unbind();
        bindConsumer.unbind();
    }

    @Test
    public void testSendJsonReceiveJsonWithStreamListener() throws Exception {
        StreamListenerMessageHandler buildStreamListener = buildStreamListener(AbstractBinderTests.class, "echoStationString", String.class);
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        BindingProperties createConsumerBindingProperties = createConsumerBindingProperties(createConsumerProperties());
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties);
        Binding bindProducer = binder.bindProducer(String.format("bad%s0e", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("bad%s0e", getDestinationNameDelimiter()), "test-5", createBindableChannel2, createConsumerBindingProperties.getConsumer());
        Message build = MessageBuilder.withPayload("{\"readings\":[{\"stationid\":\"fgh\",\"customerid\":\"12345\",\"timestamp\":null},{\"stationid\":\"hjk\",\"customerid\":\"222\",\"timestamp\":null}]}").setHeader("contentType", MimeTypeUtils.APPLICATION_JSON).build();
        createBindableChannel2.subscribe(buildStreamListener);
        createBindableChannel.send(build);
        Message receive = buildStreamListener.getOutputChannel().receive(5000L);
        Assertions.assertThat(receive).isNotNull();
        Assertions.assertThat(receive.getPayload() instanceof String).isTrue();
        bindProducer.unbind();
        bindConsumer.unbind();
    }

    @Test
    public void testSendPojoReceivePojoWithStreamListener() throws Exception {
        StreamListenerMessageHandler buildStreamListener = buildStreamListener(AbstractBinderTests.class, "echoStation", Station.class);
        B binder = getBinder();
        BindingProperties createProducerBindingProperties = createProducerBindingProperties(createProducerProperties());
        DirectChannel createBindableChannel = createBindableChannel("output", createProducerBindingProperties);
        BindingProperties createConsumerBindingProperties = createConsumerBindingProperties(createConsumerProperties());
        DirectChannel createBindableChannel2 = createBindableChannel("input", createConsumerBindingProperties);
        Binding bindProducer = binder.bindProducer(String.format("bad%s0f", getDestinationNameDelimiter()), createBindableChannel, createProducerBindingProperties.getProducer());
        Binding bindConsumer = binder.bindConsumer(String.format("bad%s0f", getDestinationNameDelimiter()), "test-6", createBindableChannel2, createConsumerBindingProperties.getConsumer());
        Station.Readings readings = new Station.Readings();
        readings.setCustomerid("123");
        readings.setStationid("XYZ");
        Station.Readings readings2 = new Station.Readings();
        readings2.setCustomerid("546");
        readings2.setStationid("ABC");
        Station station = new Station();
        station.setReadings(Arrays.asList(readings, readings2));
        Message build = MessageBuilder.withPayload(station).setHeader("contentType", MimeTypeUtils.APPLICATION_JSON).build();
        createBindableChannel2.subscribe(buildStreamListener);
        createBindableChannel.send(build);
        Message receive = buildStreamListener.getOutputChannel().receive(5000L);
        Assertions.assertThat(receive).isNotNull();
        Assertions.assertThat(receive.getPayload() instanceof Station).isTrue();
        bindProducer.unbind();
        bindConsumer.unbind();
    }

    private Station echoStation(Station station) {
        return station;
    }

    private String echoStationString(String str) {
        return str;
    }

    private StreamListenerMessageHandler buildStreamListener(Class<?> cls, String str, Class<?>... clsArr) throws Exception {
        String str2 = "reply_" + System.nanoTime();
        GenericApplicationContext genericApplicationContext = new GenericApplicationContext();
        genericApplicationContext.getBeanFactory().registerSingleton(str2, new QueueChannel());
        InvocableHandlerMethod invocableHandlerMethod = new InvocableHandlerMethod(this, ReflectionUtils.findMethod(cls, str, clsArr));
        HandlerMethodArgumentResolverComposite handlerMethodArgumentResolverComposite = new HandlerMethodArgumentResolverComposite();
        handlerMethodArgumentResolverComposite.addResolver(new PayloadArgumentResolver(new CompositeMessageConverterFactory().getMessageConverterForAllRegistered()));
        invocableHandlerMethod.setMessageMethodArgumentResolvers(handlerMethodArgumentResolverComposite);
        StreamListenerMessageHandler streamListenerMessageHandler = (StreamListenerMessageHandler) ReflectionUtils.accessibleConstructor(StreamListenerMessageHandler.class, new Class[]{InvocableHandlerMethod.class, Boolean.TYPE, String[].class}).newInstance(invocableHandlerMethod, false, new String[0]);
        streamListenerMessageHandler.setOutputChannelName(str2);
        streamListenerMessageHandler.setBeanFactory(genericApplicationContext);
        streamListenerMessageHandler.afterPropertiesSet();
        genericApplicationContext.refresh();
        return streamListenerMessageHandler;
    }
}
