package org.apache.activemq.artemis.tests.integration.divert;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/divert/PersistentDivertTest.class */
/**
 * Integration tests for diverts configured on {@code testAddress}.
 *
 * <p>Three diverts ({@code divert1..3}) forward from {@code testAddress} to
 * {@code forwardAddress1..3}. Each test sends {@link #MESSAGE_COUNT} messages to
 * {@code testAddress} and asserts that every message arrives, in send order, on a queue bound to
 * each forwarding address <em>and</em> on a queue bound to {@code testAddress} itself.
 *
 * <p>Every scenario is run twice: once with empty-bodied messages and once with a streamed body of
 * {@link #minLargeMessageSize} bytes.
 */
public class PersistentDivertTest extends ActiveMQTestBase {
    /** Size in bytes of the streamed body attached to each message in the "large message" variants. */
    final int minLargeMessageSize = 204800;

    /** Address the producer sends to and the diverts listen on. */
    private static final String TEST_ADDRESS = "testAddress";

    /** Number of messages sent (and expected on every queue) per test. */
    private static final int MESSAGE_COUNT = 10;

    /** Maximum time, in milliseconds, to wait for each expected message. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    /** Integer property carrying the send index, used to verify ordering on receipt. */
    private static final SimpleString SEQUENCE_KEY = new SimpleString("testkey");

    /** Queues under test; entry {@code i} is bound to {@code QUEUE_ADDRESSES[i]}. */
    private static final SimpleString[] QUEUE_NAMES = {
        new SimpleString("queue1"),
        new SimpleString("queue2"),
        new SimpleString("queue3"),
        new SimpleString("queue4")
    };

    /** Addresses the queues are bound to: the three divert targets, then the original address. */
    private static final String[] QUEUE_ADDRESSES = {
        "forwardAddress1",
        "forwardAddress2",
        "forwardAddress3",
        TEST_ADDRESS
    };

    @Test
    public void testPersistentDivert() throws Exception {
        doTestPersistentDivert(false);
    }

    @Test
    public void testPersistentDiverLargeMessage() throws Exception {
        doTestPersistentDivert(true);
    }

    /**
     * Sends messages to {@code testAddress} and consumes them from all four queues on the same
     * session, without restarting the server.
     *
     * @param largeMessage whether each message carries a streamed body of
     *                     {@link #minLargeMessageSize} bytes that is verified on receipt
     * @throws Exception if server start-up or any client operation fails
     */
    public void doTestPersistentDivert(boolean largeMessage) throws Exception {
        createDivertServer().start();

        ClientSession session = createSessionFactory(createInVMNonHALocator().setBlockOnAcknowledge(true).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true)).createSession(true, true, 0);
        createQueues(session);
        session.start();

        ClientConsumer[] consumers = createConsumers(session);
        sendMessages(session, largeMessage);

        for (ClientConsumer consumer : consumers) {
            assertAllMessagesReceived(consumer, largeMessage);
        }
    }

    /**
     * Asserts that the body of {@code clientMessage} matches the deterministic sample byte
     * sequence for the first {@link #minLargeMessageSize} bytes.
     */
    private void checkLargeMessage(ClientMessage clientMessage) {
        for (int i = 0; i < minLargeMessageSize; i++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), clientMessage.getBodyBuffer().readByte());
        }
    }

    @Test
    public void testPersistentDivertRestartBeforeConsume() throws Exception {
        doTestPersistentDivertRestartBeforeConsume(false);
    }

    @Test
    public void testPersistentDivertRestartBeforeConsumeLargeMessage() throws Exception {
        doTestPersistentDivertRestartBeforeConsume(true);
    }

    /**
     * Sends messages, restarts the server, consumes and acknowledges everything, restarts the
     * server again and finally asserts that all four queues are empty.
     *
     * @param largeMessage whether each message carries a streamed body of
     *                     {@link #minLargeMessageSize} bytes that is verified on receipt
     * @throws Exception if server start-up/shutdown or any client operation fails
     */
    public void doTestPersistentDivertRestartBeforeConsume(boolean largeMessage) throws Exception {
        ActiveMQServer server = createDivertServer();
        server.start();

        // Phase 1: create the queues and send, then shut everything down before consuming.
        ClientSessionFactory sendFactory = createSessionFactory(createInVMNonHALocator().setBlockOnAcknowledge(true).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true));
        ClientSession sendSession = sendFactory.createSession(true, true, 0);
        createQueues(sendSession);
        sendMessages(sendSession, largeMessage);
        sendSession.close();
        sendFactory.close();
        restart(server);

        // Phase 2: every queue must still hold all messages, in order, after the restart.
        ClientSessionFactory consumeFactory = createSessionFactory(createInVMNonHALocator().setBlockOnDurableSend(true));
        ClientSession consumeSession = consumeFactory.createSession(false, true, true);
        consumeSession.start();
        for (ClientConsumer consumer : createConsumers(consumeSession)) {
            assertAllMessagesReceived(consumer, largeMessage);
        }
        consumeSession.close();
        consumeFactory.close();
        restart(server);

        // Phase 3: the acknowledgements must have survived the second restart.
        ClientSession verifySession = createSessionFactory(createInVMNonHALocator().setBlockOnDurableSend(true)).createSession(false, true, true);
        // The session has to be started, otherwise nothing is delivered to its consumers and the
        // emptiness assertions below would pass even if messages were still in the queues.
        verifySession.start();
        for (ClientConsumer consumer : createConsumers(verifySession)) {
            Assert.assertNull(consumer.receiveImmediate());
        }
    }

    /** Builds the divert {@code divert<index>} from {@code testAddress} to {@code forwardAddress<index>}. */
    private static DivertConfiguration newDivert(int index) {
        String name = "divert" + index;
        return new DivertConfiguration().setName(name).setRoutingName(name).setAddress(TEST_ADDRESS).setForwardingAddress("forwardAddress" + index);
    }

    /** Creates (but does not start) an in-VM server configured with the three diverts. */
    private ActiveMQServer createDivertServer() throws Exception {
        return addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig()
            .addDivertConfiguration(newDivert(1))
            .addDivertConfiguration(newDivert(2))
            .addDivertConfiguration(newDivert(3))));
    }

    /** Stops {@code server}, waits for the shutdown to complete and starts it again. */
    private void restart(ActiveMQServer server) throws Exception {
        server.stop();
        waitForServerToStop(server);
        server.start();
    }

    /** Creates each queue in {@link #QUEUE_NAMES} bound to its entry in {@link #QUEUE_ADDRESSES}. */
    private void createQueues(ClientSession session) throws Exception {
        for (int i = 0; i < QUEUE_NAMES.length; i++) {
            session.createQueue(new QueueConfiguration(QUEUE_NAMES[i]).setAddress(QUEUE_ADDRESSES[i]));
        }
    }

    /** Creates one consumer per queue, in {@link #QUEUE_NAMES} order. */
    private ClientConsumer[] createConsumers(ClientSession session) throws Exception {
        ClientConsumer[] consumers = new ClientConsumer[QUEUE_NAMES.length];
        for (int i = 0; i < QUEUE_NAMES.length; i++) {
            consumers[i] = session.createConsumer(QUEUE_NAMES[i]);
        }
        return consumers;
    }

    /**
     * Sends {@link #MESSAGE_COUNT} messages to {@code testAddress}, each tagged with its send
     * index under {@link #SEQUENCE_KEY}.
     */
    private void sendMessages(ClientSession session, boolean largeMessage) throws Exception {
        ClientProducer producer = session.createProducer(new SimpleString(TEST_ADDRESS));
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            ClientMessage message = session.createMessage(true);
            if (largeMessage) {
                message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(minLargeMessageSize));
            }
            message.putIntProperty(SEQUENCE_KEY, i);
            producer.send(message);
        }
    }

    /**
     * Receives and acknowledges exactly {@link #MESSAGE_COUNT} messages from {@code consumer},
     * asserting that they arrive in send order (and, for large messages, with the expected body),
     * then asserts that nothing further is immediately available.
     */
    private void assertAllMessagesReceived(ClientConsumer consumer, boolean largeMessage) throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            ClientMessage message = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
            Assert.assertNotNull(message);
            Assert.assertEquals(Integer.valueOf(i), message.getObjectProperty(SEQUENCE_KEY));
            if (largeMessage) {
                checkLargeMessage(message);
            }
            message.acknowledge();
        }
        Assert.assertNull(consumer.receiveImmediate());
    }
}
