package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.class */
public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {

    // JUnit rule built with the argument 2.
    // NOTE(review): presumably the number of attempts/retries granted to a failing test - confirm against RetryRule.
    @Rule
    public RetryRule retryRule = new RetryRule(2);
    // Number of brokers in the cluster; the visible tests use nodes 0 and 1.
    private static final int NUMBER_OF_SERVERS = 2;
    // Name of the queue every test produces to and consumes from.
    private static final SimpleString queueName = SimpleString.toSimpleString("queues.0");
    // Messages sent per batch; the load-balancing tests expect half of them (77) on each node.
    private static final int NUMBER_OF_MESSAGES = 154;

    // Protocol name injected by the Parameterized runner ("AMQP", "CORE" or "OPENWIRE", see getParameters()).
    @Parameterized.Parameter(0)
    public String protocol;

    /**
     * Supplies the protocol names the whole test class is repeated for.
     *
     * @return one single-element row per protocol ("AMQP", "CORE", "OPENWIRE"); element 0 of each
     *         row is injected into the {@code protocol} field
     */
    @Parameterized.Parameters(name = "protocol={0}")
    public static Collection<Object[]> getParameters() {
        return Arrays.asList(new Object[][] {{"AMQP"}, {"CORE"}, {"OPENWIRE"}});
    }

    /**
     * Overrides {@code ClusterTestBase#setUp()} and only delegates to it; the servers themselves
     * are started by each test through {@code startServers(MessageLoadBalancingType)}.
     *
     * @throws Exception if the base-class set-up fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
    }

    /**
     * Configures and starts both brokers, then creates the expiry queue and the test queue on them.
     *
     * <p>Address settings matching {@code queues.*} (redistribution delay 0, expiry address
     * {@code queues.expiry}) are registered on both nodes before either node is started.
     *
     * @param messageLoadBalancingType load-balancing policy handed to {@code setupCluster}
     * @throws Exception if configuring or starting a broker, or creating a queue, fails
     */
    private void startServers(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        final SimpleString expiryQueueName = SimpleString.toSimpleString("queues.expiry");

        setupServers();
        setRedistributionDelay(0L);
        setupCluster(messageLoadBalancingType);

        final AddressSettings queueSettings = new AddressSettings();
        queueSettings.setRedistributionDelay(0L).setExpiryAddress(expiryQueueName);
        for (int node = 0; node < 2; node++) {
            getServer(node).getAddressSettingsRepository().addMatch("queues.*", queueSettings);
        }

        for (int node = 0; node < 2; node++) {
            startServers(node);
        }

        createQueue(expiryQueueName);
        createQueue(queueName);
    }

    /**
     * Creates an ANYCAST queue with the given name on both brokers, node 0 first.
     *
     * @param simpleString name of the queue to create
     * @throws Exception if either broker rejects the queue creation
     */
    private void createQueue(SimpleString simpleString) throws Exception {
        final QueueConfiguration anycastQueue = new QueueConfiguration(simpleString);
        anycastQueue.setRoutingType(RoutingType.ANYCAST);
        // The same configuration instance is handed to both brokers.
        for (int node = 0; node < 2; node++) {
            this.servers[node].createQueue(anycastQueue);
        }
    }

    /**
     * Always reports {@code true}; the value is passed to {@code setupServer} and
     * {@code setupClusterConnection} wherever this class configures a broker.
     *
     * @return {@code true}
     */
    protected boolean isNetty() {
        return true;
    }

    /**
     * Builds a JMS connection factory for the protocol under test, pointing at one broker.
     *
     * @param i zero-based broker index; the broker is addressed on {@code localhost} at port
     *          {@code 61616 + i}
     * @return a Qpid JMS factory for "AMQP", an ActiveMQ 5 factory for "OPENWIRE", or an Artemis
     *         core JMS factory for "CORE"; never returns normally for any other protocol value
     *         because the test is failed first
     */
    private ConnectionFactory getJmsConnectionFactory(int i) {
        final String hostAndPort = "localhost:" + (61616 + i);

        // Constant-first comparisons so that an unset protocol reaches the failure message below
        // instead of raising a NullPointerException.
        if ("AMQP".equals(this.protocol)) {
            return new JmsConnectionFactory("amqp://" + hostAndPort);
        }
        if ("OPENWIRE".equals(this.protocol)) {
            return new ActiveMQConnectionFactory("tcp://" + hostAndPort);
        }
        if ("CORE".equals(this.protocol)) {
            return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://" + hostAndPort);
        }

        Assert.fail("Protocol " + this.protocol + " unknown");
        return null; // unreachable: Assert.fail always throws
    }

    /**
     * Pauses the bridge of every message-flow record of every cluster connection on a broker.
     *
     * @param activeMQServer broker whose cluster bridges are paused
     * @throws Exception if pausing a bridge fails
     */
    private void pauseClusteringBridges(ActiveMQServer activeMQServer) throws Exception {
        for (ClusterConnection clusterConnection : activeMQServer.getClusterManager().getClusterConnections()) {
            for (MessageFlowRecord flowRecord : clusterConnection.getRecords().values()) {
                flowRecord.getBridge().pause();
            }
        }
    }

    /**
     * With STRICT load balancing and one consumer per node, sends {@code NUMBER_OF_MESSAGES} to
     * node 0 while node 0's cluster bridges are paused. Expects half of the messages on node 0's
     * consumer and nothing on node 1's consumer; after node 0 is stopped, reconfigured and started
     * again, expects the other half to arrive on node 1's consumer.
     *
     * <p>All JMS connections are closed even when an assertion fails.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testLoadBalancing() throws Exception {
        final int half = NUMBER_OF_MESSAGES / 2;
        final String queue = queueName.toString();

        startServers(MessageLoadBalancingType.STRICT);

        final Connection[] connections = new Connection[NUMBER_OF_SERVERS];
        final MessageConsumer[] consumers = new MessageConsumer[NUMBER_OF_SERVERS];
        try {
            for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
                connections[node] = getJmsConnectionFactory(node).createConnection();
                Session session = connections[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
                consumers[node] = session.createConsumer(session.createQueue(queue));
            }

            // One binding with one consumer must be visible locally and remotely on both nodes.
            waitForBindings(0, queue, 1, 1, true);
            waitForBindings(1, queue, 1, 1, true);
            waitForBindings(0, queue, 1, 1, false);
            waitForBindings(1, queue, 1, 1, false);

            pauseClusteringBridges(this.servers[0]);

            try (Connection producerConnection = getJmsConnectionFactory(0).createConnection()) {
                Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = producerSession.createProducer(producerSession.createQueue(queue));
                for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
                    producer.send(producerSession.createTextMessage("hello " + msg));
                }
            }

            receiveMessages(connections[0], consumers[0], half, true);

            // Nothing may have reached node 1 while the bridges are paused.
            connections[1].start();
            Assert.assertNull(consumers[1].receiveNoWait());
            connections[1].stop();

            this.servers[0].stop();
            clearServer(0);
            setupServer(0, isFileStorage(), isNetty());
            // NOTE(review): only the AMQP protocol manager is re-registered here, whereas
            // setupServers() also registers OpenWire - confirm this is intentional.
            this.servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
            setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1);
            this.servers[0].start();

            receiveMessages(connections[1], consumers[1], half, true);
        } finally {
            for (Connection connection : connections) {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * With ON_DEMAND load balancing, closes the consumer on node 0 and sends a batch to node 0
     * that must be fully received by the consumer on node 1. After node 1's consumer, session and
     * connection are closed, a second batch is sent to node 0 and the test asserts that all of it
     * is counted on node 0's queue and none on node 1's queue.
     *
     * <p>Both JMS connections are closed even when an assertion fails.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testRedistributionStoppedWithNoRemoteConsumers() throws Exception {
        final String queue = queueName.toString();

        startServers(MessageLoadBalancingType.ON_DEMAND);

        // Queue bound on both nodes, no consumers yet.
        waitForBindings(0, queue, 1, 0, true);
        waitForBindings(1, queue, 1, 0, true);
        waitForBindings(0, queue, 1, 0, false);
        waitForBindings(1, queue, 1, 0, false);

        try (Connection connection0 = getJmsConnectionFactory(0).createConnection()) {
            Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer0 = session0.createConsumer(session0.createQueue(queue));
            connection0.start();
            MessageProducer producer = session0.createProducer(session0.createQueue(queue));

            try (Connection connection1 = getJmsConnectionFactory(1).createConnection()) {
                Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // The destination is created from the consumer's own session.
                MessageConsumer consumer1 = session1.createConsumer(session1.createQueue(queue));
                connection1.start();

                waitForBindings(0, queue, 1, 1, false);
                waitForBindings(1, queue, 1, 1, false);

                consumer0.close();

                for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
                    producer.send(session0.createTextMessage("hello " + msg));
                }
                for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
                    assertNotNull(consumer1.receive(1000L));
                }

                consumer1.close();
                session1.close();
            }

            // No consumer is left on either node for this second batch.
            for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
                producer.send(session0.createTextMessage("hello " + msg));
            }
        }

        assertEquals(NUMBER_OF_MESSAGES, this.servers[0].locateQueue(queueName).getMessageCount());
        assertEquals(0L, this.servers[1].locateQueue(queueName).getMessageCount());
    }

    /**
     * With ON_DEMAND load balancing, sends {@code NUMBER_OF_MESSAGES} with a 200 ms time-to-live
     * to node 0 while nothing consumes them, then expects exactly that many messages to be
     * receivable from {@code queues.expiry} through a connection to node 1.
     *
     * <p>Both JMS connections are closed even when a send or an assertion fails.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testExpireRedistributed() throws Exception {
        final String queue = queueName.toString();

        startServers(MessageLoadBalancingType.ON_DEMAND);

        // Queue bound on both nodes, no consumers.
        waitForBindings(0, queue, 1, 0, true);
        waitForBindings(1, queue, 1, 0, true);
        waitForBindings(0, queue, 1, 0, false);
        waitForBindings(1, queue, 1, 0, false);

        try (Connection producerConnection = getJmsConnectionFactory(0).createConnection()) {
            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(producerSession.createQueue(queue));
            producer.setTimeToLive(200L);
            for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
                producer.send(producerSession.createTextMessage("hello " + msg));
            }
        }

        // Let the time-to-live elapse before consuming from the expiry queue.
        Thread.sleep(200L);

        try (Connection consumerConnection = getJmsConnectionFactory(1).createConnection()) {
            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer expiryConsumer = consumerSession.createConsumer(consumerSession.createQueue("queues.expiry"));
            receiveMessages(consumerConnection, expiryConsumer, NUMBER_OF_MESSAGES, true);
        }
    }

    /**
     * With STRICT load balancing, waits until each node reports three remoting connections,
     * attaches one consumer per node, forcibly fails every remoting connection on node 1 (twice),
     * re-creates the client connections and consumers, and finally expects a batch sent to node 0
     * to be split evenly between the two consumers.
     *
     * <p>All JMS connections are closed even when an assertion fails.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testRestartConnection() throws Exception {
        final int half = NUMBER_OF_MESSAGES / 2;
        final String queue = queueName.toString();

        startServers(MessageLoadBalancingType.STRICT);

        this.instanceLog.debug("connections " + this.servers[1].getRemotingService().getConnections().size());
        Wait.assertEquals(3, () -> this.servers[1].getRemotingService().getConnections().size());
        Wait.assertEquals(3, () -> this.servers[0].getRemotingService().getConnections().size());

        final Connection[] connections = new Connection[NUMBER_OF_SERVERS];
        final MessageConsumer[] consumers = new MessageConsumer[NUMBER_OF_SERVERS];
        try {
            for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
                connections[node] = getJmsConnectionFactory(node).createConnection();
                Session session = connections[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
                consumers[node] = session.createConsumer(session.createQueue(queue));
            }

            waitForBindings(0, queue, 1, 1, true);
            waitForBindings(1, queue, 1, 1, true);
            waitForBindings(0, queue, 1, 1, false);
            waitForBindings(1, queue, 1, 1, false);

            // Two consecutive passes over node 1's connections, as before.
            // NOTE(review): both passes target node 1; if one of them was meant for node 0 this
            // is a copy/paste slip - confirm the intent before changing it.
            for (int pass = 0; pass < 2; pass++) {
                for (RemotingConnection remotingConnection : this.servers[1].getRemotingService().getConnections()) {
                    remotingConnection.fail(new ActiveMQException("forcing failure"));
                }
            }

            // Fixed pause after the forced failures, before the clients are re-created.
            Thread.sleep(500L);

            for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
                try {
                    connections[node].close();
                } catch (Throwable th) {
                    // Best effort: the connection may already be broken by the forced failure.
                    th.printStackTrace();
                }
                connections[node] = getJmsConnectionFactory(node).createConnection();
                Session session = connections[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
                consumers[node] = session.createConsumer(session.createQueue(queue));
            }

            waitForBindings(0, queue, 1, 1, true);
            waitForBindings(1, queue, 1, 1, true);
            waitForBindings(0, queue, 1, 1, false);
            waitForBindings(1, queue, 1, 1, false);

            this.instanceLog.debug("connections " + this.servers[1].getRemotingService().getConnections().size());

            try (Connection producerConnection = getJmsConnectionFactory(0).createConnection()) {
                Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = producerSession.createProducer(producerSession.createQueue(queue));
                for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
                    producer.send(producerSession.createTextMessage("hello " + msg));
                }
            }

            receiveMessages(connections[0], consumers[0], half, true);
            receiveMessages(connections[1], consumers[1], half, true);
        } finally {
            for (Connection connection : connections) {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * With ON_DEMAND load balancing and one consumer per node, sends {@code NUMBER_OF_MESSAGES}
     * to node 0, every third one padded with spaces, and waits until each node's queue holds half
     * of them. Node 0's consumer then receives its half; after node 1's connection is closed, the
     * same consumer on node 0 is expected to receive the other half as well.
     *
     * <p>For the "AMQP" run the messages are produced through the core client (padding of 10240
     * characters, min-large-message-size 1024); for the other protocols they are produced through
     * JMS with a padding of 307200 characters.
     *
     * <p>The core {@code ServerLocator} and all JMS connections are closed even on failure.
     *
     * @throws Exception on any broker, core-client or JMS failure
     */
    @Test
    public void testRedistributeAfterLoadBalanced() throws Exception {
        final int half = NUMBER_OF_MESSAGES / 2;
        final String queue = queueName.toString();

        startServers(MessageLoadBalancingType.ON_DEMAND);

        final Connection[] connections = new Connection[NUMBER_OF_SERVERS];
        final MessageConsumer[] consumers = new MessageConsumer[NUMBER_OF_SERVERS];
        try {
            for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
                connections[node] = getJmsConnectionFactory(node).createConnection();
                Session session = connections[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
                consumers[node] = session.createConsumer(session.createQueue(queue));
            }

            waitForBindings(0, queue, 1, 1, true);
            waitForBindings(1, queue, 1, 1, true);
            waitForBindings(0, queue, 1, 1, false);
            waitForBindings(1, queue, 1, 1, false);

            final boolean viaCoreClient = "AMQP".equals(this.protocol);

            // Build the padded body once instead of once per message.
            final String plainBody = "hello";
            final int paddingLength = viaCoreClient ? 10240 : 307200;
            final StringBuilder padded = new StringBuilder(plainBody.length() + paddingLength).append(plainBody);
            for (int k = 0; k < paddingLength; k++) {
                padded.append(' ');
            }
            final String paddedBody = padded.toString();

            if (viaCoreClient) {
                try (ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616")) {
                    locator.setMinLargeMessageSize(1024);
                    try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
                        ClientSession coreSession = sessionFactory.createSession();
                        ClientProducer coreProducer = coreSession.createProducer(queueName);
                        for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
                            ClientMessage message = coreSession.createMessage((byte) 0, true);
                            message.getBodyBuffer().writeUTF(msg % 3 == 0 ? paddedBody : plainBody);
                            coreProducer.send(message);
                        }
                    }
                }
            } else {
                try (Connection producerConnection = getJmsConnectionFactory(0).createConnection()) {
                    Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = producerSession.createProducer(producerSession.createQueue(queue));
                    for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
                        producer.send(producerSession.createTextMessage(msg % 3 == 0 ? paddedBody : plainBody));
                    }
                }
            }

            Queue queueOnNode1 = this.servers[1].locateQueue(queueName);
            Wait.assertEquals((long) half, queueOnNode1::getMessageCount);
            Queue queueOnNode0 = this.servers[0].locateQueue(queueName);
            Wait.assertEquals((long) half, queueOnNode0::getMessageCount);

            receiveMessages(connections[0], consumers[0], half, true);

            // Once node 1 has no consumer left, its half is expected on node 0's consumer.
            connections[1].close();
            connections[1] = null;
            receiveMessages(connections[0], consumers[0], half, true);
        } finally {
            for (Connection connection : connections) {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * Starts the connection and asserts that the consumer delivers the expected number of messages.
     *
     * @param connection connection owning the consumer; started before receiving
     * @param messageConsumer consumer to read from
     * @param i number of messages that must arrive, each within 5000 ms
     * @param z when {@code true}, additionally asserts that no further message is immediately
     *          available after the expected ones
     * @throws JMSException if starting the connection or receiving fails
     */
    private void receiveMessages(Connection connection, MessageConsumer messageConsumer, int i, boolean z) throws JMSException {
        connection.start();

        for (int remaining = i; remaining > 0; remaining--) {
            Assert.assertNotNull(messageConsumer.receive(5000L));
        }

        if (!z) {
            return;
        }
        Assert.assertNull(messageConsumer.receiveNoWait());
    }

    /**
     * Declares the two cluster connections for address prefix {@code queues}:
     * {@code cluster0} from node 0 to node 1 and {@code cluster1} from node 1 to node 0.
     *
     * @param messageLoadBalancingType load-balancing policy applied to both connections
     * @throws Exception if a cluster connection cannot be configured
     */
    protected void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        final String address = "queues";
        final int maxHops = 1;
        final boolean netty = isNetty();

        setupClusterConnection("cluster0", address, messageLoadBalancingType, maxHops, netty, 0, 1);
        setupClusterConnection("cluster1", address, messageLoadBalancingType, maxHops, netty, 1, 0);
    }

    /**
     * Intentionally does nothing. {@code startServers(MessageLoadBalancingType)} calls it with 0
     * and then sets a redistribution delay of 0 itself through the {@code queues.*} address settings.
     *
     * @param j requested redistribution delay; ignored
     */
    protected void setRedistributionDelay(long j) {
    }

    /**
     * Configures brokers 0 and 1 and registers the AMQP (Proton) and OpenWire protocol managers
     * on each of them.
     *
     * @throws Exception if a broker cannot be configured
     */
    protected void setupServers() throws Exception {
        final int nodes = 2;

        for (int node = 0; node < nodes; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        for (int node = 0; node < nodes; node++) {
            this.servers[node].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
        }
        for (int node = 0; node < nodes; node++) {
            this.servers[node].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
        }
    }

    /**
     * Tears the cluster down: closes all consumers, session factories and server locators through
     * the inherited helpers, then stops and clears brokers 0 and 1, in that order.
     *
     * @throws Exception if any of the close or stop steps fails
     */
    protected void stopServers() throws Exception {
        closeAllConsumers();
        closeAllSessionFactories();
        closeAllServerLocatorsFactories();
        stopServers(0, 1);
        clearServer(0, 1);
    }

    /**
     * Takes the base-class configuration and sets its message-expiry scan period to 100.
     * NOTE(review): the unit is presumably milliseconds - confirm against the configuration API.
     *
     * @param i value forwarded unchanged to the base-class {@code createBasicConfig}
     * @return the base configuration with the expiry scan period applied
     */
    protected ConfigurationImpl createBasicConfig(int i) {
        final ConfigurationImpl baseConfig = super.createBasicConfig(i);
        final long expiryScanPeriod = 100L;
        baseConfig.setMessageExpiryScanPeriod(expiryScanPeriod);
        return baseConfig;
    }
}
