package org.apache.activemq.artemis.tests.integration.server;

import java.lang.reflect.Field;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.collections.IterableStream;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.class */
public class ScaleDownTest extends ClusterTestBase {
    /** URI that {@code sendAMQPMessages} connects its AMQP client to. */
    private static final String AMQP_ACCEPTOR_URI = "tcp://127.0.0.1:5672";
    /** Test parameter: when {@code true}, {@code setUp} assigns the group name "bill" to both servers' scale-down configuration. */
    private boolean useScaleDownGroupName;

    /**
     * Supplies the constructor arguments for the parameterized runner: the test class is
     * instantiated once with {@code true} and once with {@code false}.
     *
     * @return one single-element argument array per parameterized run
     */
    @Parameterized.Parameters(name = "useScaleDownGroupName={0}")
    public static Collection<Object[]> getParameters() {
        return Arrays.asList(new Object[]{true}, new Object[]{false});
    }

    /**
     * Creates one parameterized instance of the test.
     *
     * @param z value stored in {@code useScaleDownGroupName}; it decides whether {@code setUp}
     *          configures a scale-down group name on the servers
     */
    public ScaleDownTest(boolean z) {
        this.useScaleDownGroupName = z;
    }

    /**
     * Builds and starts a two-node cluster (servers 0 and 1) in which each node is configured to
     * scale down to the other, then creates a session factory for each node.
     *
     * @throws Exception if the base set-up, server start-up or session-factory creation fails
     */
    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        setupLiveServer(0, isFileStorage(), isNetty(), true);
        setupLiveServer(1, isFileStorage(), isNetty(), true);

        // Only server 0 gets an AMQP acceptor; it is the endpoint sendAMQPMessages connects to.
        servers[0].getConfiguration().addAcceptorConfiguration("amqp", AMQP_ACCEPTOR_URI + "?protocols=AMQP");

        // The HA policy is exposed through its general type; narrow it to the live-only policy
        // so the scale-down configuration can be set on it.
        LiveOnlyPolicyConfiguration haPolicy0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
        haPolicy0.setScaleDownConfiguration(new ScaleDownConfiguration());
        LiveOnlyPolicyConfiguration haPolicy1 = (LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration();
        haPolicy1.setScaleDownConfiguration(new ScaleDownConfiguration());
        if (useScaleDownGroupName) {
            haPolicy0.getScaleDownConfiguration().setGroupName("bill");
            haPolicy1.getScaleDownConfiguration().setGroupName("bill");
        }

        setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

        // Each node scales down through the static connectors of its own (first) cluster connection.
        haPolicy0.getScaleDownConfiguration().getConnectors().addAll(((ClusterConnectionConfiguration) servers[0].getConfiguration().getClusterConfigurations().iterator().next()).getStaticConnectors());
        haPolicy1.getScaleDownConfiguration().getConnectors().addAll(((ClusterConnectionConfiguration) servers[1].getConfiguration().getClusterConfigurations().iterator().next()).getStaticConnectors());

        // Redistribution delay of 0 for every address on both nodes.
        servers[0].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0L));
        servers[1].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0L));

        startServers(0, 1);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
    }

    /**
     * Value passed as the netty flag to {@code setupLiveServer}, {@code setupClusterConnection}
     * and {@code setupSessionFactory} in {@code setUp}.
     *
     * @return always {@code true}
     */
    protected boolean isNetty() {
        return true;
    }

    /**
     * Overrides the {@code ClusterTestBase} hook so that it reports {@code true}.
     * NOTE(review): presumably required so the AMQP protocol used by the "amqp" acceptor is
     * resolved when the servers are created - confirm against ClusterTestBase.
     *
     * @return always {@code true}
     */
    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase
    protected boolean isResolveProtocols() {
        return true;
    }

    /**
     * Sends two durable messages to an address with two queues on server 0, consumes one message
     * from the second queue, stops server 0 and verifies that the remaining messages (two on the
     * first queue, one on the second) can be consumed from server 1.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testBasicScaleDown() throws Exception {
        final String address = "testAddress";
        final String queue1 = "testQueue1";
        final String queue2 = "testQueue2";
        final int sent = 2;

        for (int node = 0; node <= 1; node++) {
            createQueue(node, address, queue1, null, true);
            createQueue(node, address, queue2, null, true);
        }
        send(0, address, sent, true, null);

        // Take one message off queue2 on server 0 so the two queues hold different counts.
        addConsumer(1, 0, queue2, null, false);
        ClientMessage consumedBeforeStop = consumers[1].getConsumer().receive(1000L);
        Assert.assertNotNull(consumedBeforeStop);
        consumedBeforeStop.acknowledge();
        consumers[1].getSession().commit();

        // getBinding() returns the general binding type; only the local queue binding exposes the queue.
        LocalQueueBinding binding1 = (LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queue1));
        LocalQueueBinding binding2 = (LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queue2));
        Assert.assertEquals(sent, getMessageCount(binding1.getQueue()));
        Assert.assertEquals(sent - 1, getMessageCount(binding2.getQueue()));

        // Stopping server 0 triggers the scale down.
        servers[0].stop();

        addConsumer(0, 1, queue1, null);
        for (int i = 0; i < sent; i++) {
            ClientMessage message = consumers[0].getConsumer().receive(1000L);
            Assert.assertNotNull(message);
            message.acknowledge();
        }
        Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
        removeConsumer(0);

        addConsumer(0, 1, queue2, null);
        ClientMessage remaining = consumers[0].getConsumer().receive(1000L);
        Assert.assertNotNull(remaining);
        remaining.acknowledge();
        Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
        removeConsumer(0);
    }

    /**
     * Restarts server 1 while server 0 holds a cluster connection to it, then checks that the
     * close executor of server 0's cluster locator factory still runs tasks. Both servers are
     * stopped afterwards whether or not the check succeeds.
     *
     * @throws Exception on any broker failure
     */
    @Test
    public void testScaleDownNodeReconnect() throws Exception {
        try {
            Iterator<? extends Map.Entry<?, ?>> locators = servers[0].getClusterManager().getClusterController().getLocators().entrySet().iterator();
            assertTrue(locators.hasNext());
            ServerLocatorImpl locator = (ServerLocatorImpl) locators.next().getValue();
            waitForClusterConnected(locator);

            // Bounce server 1 so that server 0 has to reconnect to it.
            servers[1].stop();
            servers[1].start();
            checkClusterConnectionExecutorNotBlocking(locator);
        } finally {
            servers[1].stop();
            servers[0].stop();
        }
    }

    /**
     * Asserts that the locator owns exactly one session factory and that the factory's
     * {@code closeExecutor} runs a submitted task within ten seconds.
     *
     * <p>Both the factory set and the executor are private fields and are read reflectively.
     *
     * @param serverLocatorImpl locator whose single session factory is inspected
     * @throws NoSuchFieldException   if the "factories" or "closeExecutor" field does not exist
     * @throws IllegalAccessException if a reflective field read is denied
     */
    private void checkClusterConnectionExecutorNotBlocking(ServerLocatorImpl serverLocatorImpl) throws NoSuchFieldException, IllegalAccessException {
        Field factoriesField = serverLocatorImpl.getClass().getDeclaredField("factories");
        factoriesField.setAccessible(true);
        Set<?> factories = (Set<?>) factoriesField.get(serverLocatorImpl);
        assertEquals(1L, factories.size());

        ClientSessionFactoryImpl factory = (ClientSessionFactoryImpl) factories.iterator().next();
        Field closeExecutorField = factory.getClass().getDeclaredField("closeExecutor");
        closeExecutorField.setAccessible(true);
        Executor closeExecutor = (Executor) closeExecutorField.get(factory);

        // If the executor is stuck on earlier tasks this probe never runs and the latch times out.
        CountDownLatch probeRan = new CountDownLatch(1);
        closeExecutor.execute(probeRan::countDown);

        boolean released = false;
        try {
            released = probeRan.await(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the test runner; the assertion below reports the failure.
            Thread.currentThread().interrupt();
        }
        assertTrue("executor got blocked.", released);
    }

    /**
     * Waits up to five seconds for the locator's topology to become non-empty and fails the test
     * if it stays empty.
     *
     * @param serverLocatorImpl locator whose topology is polled
     * @throws Exception if the wait itself fails
     */
    private void waitForClusterConnected(final ServerLocatorImpl serverLocatorImpl) throws Exception {
        Wait.Condition topologyKnown = () -> !serverLocatorImpl.getTopology().isEmpty();
        boolean connected = Wait.waitFor(topologyKnown, 5000L);
        assertTrue("topology should not be empty", connected);
    }

    /**
     * Pauses server 0's store-and-forward queue towards server 1, sends 50 messages to each of two
     * addresses, waits until all 100 messages sit in the paused queue, stops server 0 and verifies
     * that each queue on server 1 then delivers its 50 messages.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testStoreAndForward() throws Exception {
        final String queue1 = "testQueue1";
        final String queue2 = "testQueue2";
        final int perAddress = 50;
        final long waitMillis = 5000L;

        createQueue(0, "testAddress1", queue1, null, false);
        createQueue(1, "testAddress1", queue1, null, false);
        createQueue(0, "testAddress2", queue2, null, false);
        createQueue(1, "testAddress2", queue2, null, false);

        // Locate the store-and-forward queue by its address: internal prefix + "sf." ... + node id of server 1.
        final String sfPrefix = servers[1].getInternalNamingPrefix() + "sf.";
        final String targetNodeId = servers[1].getNodeID().toString();
        String sfQueueName = null;
        for (Object candidate : IterableStream.iterableOf(servers[0].getPostOffice().getAllBindings())) {
            // The post office holds bindings of several kinds; only local queue bindings expose a queue.
            if (!(candidate instanceof LocalQueueBinding)) {
                continue;
            }
            LocalQueueBinding local = (LocalQueueBinding) candidate;
            String bindingAddress = local.getAddress().toString();
            if (bindingAddress.startsWith(sfPrefix) && bindingAddress.endsWith(targetNodeId)) {
                // Paused so that forwarded messages pile up instead of being delivered to server 1.
                local.getQueue().pause();
                sfQueueName = bindingAddress;
            }
        }
        assertNotNull(sfQueueName);

        send(0, "testAddress1", perAddress, false, null);
        send(0, "testAddress2", perAddress, false, null);
        addConsumer(0, 1, queue1, null);
        addConsumer(1, 1, queue2, null);

        LocalQueueBinding queue1Binding = (LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queue1));
        LocalQueueBinding queue2Binding = (LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queue2));
        LocalQueueBinding sfBinding = (LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(sfQueueName));

        // Each wait gets its own five-second budget; the assertions below report a timeout.
        long deadline = System.currentTimeMillis() + waitMillis;
        while (getMessageCount(queue1Binding.getQueue()) > 0 && System.currentTimeMillis() <= deadline) {
            Thread.sleep(50L);
        }
        deadline = System.currentTimeMillis() + waitMillis;
        while (getMessageCount(queue2Binding.getQueue()) > 0 && System.currentTimeMillis() <= deadline) {
            Thread.sleep(50L);
        }
        deadline = System.currentTimeMillis() + waitMillis;
        while (getMessageCount(sfBinding.getQueue()) < 2 * perAddress && System.currentTimeMillis() <= deadline) {
            Thread.sleep(50L);
        }
        Assert.assertEquals(0L, getMessageCount(queue1Binding.getQueue()));
        Assert.assertEquals(0L, getMessageCount(queue2Binding.getQueue()));
        Assert.assertEquals(2L * perAddress, getMessageCount(sfBinding.getQueue()));

        removeConsumer(0);
        removeConsumer(1);
        servers[0].stop();

        addConsumer(0, 1, queue1, null);
        for (int i = 0; i < perAddress; i++) {
            ClientMessage message = consumers[0].getConsumer().receive(250L);
            Assert.assertNotNull(message);
            message.acknowledge();
        }
        Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
        removeConsumer(0);

        addConsumer(0, 1, queue2, null);
        for (int i = 0; i < perAddress; i++) {
            ClientMessage message = consumers[0].getConsumer().receive(250L);
            Assert.assertNotNull(message);
            message.acknowledge();
        }
        Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
        removeConsumer(0);
    }

    /**
     * Same flow as the basic scale-down test, except that the second queue is created on server 0
     * only: after server 0 stops, both queues must still be consumable from server 1.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testScaleDownWithMissingQueue() throws Exception {
        final String address = "testAddress";
        final String firstQueue = "testQueue1";
        final String secondQueue = "testQueue2";

        createQueue(0, address, firstQueue, null, false);
        createQueue(0, address, secondQueue, null, false);
        // Server 1 deliberately gets the first queue only.
        createQueue(1, address, firstQueue, null, false);
        send(0, address, 2, false, null);

        addConsumer(1, 0, secondQueue, null, false);
        ClientMessage beforeStop = consumers[1].getConsumer().receive(250L);
        Assert.assertNotNull(beforeStop);
        beforeStop.acknowledge();
        consumers[1].getSession().commit();

        servers[0].stop();

        addConsumer(0, 1, firstQueue, null);
        for (int expected = 0; expected < 2; expected++) {
            ClientMessage message = consumers[0].getConsumer().receive(250L);
            Assert.assertNotNull(message);
            message.acknowledge();
        }
        Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
        removeConsumer(0);

        addConsumer(0, 1, secondQueue, null);
        ClientMessage leftover = consumers[0].getConsumer().receive(250L);
        Assert.assertNotNull(leftover);
        leftover.acknowledge();
        Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
        removeConsumer(0);
    }

    /**
     * Creates an ANYCAST queue on server 0 only, sends two messages, stops server 0 and verifies
     * that server 1 then has the queue with ANYCAST routing and delivers a message from it.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testScaleDownWithMissingAnycastQueue() throws Exception {
        final String queueName = "testQueue";
        createQueue(0, "testAddress", queueName, null, false, null, null, RoutingType.ANYCAST);
        send(0, "testAddress", 2, false, null);
        servers[0].stop();

        // getBinding() returns the general binding type; narrow it to reach the queue behind it.
        LocalQueueBinding scaledDown = (LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName));
        Assert.assertNotNull("queue missing on the scale-down target", scaledDown);
        Assert.assertEquals(RoutingType.ANYCAST, scaledDown.getQueue().getRoutingType());

        addConsumer(0, 1, queueName, null);
        ClientMessage message = consumers[0].getConsumer().receive(250L);
        Assert.assertNotNull(message);
        message.acknowledge();
    }

    /**
     * Sends messages over AMQP to the acceptor at {@code AMQP_ACCEPTOR_URI}, logging in as
     * "admin"/"admin". Message ids are "MessageID:0", "MessageID:1", ... and the connection is
     * closed even when sending fails.
     *
     * @param address address the sender is attached to
     * @param count   number of messages to send
     * @param durable durable flag set on every message
     * @throws Exception if connecting or sending fails
     */
    private void sendAMQPMessages(String address, int count, boolean durable) throws Exception {
        AmqpClient client = new AmqpClient(new URI(AMQP_ACCEPTOR_URI), "admin", "admin");
        AmqpConnection connection = client.connect();
        try {
            AmqpSender sender = connection.createSession().createSender(address);
            int index = 0;
            while (index < count) {
                AmqpMessage message = new AmqpMessage();
                message.setMessageId("MessageID:" + index);
                message.setDurable(durable);
                sender.send(message);
                index++;
            }
        } finally {
            connection.close();
        }
    }

    /**
     * AMQP variant of the missing-anycast-queue test: two messages are sent over AMQP to an
     * ANYCAST queue that exists on server 0 only; after server 0 stops, server 1 must have the
     * queue with ANYCAST routing and deliver a message from it.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testScaleDownAMQPMessagesWithMissingAnycastQueue() throws Exception {
        final String queueName = "testQueue";
        createQueue(0, "testAddress", queueName, null, false, null, null, RoutingType.ANYCAST);
        sendAMQPMessages("testAddress", 2, false);
        servers[0].stop();

        // getBinding() returns the general binding type; narrow it to reach the queue behind it.
        LocalQueueBinding scaledDown = (LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName));
        Assert.assertNotNull("queue missing on the scale-down target", scaledDown);
        Assert.assertEquals(RoutingType.ANYCAST, scaledDown.getQueue().getRoutingType());

        addConsumer(0, 1, queueName, null);
        ClientMessage message = consumers[0].getConsumer().receive(250L);
        Assert.assertNotNull(message);
        message.acknowledge();
    }

    /**
     * Creates two MULTICAST queues on server 0 only, sends two AMQP messages (so each queue holds
     * two), stops server 0 and verifies that server 1 then has both queues with MULTICAST routing,
     * two messages each, and delivers from both.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testScaleDownAMQPMessagesWithMissingMulticastQueues() throws Exception {
        final String queue1 = "testQueue1";
        final String queue2 = "testQueue2";
        final long sent = 2L;

        createQueue(0, "testAddress", queue1, null, false, null, null, RoutingType.MULTICAST);
        createQueue(0, "testAddress", queue2, null, false, null, null, RoutingType.MULTICAST);
        sendAMQPMessages("testAddress", 2, false);

        // getBinding() returns the general binding type; narrow it to reach the queue behind it.
        Assert.assertEquals(sent, ((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queue1))).getQueue().getMessageCount());
        Assert.assertEquals(sent, ((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queue2))).getQueue().getMessageCount());

        servers[0].stop();

        LocalQueueBinding scaledDown1 = (LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queue1));
        LocalQueueBinding scaledDown2 = (LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queue2));
        Assert.assertNotNull("first queue missing on the scale-down target", scaledDown1);
        Assert.assertNotNull("second queue missing on the scale-down target", scaledDown2);
        Assert.assertEquals(RoutingType.MULTICAST, scaledDown1.getQueue().getRoutingType());
        Assert.assertEquals(RoutingType.MULTICAST, scaledDown2.getQueue().getRoutingType());
        Assert.assertEquals(sent, scaledDown1.getQueue().getMessageCount());
        Assert.assertEquals(sent, scaledDown2.getQueue().getMessageCount());

        addConsumer(0, 1, queue1, null);
        ClientMessage fromQueue1 = consumers[0].getConsumer().receive(250L);
        Assert.assertNotNull(fromQueue1);
        fromQueue1.acknowledge();

        addConsumer(1, 1, queue2, null);
        ClientMessage fromQueue2 = consumers[1].getConsumer().receive(250L);
        Assert.assertNotNull(fromQueue2);
        fromQueue2.acknowledge();
    }

    /**
     * Sends five durable messages carrying one property of every supported type (including a
     * 400-character non-ASCII string and a string of XML-special characters), stops server 0 and
     * verifies on server 1 that the body and every property survived the scale down unchanged.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testMessageProperties() throws Exception {
        final int messageCount = 5;
        final byte[] bytesValue = {0, 1, 2, 3, 4};
        final String specialCharacters = "\"<>'&";

        createQueue(0, "testAddress", "testQueue", null, false);
        createQueue(1, "testAddress", "testQueue", null, false);
        ClientSession producerSession = addClientSession(sfs[0].createSession(false, true, true));
        ClientProducer producer = addClientProducer(producerSession.createProducer("testAddress"));

        // Code points 800..1199 are all outside ASCII.
        StringBuilder nonAscii = new StringBuilder();
        for (char c = 800; c < 1200; c++) {
            nonAscii.append(c);
        }
        final String nonAsciiValue = nonAscii.toString();

        for (int i = 0; i < messageCount; i++) {
            ClientMessage message = producerSession.createMessage(true);
            message.getBodyBuffer().writeString("Bob the giant pig " + i);
            message.putBooleanProperty("myBooleanProperty", true);
            message.putByteProperty("myByteProperty", (byte) 0);
            message.putBytesProperty("myBytesProperty", new byte[]{0, 1, 2, 3, 4});
            message.putDoubleProperty("myDoubleProperty", i * 1.6d);
            message.putFloatProperty("myFloatProperty", i * 2.5f);
            message.putIntProperty("myIntProperty", i);
            message.putLongProperty("myLongProperty", Long.MAX_VALUE - i);
            message.putObjectProperty("myObjectProperty", Integer.valueOf(i));
            message.putShortProperty("myShortProperty", (short) i);
            message.putStringProperty("myStringProperty", "myStringPropertyValue_" + i);
            message.putStringProperty("myNonAsciiStringProperty", nonAsciiValue);
            message.putStringProperty("mySpecialCharacters", specialCharacters);
            producer.send(message);
        }

        servers[0].stop();

        ClientSession consumerSession = addClientSession(sfs[1].createSession(false, true, true));
        ClientConsumer consumer = addClientConsumer(consumerSession.createConsumer("testQueue"));
        consumerSession.start();
        for (int i = 0; i < messageCount; i++) {
            ClientMessage received = consumer.receive(250L);
            // Fail with a readable message instead of a NullPointerException when delivery times out.
            Assert.assertNotNull("message " + i + " was not delivered", received);

            byte[] body = new byte[received.getBodySize()];
            received.getBodyBuffer().readBytes(body);
            Assert.assertTrue(new String(body).contains("Bob the giant pig " + i));

            Assert.assertEquals(Boolean.TRUE, received.getBooleanProperty("myBooleanProperty"));
            Assert.assertEquals(Byte.valueOf((byte) 0), received.getByteProperty("myByteProperty"));
            Assert.assertArrayEquals(bytesValue, received.getBytesProperty("myBytesProperty"));
            Assert.assertEquals(i * 1.6d, received.getDoubleProperty("myDoubleProperty").doubleValue(), 1.0E-6d);
            Assert.assertEquals(i * 2.5f, received.getFloatProperty("myFloatProperty").floatValue(), 1.0E-6d);
            Assert.assertEquals(i, received.getIntProperty("myIntProperty").intValue());
            Assert.assertEquals(Long.MAX_VALUE - i, received.getLongProperty("myLongProperty").longValue());
            Assert.assertEquals(Integer.valueOf(i), received.getObjectProperty("myObjectProperty"));
            Assert.assertEquals((short) i, received.getShortProperty("myShortProperty").shortValue());
            Assert.assertEquals("myStringPropertyValue_" + i, received.getStringProperty("myStringProperty"));
            Assert.assertEquals(nonAsciiValue, received.getStringProperty("myNonAsciiStringProperty"));
            Assert.assertEquals(specialCharacters, received.getStringProperty("mySpecialCharacters"));
        }
    }

    /**
     * Sends ten durable 204,800-byte messages to server 0, stops it and verifies that server 1
     * delivers all ten with the expected size and byte-for-byte identical content.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testLargeMessage() throws Exception {
        final int messageSize = 204800;
        final int messageCount = 10;

        createQueue(0, "testAddress", "testQueue", null, true);
        createQueue(1, "testAddress", "testQueue", null, true);
        ClientSession producerSession = addClientSession(sfs[0].createSession(false, false));
        ClientProducer producer = addClientProducer(producerSession.createProducer("testAddress"));

        byte[] payload = new byte[messageSize];
        for (int i = 0; i < payload.length; i++) {
            payload[i] = getSamplebyte(i);
        }
        for (int i = 0; i < messageCount; i++) {
            ClientMessage message = producerSession.createMessage(true);
            message.getBodyBuffer().writeBytes(payload);
            producer.send(message);
            producerSession.commit();
        }

        servers[0].stop();

        ClientSession consumerSession = addClientSession(sfs[1].createSession(false, true, true));
        ClientConsumer consumer = addClientConsumer(consumerSession.createConsumer("testQueue"));
        consumerSession.start();
        for (int i = 0; i < messageCount; i++) {
            ClientMessage received = consumer.receive(250L);
            Assert.assertNotNull(received);
            Assert.assertEquals(messageSize, received.getBodySize());

            // Built once per message: rebuilding it for every byte would run toString() and a
            // string concatenation 204,800 times for each message.
            final String mismatch = received + " Is different";
            for (int offset = 0; offset < messageSize; offset++) {
                Assert.assertEquals(mismatch, getSamplebyte(offset), received.getBodyBuffer().readByte());
            }
            received.acknowledge();
            consumerSession.commit();
        }
    }

    /**
     * Sends 1 KiB messages in committed batches of 50 until the address on server 0 starts paging,
     * stops server 0 and verifies that server 1 delivers exactly the number of messages sent.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testPaging() throws Exception {
        final String address = "testAddress";
        final String queue = "testQueue";

        int sentCount = 0;
        createQueue(0, address, queue, null, false);
        createQueue(1, address, queue, null, false);
        ClientSession session = addClientSession(sfs[0].createSession(false, false));
        ClientProducer producer = addClientProducer(session.createProducer(address));

        // Small limits so that paging kicks in after a handful of messages.
        AddressSettings pagingSettings = new AddressSettings().setPageSizeBytes(10240).setMaxSizeBytes(20480L);
        servers[0].getAddressSettingsRepository().addMatch("#", pagingSettings);

        while (!servers[0].getPagingManager().getPageStore(new SimpleString(address)).isPaging()) {
            int batchEnd = sentCount + 50;
            while (sentCount < batchEnd) {
                ClientMessage message = session.createMessage(true);
                message.getBodyBuffer().writeBytes(new byte[1024]);
                producer.send(message);
                sentCount++;
            }
            session.commit();
        }

        servers[0].stop();

        addConsumer(0, 1, queue, null);
        for (int remaining = sentCount; remaining > 0; remaining--) {
            ClientMessage received = consumers[0].getConsumer().receive(250L);
            Assert.assertNotNull(received);
        }
        Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
        removeConsumer(0);
    }

    /**
     * Sends numbered 1 KiB messages in committed batches of 50 until the address on server 0
     * starts paging, stops server 0 and verifies that server 1 delivers every message in the
     * order it was sent.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testOrderWithPaging() throws Exception {
        final String address = "testAddress";
        final String queue = "testQueue";
        final int batchSize = 50;

        int sentCount = 0;
        createQueue(0, address, queue, null, false);
        createQueue(1, address, queue, null, false);
        ClientSession session = addClientSession(sfs[0].createSession(false, false));
        ClientProducer producer = addClientProducer(session.createProducer(address));
        servers[0].getAddressSettingsRepository().addMatch("#", new AddressSettings().setPageSizeBytes(10240).setMaxSizeBytes(20480L));

        while (!servers[0].getPagingManager().getPageStore(new SimpleString(address)).isPaging()) {
            for (int i = 0; i < batchSize; i++) {
                ClientMessage message = session.createMessage(true);
                message.getBodyBuffer().writeBytes(new byte[1024]);
                // Number with the running total, not the position inside the batch: the check
                // below expects 0..sentCount-1, which a per-batch index would break as soon as a
                // second batch is needed to trigger paging.
                message.putIntProperty("order", sentCount);
                producer.send(message);
                sentCount++;
            }
            session.commit();
        }

        servers[0].stop();

        addConsumer(0, 1, queue, null);
        for (int expectedOrder = 0; expectedOrder < sentCount; expectedOrder++) {
            ClientMessage received = consumers[0].getConsumer().receive(250L);
            // Fail with a readable message instead of a NullPointerException when delivery times out.
            Assert.assertNotNull("message " + expectedOrder + " was not delivered", received);
            Assert.assertEquals(expectedOrder, received.getIntProperty("order").intValue());
        }
        Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
        removeConsumer(0);
    }

    /**
     * Creates an "even" queue (filter value "0") and an "odd" queue (filter value "1") on both
     * servers, sends 50 messages alternating between the two filter values, stops server 0 and
     * verifies that server 1 delivers each message from the queue matching its filter value.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testFilters() throws Exception {
        final int messageCount = 50;

        createQueue(0, "testAddress", "evenQueue", "0", false);
        createQueue(0, "testAddress", "oddQueue", "1", false);
        createQueue(1, "testAddress", "evenQueue", "0", false);
        createQueue(1, "testAddress", "oddQueue", "1", false);

        ClientSession session = addClientSession(sfs[0].createSession(false, false));
        ClientProducer producer = addClientProducer(session.createProducer("testAddress"));
        for (int i = 0; i < messageCount; i++) {
            ClientMessage message = session.createMessage(false);
            message.putStringProperty(ClusterTestBase.FILTER_PROP, new SimpleString(i % 2 == 0 ? "0" : "1"));
            producer.send(message);
        }
        session.commit();

        servers[0].stop();

        // Consumer 0 reads the even queue and consumer 1 the odd queue, so the parity of the
        // message index selects both the consumer and the expected filter value.
        addConsumer(0, 1, "evenQueue", null);
        addConsumer(1, 1, "oddQueue", null);
        for (int i = 0; i < messageCount; i++) {
            int parity = i % 2;
            ClientMessage received = consumers[parity].getConsumer().receive(250L);
            // Fail with a readable message instead of a NullPointerException when delivery times out.
            Assert.assertNotNull("message " + i + " was not delivered", received);
            Assert.assertEquals(String.valueOf(parity), received.getStringProperty(ClusterTestBase.FILTER_PROP));
        }
        Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
        Assert.assertNull(consumers[1].getConsumer().receiveImmediate());
        removeConsumer(0);
        removeConsumer(1);
    }

    /**
     * Forces a message onto an auto-created dead-letter queue on server 0 (max one delivery
     * attempt, then a rollback), stops server 0 and verifies that the message can be consumed
     * from the dead-letter queue of the same name on server 1.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testScaleDownMessageWithAutoCreatedDLAResources() throws Exception {
        final SimpleString dla = new SimpleString("DLA");
        final SimpleString queueName = new SimpleString("q1");
        final SimpleString addressName = new SimpleString("q1");
        final String text = "Put me on DLA";

        // Read before the new match is added; only the dead-letter queue prefix and suffix are used.
        AddressSettings existingSettings = (AddressSettings) servers[0].getAddressSettingsRepository().getMatch(addressName.toString());
        servers[0].getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
        final SimpleString dlqName = existingSettings.getDeadLetterQueuePrefix().concat(addressName).concat(existingSettings.getDeadLetterQueueSuffix());

        createQueue(0, addressName.toString(), queueName.toString(), null, false, null, null, RoutingType.ANYCAST);
        ClientSession session = addClientSession(sfs[0].createSession(true, false));
        ClientProducer producer = addClientProducer(session.createProducer(addressName));
        ClientMessage sent = createTextMessage(session, text);
        // NOTE(review): 1 is presumably the ANYCAST routing-type code (the received message is
        // asserted to be ANYCAST below) - confirm against RoutingType.
        sent.putByteProperty(Message.HDR_ROUTING_TYPE, (byte) 1);
        producer.send(sent);
        session.start();

        ClientConsumer queueConsumer = session.createConsumer(queueName);
        ClientMessage firstDelivery = queueConsumer.receive(1000L);
        Assert.assertNotNull(firstDelivery);
        Assert.assertEquals(text, firstDelivery.getBodyBuffer().readString());
        Assert.assertEquals(RoutingType.ANYCAST, firstDelivery.getRoutingType());
        firstDelivery.acknowledge();
        // With a single delivery attempt allowed, the rollback sends the message to the dead-letter address.
        session.rollback();
        Assert.assertNull(queueConsumer.receiveImmediate());
        queueConsumer.close();

        ClientMessage deadLettered = session.createConsumer(dlqName.toString()).receive(1000L);
        Assert.assertNotNull(deadLettered);
        Assert.assertNull(deadLettered.getRoutingType());

        servers[0].stop();

        ClientSession targetSession = addClientSession(sfs[1].createSession(false, true, true));
        ClientConsumer dlqConsumer = targetSession.createConsumer(dlqName.toString());
        targetSession.start();
        ClientMessage scaledDown = dlqConsumer.receive(1000L);
        Assert.assertNotNull(scaledDown);
        scaledDown.acknowledge();
        Assert.assertEquals(text, scaledDown.getBodyBuffer().readString());
        dlqConsumer.close();
    }
}
