package org.springframework.cloud.stream.binder;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

/* loaded from: input_file:org/springframework/cloud/stream/binder/PartitionCapableBinderTests.class */
public abstract class PartitionCapableBinderTests<B extends AbstractTestBinder<? extends AbstractBinder<MessageChannel, CP, PP>, CP, PP>, CP extends ConsumerProperties, PP extends ProducerProperties> extends BrokerBinderTests<B, CP, PP> {
    /**
     * Shared SpEL parser. Within this class it is used by
     * {@code testPartitionedModuleSpEL} to build the partition key and
     * partition selector expressions; being {@code protected} it is also
     * visible to subclasses.
     */
    protected static final SpelExpressionParser spelExpressionParser = new SpelExpressionParser();

    /**
     * Binds two consumers to the same destination with a {@code null} group
     * and checks that each of them receives every message sent while it is
     * bound.
     *
     * <p>The second consumer is unbound, a message is sent, and the consumer
     * is then re-bound before a third message is sent. The first consumer is
     * expected to see the second and third payloads in order, whereas the
     * re-bound consumer is expected to see only the third payload.
     *
     * <p>All bindings are released in a {@code finally} block so that a
     * failing assertion does not leave bindings behind for later tests.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testAnonymousGroup() throws Exception {
        B binder = getBinder();
        DirectChannel output = new DirectChannel();
        QueueChannel input1 = new QueueChannel();
        QueueChannel input2 = new QueueChannel();
        Binding<MessageChannel> producerBinding = null;
        Binding<MessageChannel> binding1 = null;
        Binding<MessageChannel> binding2 = null;
        try {
            producerBinding = binder.bindProducer("defaultGroup.0", output, createProducerProperties());
            binding1 = binder.bindConsumer("defaultGroup.0", null, input1, createConsumerProperties());
            binding2 = binder.bindConsumer("defaultGroup.0", null, input2, createConsumerProperties());

            String payload1 = "foo-" + UUID.randomUUID().toString();
            output.send(new GenericMessage<>(payload1.getBytes(StandardCharsets.UTF_8)));

            Message<?> received1 = receive(input1);
            Assert.assertThat(received1, Matchers.notNullValue());
            Assert.assertThat(new String((byte[]) received1.getPayload(), StandardCharsets.UTF_8),
                    Matchers.equalTo(payload1));
            Message<?> received2 = receive(input2);
            Assert.assertThat(received2, Matchers.notNullValue());
            Assert.assertThat(new String((byte[]) received2.getPayload(), StandardCharsets.UTF_8),
                    Matchers.equalTo(payload1));

            binding2.unbind();
            // Cleared so the finally block does not unbind the same binding twice
            // if re-binding below fails.
            binding2 = null;

            // Sent while only the first consumer is bound.
            String payload2 = "foo-" + UUID.randomUUID().toString();
            output.send(new GenericMessage<>(payload2.getBytes(StandardCharsets.UTF_8)));

            binding2 = binder.bindConsumer("defaultGroup.0", null, input2, createConsumerProperties());
            String payload3 = "foo-" + UUID.randomUUID().toString();
            output.send(new GenericMessage<>(payload3.getBytes(StandardCharsets.UTF_8)));

            received1 = receive(input1);
            Assert.assertThat(received1, Matchers.notNullValue());
            Assert.assertThat(new String((byte[]) received1.getPayload(), StandardCharsets.UTF_8),
                    Matchers.equalTo(payload2));
            received1 = receive(input1);
            Assert.assertThat(received1, Matchers.notNullValue());
            Assert.assertThat(new String((byte[]) received1.getPayload(), StandardCharsets.UTF_8),
                    Matchers.equalTo(payload3));

            // The re-bound consumer must skip payload2 and see payload3 first.
            received2 = receive(input2);
            Assert.assertThat(received2, Matchers.notNullValue());
            Assert.assertThat(new String((byte[]) received2.getPayload(), StandardCharsets.UTF_8),
                    Matchers.equalTo(payload3));
        } finally {
            // Same release order as the original happy path: producer, then consumers.
            if (producerBinding != null) {
                producerBinding.unbind();
            }
            if (binding1 != null) {
                binding1.unbind();
            }
            if (binding2 != null) {
                binding2.unbind();
            }
        }
    }

    /**
     * Declares {@code test1} as a required group on the producer, sends a
     * message before any consumer is bound, and then checks that a consumer
     * bound afterwards in group {@code test1} still receives that message.
     *
     * <p>A random destination name is used for every run. Bindings are
     * released in a {@code finally} block so that a failing assertion does
     * not leave them bound.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testOneRequiredGroup() throws Exception {
        B binder = getBinder();
        DirectChannel output = new DirectChannel();
        PP producerProperties = createProducerProperties();
        String destination = "testDestination" + UUID.randomUUID().toString().replace("-", "");
        producerProperties.setRequiredGroups(new String[]{"test1"});

        Binding<MessageChannel> producerBinding = null;
        Binding<MessageChannel> consumerBinding = null;
        try {
            producerBinding = binder.bindProducer(destination, output, producerProperties);

            // Sent before the consumer exists; the required group must retain it.
            String payload = "foo-" + UUID.randomUUID().toString();
            output.send(new GenericMessage<>(payload.getBytes(StandardCharsets.UTF_8)));

            QueueChannel input = new QueueChannel();
            consumerBinding = binder.bindConsumer(destination, "test1", input, createConsumerProperties());

            Message<?> received = receive(input);
            Assert.assertThat(received, Matchers.notNullValue());
            Assert.assertThat(new String((byte[]) received.getPayload(), StandardCharsets.UTF_8),
                    Matchers.equalTo(payload));
        } finally {
            // Same release order as the original happy path: producer, then consumer.
            if (producerBinding != null) {
                producerBinding.unbind();
            }
            if (consumerBinding != null) {
                consumerBinding.unbind();
            }
        }
    }

    /**
     * Declares {@code test1} and {@code test2} as required groups on the
     * producer, sends a message before any consumer is bound, and then checks
     * that consumers bound afterwards in each of the two groups both receive
     * that message.
     *
     * <p>A random destination name is used for every run. Bindings are
     * released in a {@code finally} block so that a failing assertion does
     * not leave them bound.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testTwoRequiredGroups() throws Exception {
        B binder = getBinder();
        DirectChannel output = new DirectChannel();
        String destination = "testDestination" + UUID.randomUUID().toString().replace("-", "");
        PP producerProperties = createProducerProperties();
        producerProperties.setRequiredGroups(new String[]{"test1", "test2"});

        Binding<MessageChannel> producerBinding = null;
        Binding<MessageChannel> consumerBinding1 = null;
        Binding<MessageChannel> consumerBinding2 = null;
        try {
            producerBinding = binder.bindProducer(destination, output, producerProperties);

            // Sent before either consumer exists; both required groups must retain it.
            String payload = "foo-" + UUID.randomUUID().toString();
            output.send(new GenericMessage<>(payload.getBytes(StandardCharsets.UTF_8)));

            QueueChannel input1 = new QueueChannel();
            consumerBinding1 = binder.bindConsumer(destination, "test1", input1, createConsumerProperties());
            QueueChannel input2 = new QueueChannel();
            consumerBinding2 = binder.bindConsumer(destination, "test2", input2, createConsumerProperties());

            Message<?> received1 = receive(input1);
            Assert.assertThat(received1, Matchers.notNullValue());
            Assert.assertThat(new String((byte[]) received1.getPayload(), StandardCharsets.UTF_8),
                    Matchers.equalTo(payload));
            Message<?> received2 = receive(input2);
            Assert.assertThat(received2, Matchers.notNullValue());
            Assert.assertThat(new String((byte[]) received2.getPayload(), StandardCharsets.UTF_8),
                    Matchers.equalTo(payload));
        } finally {
            // Same release order as the original happy path: consumers, then producer.
            if (consumerBinding1 != null) {
                consumerBinding1.unbind();
            }
            if (consumerBinding2 != null) {
                consumerBinding2.unbind();
            }
            if (producerBinding != null) {
                producerBinding.unbind();
            }
        }
    }

    /**
     * Exercises partitioned delivery configured through SpEL expressions.
     *
     * <p>Three partitioned consumers (instance indexes 0, 1 and 2 of 3) are
     * bound to {@code part.0} in group {@code test}. The producer uses
     * {@code payload} as partition key expression, {@code hashCode()} as
     * partition selector expression and a partition count of 3. The integer
     * payloads 2, 1 and 0 are sent; the message carrying payload 2 also has
     * correlation and sequence headers set.
     *
     * <p>When {@link #usesExplicitRouting()} is {@code true}, payload
     * <em>i</em> must arrive at consumer instance <em>i</em> and the message
     * at instance 2 must still carry the correlation and sequence headers.
     * Otherwise the three payloads may arrive at the consumers in any order,
     * but one of the received messages must carry those headers.
     *
     * <p>Bindings are released in a {@code finally} block so that a failing
     * assertion does not leave them bound.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testPartitionedModuleSpEL() throws Exception {
        B binder = getBinder();

        // One properties object is reused for all three consumers; only the
        // instance index is changed between the bindConsumer calls.
        CP consumerProperties = createConsumerProperties();
        consumerProperties.setConcurrency(2);
        consumerProperties.setInstanceIndex(0);
        consumerProperties.setInstanceCount(3);
        consumerProperties.setPartitioned(true);

        Binding<MessageChannel> inputBinding0 = null;
        Binding<MessageChannel> inputBinding1 = null;
        Binding<MessageChannel> inputBinding2 = null;
        Binding<MessageChannel> outputBinding = null;
        try {
            QueueChannel input0 = new QueueChannel();
            input0.setBeanName("test.input0S");
            inputBinding0 = binder.bindConsumer("part.0", "test", input0, consumerProperties);

            consumerProperties.setInstanceIndex(1);
            QueueChannel input1 = new QueueChannel();
            input1.setBeanName("test.input1S");
            inputBinding1 = binder.bindConsumer("part.0", "test", input1, consumerProperties);

            consumerProperties.setInstanceIndex(2);
            QueueChannel input2 = new QueueChannel();
            input2.setBeanName("test.input2S");
            inputBinding2 = binder.bindConsumer("part.0", "test", input2, consumerProperties);

            PP producerProperties = createProducerProperties();
            producerProperties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload"));
            producerProperties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
            producerProperties.setPartitionCount(3);
            DirectChannel output = new DirectChannel();
            output.setBeanName("test.output");
            outputBinding = binder.bindProducer("part.0", output, producerProperties);

            try {
                Assert.assertThat(getEndpointRouting(extractEndpoint(outputBinding)),
                        Matchers.containsString(
                                getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']"));
            } catch (UnsupportedOperationException ignored) {
                // Deliberately ignored: the default routing hooks in this class throw
                // UnsupportedOperationException, in which case the routing-expression
                // check is skipped rather than failing the test.
            }

            output.send(MessageBuilder.withPayload(2)
                    .setHeader("correlationId", "foo")
                    .setHeader("sequenceNumber", 42)
                    .setHeader("sequenceSize", 43)
                    .build());
            output.send(new GenericMessage<>(1));
            output.send(new GenericMessage<>(0));

            Message<?> received0 = receive(input0);
            Assert.assertNotNull(received0);
            Message<?> received1 = receive(input1);
            Assert.assertNotNull(received1);
            Message<?> received2 = receive(input2);
            Assert.assertNotNull(received2);

            // Matches the message that was sent with the correlation/sequence headers.
            Matcher<Message<?>> fooMatcher =
                    new CustomMatcher<Message<?>>("the message with 'foo' as its correlationId") {
                @Override
                public boolean matches(Object item) {
                    IntegrationMessageHeaderAccessor accessor =
                            new IntegrationMessageHeaderAccessor((Message<?>) item);
                    return "foo".equals(accessor.getCorrelationId())
                            && 42 == accessor.getSequenceNumber()
                            && 43 == accessor.getSequenceSize();
                }
            };

            if (usesExplicitRouting()) {
                Assert.assertEquals(0, received0.getPayload());
                Assert.assertEquals(1, received1.getPayload());
                Assert.assertEquals(2, received2.getPayload());
                Assert.assertThat(received2, fooMatcher);
            } else {
                Assert.assertThat(
                        Arrays.asList((Integer) received0.getPayload(), (Integer) received1.getPayload(),
                                (Integer) received2.getPayload()),
                        Matchers.containsInAnyOrder(0, 1, 2));
                // The raw Matcher[] is kept on purpose: it selects the same
                // containsInAnyOrder(Matcher...) overload as before.
                Assert.assertThat(Arrays.asList(received0, received1, received2),
                        Matchers.containsInAnyOrder(new Matcher[]{
                                fooMatcher,
                                Matchers.hasProperty("payload", Matchers.equalTo(0)),
                                Matchers.hasProperty("payload", Matchers.equalTo(1))}));
            }
        } finally {
            // Same release order as the original happy path: consumers, then producer.
            if (inputBinding0 != null) {
                inputBinding0.unbind();
            }
            if (inputBinding1 != null) {
                inputBinding1.unbind();
            }
            if (inputBinding2 != null) {
                inputBinding2.unbind();
            }
            if (outputBinding != null) {
                outputBinding.unbind();
            }
        }
    }

    /**
     * Exercises partitioned delivery configured through Java classes.
     *
     * <p>Three partitioned consumers (instance indexes 0, 1 and 2 of 3) are
     * bound to {@code partJ.0} in group {@code test}. The producer uses
     * {@code PartitionTestSupport} as both partition key extractor and
     * partition selector, with a partition count of 3. The integer payloads
     * 2, 1 and 0 are sent.
     *
     * <p>When {@link #usesExplicitRouting()} is {@code true}, the producer
     * endpoint's routing expression is checked and payload <em>i</em> must
     * arrive at consumer instance <em>i</em>. Otherwise the three payloads
     * may arrive at the consumers in any order.
     *
     * <p>Bindings are released in a {@code finally} block so that a failing
     * assertion does not leave them bound.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testPartitionedModuleJava() throws Exception {
        B binder = getBinder();

        // One properties object is reused for all three consumers; only the
        // instance index is changed between the bindConsumer calls.
        CP consumerProperties = createConsumerProperties();
        consumerProperties.setConcurrency(2);
        consumerProperties.setInstanceCount(3);
        consumerProperties.setInstanceIndex(0);
        consumerProperties.setPartitioned(true);

        Binding<MessageChannel> inputBinding0 = null;
        Binding<MessageChannel> inputBinding1 = null;
        Binding<MessageChannel> inputBinding2 = null;
        Binding<MessageChannel> outputBinding = null;
        try {
            QueueChannel input0 = new QueueChannel();
            input0.setBeanName("test.input0J");
            inputBinding0 = binder.bindConsumer("partJ.0", "test", input0, consumerProperties);

            consumerProperties.setInstanceIndex(1);
            QueueChannel input1 = new QueueChannel();
            input1.setBeanName("test.input1J");
            inputBinding1 = binder.bindConsumer("partJ.0", "test", input1, consumerProperties);

            consumerProperties.setInstanceIndex(2);
            QueueChannel input2 = new QueueChannel();
            input2.setBeanName("test.input2J");
            inputBinding2 = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);

            PP producerProperties = createProducerProperties();
            producerProperties.setPartitionKeyExtractorClass(PartitionTestSupport.class);
            producerProperties.setPartitionSelectorClass(PartitionTestSupport.class);
            producerProperties.setPartitionCount(3);
            DirectChannel output = new DirectChannel();
            output.setBeanName("test.output");
            outputBinding = binder.bindProducer("partJ.0", output, producerProperties);

            if (usesExplicitRouting()) {
                Assert.assertThat(getEndpointRouting(extractEndpoint(outputBinding)),
                        Matchers.containsString(
                                getExpectedRoutingBaseDestination("partJ.0", "test") + "-' + headers['partition']"));
            }

            output.send(new GenericMessage<>(2));
            output.send(new GenericMessage<>(1));
            output.send(new GenericMessage<>(0));

            Message<?> received0 = receive(input0);
            Assert.assertNotNull(received0);
            Message<?> received1 = receive(input1);
            Assert.assertNotNull(received1);
            Message<?> received2 = receive(input2);
            Assert.assertNotNull(received2);

            if (usesExplicitRouting()) {
                Assert.assertEquals(0, received0.getPayload());
                Assert.assertEquals(1, received1.getPayload());
                Assert.assertEquals(2, received2.getPayload());
            } else {
                Assert.assertThat(
                        Arrays.asList((Integer) received0.getPayload(), (Integer) received1.getPayload(),
                                (Integer) received2.getPayload()),
                        Matchers.containsInAnyOrder(0, 1, 2));
            }
        } finally {
            // Same release order as the original happy path: consumers, then producer.
            if (inputBinding0 != null) {
                inputBinding0.unbind();
            }
            if (inputBinding1 != null) {
                inputBinding1.unbind();
            }
            if (inputBinding2 != null) {
                inputBinding2.unbind();
            }
            if (outputBinding != null) {
                outputBinding.unbind();
            }
        }
    }

    /**
     * Selects how strictly the partition tests in this class verify delivery.
     *
     * <p>When this returns {@code true}, both partition tests assert that
     * payload <em>i</em> is received by consumer instance <em>i</em>, and
     * {@code testPartitionedModuleJava} additionally checks the producer
     * endpoint's routing expression. When it returns {@code false}, the tests
     * only assert that the three payloads are received in some order.
     *
     * @return {@code true} to enable the per-instance assertions
     */
    protected abstract boolean usesExplicitRouting();

    /**
     * Returns the routing expression of the given producer endpoint, as
     * compared against the expected routing string in the partition tests.
     *
     * <p>This default implementation always throws. In
     * {@code testPartitionedModuleSpEL} that exception is caught and the
     * routing check is skipped; in {@code testPartitionedModuleJava} this
     * method is only called when {@link #usesExplicitRouting()} is
     * {@code true}, so such subclasses are expected to override it.
     *
     * @param abstractEndpoint the endpoint extracted from the producer binding
     * @return the endpoint's routing expression as a string
     * @throws UnsupportedOperationException always, unless overridden
     */
    protected String getEndpointRouting(AbstractEndpoint abstractEndpoint) {
        throw new UnsupportedOperationException(
                "getEndpointRouting is not implemented by " + getClass().getName());
    }

    /**
     * Returns the prefix that the producer endpoint's routing expression is
     * expected to contain for the given destination and group. The partition
     * tests append {@code "-' + headers['partition']"} to this value before
     * comparing.
     *
     * <p>This default implementation always throws. In
     * {@code testPartitionedModuleSpEL} that exception is caught and the
     * routing check is skipped.
     *
     * @param name the destination name passed to the binder (for example {@code "part.0"})
     * @param group the consumer group name (for example {@code "test"})
     * @return the expected base of the routing expression
     * @throws UnsupportedOperationException always, unless overridden
     */
    protected String getExpectedRoutingBaseDestination(String name, String group) {
        throw new UnsupportedOperationException(
                "getExpectedRoutingBaseDestination is not implemented by " + getClass().getName());
    }

    /**
     * Hook for obtaining the publish-subscribe routing of an endpoint.
     *
     * <p>This default implementation always throws. The method is not called
     * from within this class.
     * NOTE(review): presumably used by subclasses or sibling tests; confirm
     * before removing.
     *
     * @param abstractEndpoint the endpoint to inspect
     * @return the endpoint's routing as a string
     * @throws UnsupportedOperationException always, unless overridden
     */
    protected String getPubSubEndpointRouting(AbstractEndpoint abstractEndpoint) {
        throw new UnsupportedOperationException(
                "getPubSubEndpointRouting is not implemented by " + getClass().getName());
    }

    /**
     * Supplies a name for the class under test.
     *
     * <p>NOTE(review): this method is not referenced within this class;
     * presumably it is consumed by subclasses or shared test infrastructure.
     * Confirm the expected format against the implementations.
     *
     * @return the name of the class under test
     */
    protected abstract String getClassUnderTestName();

    /**
     * Reads the {@code endpoint} field of the given binding through direct
     * field access and returns it as an {@link AbstractEndpoint}.
     *
     * @param binding the binding whose {@code endpoint} field is read
     * @return the value of that field, cast to {@link AbstractEndpoint}
     */
    protected AbstractEndpoint extractEndpoint(Binding<MessageChannel> binding) {
        DirectFieldAccessor fieldAccessor = new DirectFieldAccessor(binding);
        Object endpoint = fieldAccessor.getPropertyValue("endpoint");
        return (AbstractEndpoint) endpoint;
    }
}
