package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.PrintStream;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.StringPrintStream;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.class */
/**
 * Integration tests for two brokers ("Server1" on port 5672 and "Server2" on port 5673) that are
 * each configured with a durable AMQP mirror connection pointing at the other one.
 *
 * <p>Each test starts both brokers, creates queues and/or sends messages on one side and then
 * waits until the same state is observable on the other side. After every test the captured log
 * output is checked and the test fails if the text {@code AMQ222214} was logged.
 */
public class BrokerInSyncTest extends AmqpClientTestSupport {
    public static final int TIME_BEFORE_RESTART = 1000;
    protected static final int AMQP_PORT_2 = 5673;
    protected static final int AMQP_PORT_3 = 5674;
    private static final Logger logger = Logger.getLogger(BrokerInSyncTest.class);

    /** Client/mirror URI of the first broker (the one created by {@link #createServer()}). */
    private static final String SERVER_1_URI = "tcp://localhost:5672";
    /** Client/mirror URI of the second broker (listening on {@link #AMQP_PORT_2}). */
    private static final String SERVER_2_URI = "tcp://localhost:5673";
    /** Size of each batch of messages sent by the data synchronization tests. */
    private static final int NUMBER_OF_MESSAGES = 100;

    ActiveMQServer server_2;

    /** Starts capturing log output so {@link #stopLogging()} can inspect it. */
    @Before
    public void startLogging() {
        AssertionLoggerHandler.startCapture();
    }

    /**
     * Fails the test if {@code AMQ222214} was logged while it ran; the log capture is always
     * stopped, even when that assertion fails.
     */
    @After
    public void stopLogging() {
        try {
            Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ222214"}));
        } finally {
            AssertionLoggerHandler.stopCapture();
        }
    }

    /**
     * Creates the first broker on port 5672. {@code false} is passed as the second argument so
     * that each test can add its mirror configuration before calling {@code start()} itself.
     */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected ActiveMQServer createServer() throws Exception {
        return createServer(5672, false);
    }

    /**
     * Configures and starts both brokers with mirror connections towards each other.
     *
     * <p>Server1 tries to reach Server2 with 3 reconnect attempts, Server2 retries towards
     * Server1 with {@code -1} reconnect attempts; both use a retry interval of 100.
     */
    private void startMirroredBrokers() throws Exception {
        this.server.setIdentity("Server1");
        AMQPBrokerConnectConfiguration towardsServer2 = new AMQPBrokerConnectConfiguration("connectTowardsServer2", SERVER_2_URI).setReconnectAttempts(3).setRetryInterval(100);
        towardsServer2.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server.getConfiguration().addAMQPConnection(towardsServer2);
        this.server.start();

        this.server_2 = createServer(AMQP_PORT_2, false);
        this.server_2.setIdentity("Server2");
        AMQPBrokerConnectConfiguration towardsServer1 = new AMQPBrokerConnectConfiguration("connectTowardsServer1", SERVER_1_URI).setReconnectAttempts(-1).setRetryInterval(100);
        towardsServer1.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server_2.getConfiguration().addAMQPConnection(towardsServer1);
        this.server_2.start();
    }

    /** Waits until the named queue can be located on Server2 and then on Server1. */
    private void waitForQueueOnBothBrokers(String queueName) throws Exception {
        Wait.assertTrue(() -> this.server_2.locateQueue(queueName) != null);
        Wait.assertTrue(() -> this.server.locateQueue(queueName) != null);
    }

    /**
     * Creates the durable ANYCAST test queue ({@code getQueueName()}) on Server2 and waits until
     * it is visible on both brokers.
     */
    private void createMirroredTestQueue() throws Exception {
        this.server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
        this.server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
        waitForQueueOnBothBrokers(getQueueName());
    }

    /** Verifies that queues created on either broker show up on the other one. */
    @Test
    public void testSyncOnCreateQueues() throws Exception {
        startMirroredBrokers();

        // Queue created on Server2 must appear on Server1.
        this.server_2.addAddressInfo(new AddressInfo("sometest").setAutoCreated(false));
        this.server_2.createQueue(new QueueConfiguration("sometest").setDurable(true));
        waitForQueueOnBothBrokers("sometest");

        // Queue created on Server1 must appear on Server2.
        this.server.addAddressInfo(new AddressInfo("OnServer1").setAutoCreated(false));
        this.server.createQueue(new QueueConfiguration("OnServer1").setDurable(true));
        Wait.assertTrue(() -> this.server.locateQueue("OnServer1") != null);
        Wait.assertTrue("Sync is not working on the way back", () -> this.server_2.locateQueue("OnServer1") != null, 2000L);

        // The first queue must still be present on both sides after the reverse sync.
        waitForQueueOnBothBrokers("sometest");

        for (int i = 0; i < 10; i++) {
            final String createdOnServer2 = "test2_" + i;
            final String createdOnServer1 = "test1_" + i;
            this.server_2.createQueue(new QueueConfiguration(createdOnServer2).setDurable(true));
            this.server.createQueue(new QueueConfiguration(createdOnServer1).setDurable(true));
            Wait.assertTrue(() -> this.server.locateQueue(createdOnServer2) != null);
            Wait.assertTrue(() -> this.server.locateQueue(createdOnServer1) != null);
            Wait.assertTrue(() -> this.server_2.locateQueue(createdOnServer2) != null);
            Wait.assertTrue(() -> this.server_2.locateQueue(createdOnServer1) != null);
        }

        this.server_2.stop();
        this.server.stop();
    }

    /**
     * Sends one message through each broker and verifies that both brokers report the same
     * message count after each send.
     */
    @Test
    public void testSingleMessage() throws Exception {
        startMirroredBrokers();
        createMirroredTestQueue();

        // try-with-resources so the client connections are closed even when an assertion fails.
        try (Connection connection1 = CFUtil.createConnectionFactory("AMQP", SERVER_1_URI).createConnection();
             Connection connection2 = CFUtil.createConnectionFactory("AMQP", SERVER_2_URI).createConnection()) {
            Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
            connection1.start();
            Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
            connection2.start();

            Queue queue = session1.createQueue(getQueueName());
            MessageProducer producerServer1 = session1.createProducer(queue);
            MessageProducer producerServer2 = session2.createProducer(queue);

            TextMessage message = session1.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);
            message.setIntProperty("i", 0);
            message.setStringProperty("server", this.server.getIdentity());
            producerServer1.send(message);
            session1.commit();

            org.apache.activemq.artemis.core.server.Queue queueOnServer1 = this.server.locateQueue(getQueueName());
            org.apache.activemq.artemis.core.server.Queue queueOnServer2 = this.server_2.locateQueue(getQueueName());
            Assert.assertNotNull(queueOnServer1);
            Assert.assertNotNull(queueOnServer2);
            Wait.assertEquals(1L, queueOnServer1::getMessageCount);
            Wait.assertEquals(1L, queueOnServer2::getMessageCount);

            // The message object is created from session 1 but published via the Server2 producer.
            message = session1.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);
            message.setIntProperty("i", 1);
            message.setStringProperty("server", this.server_2.getIdentity());
            producerServer2.send(message);
            session2.commit();

            // Only when debugging: dump broker state if the mirrored message did not arrive.
            if (logger.isDebugEnabled() && !Wait.waitFor(() -> queueOnServer1.getMessageCount() == 2)) {
                debugData();
            }
            Wait.assertEquals(2L, queueOnServer1::getMessageCount);
            Wait.assertEquals(2L, queueOnServer2::getMessageCount);
        }

        this.server_2.stop();
        this.server.stop();
    }

    /**
     * Sends a batch of messages through each broker, consumes all of them from Server1 in order
     * and verifies that the queue ends up empty on both brokers.
     */
    @Test
    public void testSyncData() throws Exception {
        startMirroredBrokers();
        createMirroredTestQueue();

        // try-with-resources so the client connections are closed even when an assertion fails.
        try (Connection connection1 = CFUtil.createConnectionFactory("AMQP", SERVER_1_URI).createConnection();
             Connection connection2 = CFUtil.createConnectionFactory("AMQP", SERVER_2_URI).createConnection()) {
            Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
            connection1.start();
            Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
            connection2.start();

            Queue queue = session1.createQueue(getQueueName());
            MessageProducer producerServer1 = session1.createProducer(queue);
            MessageProducer producerServer2 = session2.createProducer(queue);

            for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                TextMessage message = session1.createTextMessage("test " + i);
                message.setIntProperty("i", i);
                message.setStringProperty("server", this.server.getIdentity());
                producerServer1.send(message);
            }
            session1.commit();

            org.apache.activemq.artemis.core.server.Queue queueOnServer1 = this.server.locateQueue(getQueueName());
            org.apache.activemq.artemis.core.server.Queue queueOnServer2 = this.server_2.locateQueue(getQueueName());
            Assert.assertNotNull(queueOnServer1);
            Assert.assertNotNull(queueOnServer2);
            Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
            Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);

            // Second batch: message objects come from session 1 but are published via Server2.
            for (int i = NUMBER_OF_MESSAGES; i < NUMBER_OF_MESSAGES * 2; i++) {
                TextMessage message = session1.createTextMessage("test " + i);
                message.setIntProperty("i", i);
                message.setStringProperty("server", this.server_2.getIdentity());
                producerServer2.send(message);
            }
            session2.commit();

            // Only when debugging: dump broker state if the second batch was not mirrored.
            if (logger.isDebugEnabled() && !Wait.waitFor(() -> queueOnServer1.getMessageCount() == ((long) (NUMBER_OF_MESSAGES * 2)))) {
                debugData();
            }
            Wait.assertEquals(NUMBER_OF_MESSAGES * 2, queueOnServer1::getMessageCount);
            Wait.assertEquals(NUMBER_OF_MESSAGES * 2, queueOnServer2::getMessageCount);

            MessageConsumer consumer = session1.createConsumer(queue);
            for (int i = 0; i < NUMBER_OF_MESSAGES * 2; i++) {
                TextMessage received = (TextMessage) consumer.receive(5000L);
                // Check for a timeout first so it is reported as an assertion failure, not an NPE.
                Assert.assertNotNull(received);
                logger.debug("### Client acking message(" + i + ") on server 1, a message that was original sent on " + received.getStringProperty("server") + " text = " + received.getText());
                Assert.assertEquals(i, received.getIntProperty("i"));
                Assert.assertEquals("test " + i, received.getText());
                session1.commit();
            }

            boolean bothDrained = Wait.waitFor(() -> {
                long countOnServer1 = queueOnServer1.getMessageCount();
                long countOnServer2 = queueOnServer2.getMessageCount();
                logger.debug("Queue on Server 1 = " + countOnServer1);
                logger.debug("Queue on Server 2 = " + countOnServer2);
                return countOnServer1 == 0 && countOnServer2 == 0;
            }, 5000L, 1000L);
            if (!bothDrained && logger.isDebugEnabled()) {
                debugData();
            }
            // Always verify both sides, independent of the configured log level.
            Assert.assertEquals("Messages left on Server 1", 0L, queueOnServer1.getMessageCount());
            Assert.assertEquals("Messages left on Server 2", 0L, queueOnServer2.getMessageCount());
        }

        this.server_2.stop();
        this.server.stop();
    }

    /**
     * Logs, at debug level, the references currently held by the test queue on both brokers
     * followed by the {@link PrintData} output for each broker's journal location.
     */
    private void debugData() throws Exception {
        StringPrintStream buffer = new StringPrintStream();
        PrintStream out = buffer.newStream();
        org.apache.activemq.artemis.core.server.Queue queueOnServer1 = this.server.locateQueue(getQueueName());
        org.apache.activemq.artemis.core.server.Queue queueOnServer2 = this.server_2.locateQueue(getQueueName());

        out.println("*******************************************************************************************************************************");
        out.println("Queue on Server 1 with count = " + queueOnServer1.getMessageCount());
        queueOnServer1.forEach(reference -> out.println("Server1 has reference " + reference.getMessage()));

        out.println("*******************************************************************************************************************************");
        out.println("Queue on Server 2 with count = " + queueOnServer2.getMessageCount());
        queueOnServer2.forEach(reference -> out.println("Server2 has reference " + reference.getMessage()));

        out.println("*******************************************************************************************************************************");
        out.println("PrintData Server 1");
        PrintData.printMessages(this.server.getConfiguration().getJournalLocation(), out, false, false, true, false);

        out.println("*******************************************************************************************************************************");
        out.println("PrintData Server 2");
        PrintData.printMessages(this.server_2.getConfiguration().getJournalLocation(), out, false, false, true, false);

        logger.debug("Data Available on Servers:\n" + buffer.toString());
    }

    /**
     * Same scenario as {@link #testSyncData()}, but using the raw AMQP test client and messages
     * on which the test sets no message id (only the durable flag and the {@code "i"} property).
     */
    @Test
    public void testSyncDataNoSuppliedID() throws Exception {
        startMirroredBrokers();
        createMirroredTestQueue();

        AmqpConnection connection1 = new AmqpClient(new URI(SERVER_1_URI), (String) null, (String) null).createConnection();
        connection1.connect();
        try {
            AmqpConnection connection2 = new AmqpClient(new URI(SERVER_2_URI), (String) null, (String) null).createConnection();
            connection2.connect();
            try {
                AmqpSession session1 = connection1.createSession();
                AmqpSession session2 = connection2.createSession();
                AmqpSender senderServer1 = session1.createSender(getQueueName());
                AmqpSender senderServer2 = session2.createSender(getQueueName());

                for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                    AmqpMessage message = new AmqpMessage();
                    message.setDurable(true);
                    message.setApplicationProperty("i", Integer.valueOf(i));
                    senderServer1.send(message);
                }

                org.apache.activemq.artemis.core.server.Queue queueOnServer1 = this.server.locateQueue(getQueueName());
                org.apache.activemq.artemis.core.server.Queue queueOnServer2 = this.server_2.locateQueue(getQueueName());
                Assert.assertNotNull(queueOnServer1);
                Assert.assertNotNull(queueOnServer2);
                Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
                Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);

                for (int i = NUMBER_OF_MESSAGES; i < NUMBER_OF_MESSAGES * 2; i++) {
                    AmqpMessage message = new AmqpMessage();
                    message.setDurable(true);
                    message.setApplicationProperty("i", Integer.valueOf(i));
                    senderServer2.send(message);
                }
                Wait.assertEquals(NUMBER_OF_MESSAGES * 2, queueOnServer1::getMessageCount);
                Wait.assertEquals(NUMBER_OF_MESSAGES * 2, queueOnServer2::getMessageCount);

                AmqpReceiver receiver = session1.createReceiver(getQueueName());
                receiver.flow((NUMBER_OF_MESSAGES * 2) + 1);
                for (int i = 0; i < NUMBER_OF_MESSAGES * 2; i++) {
                    AmqpMessage received = receiver.receive(5L, TimeUnit.SECONDS);
                    Assert.assertNotNull(received);
                    received.accept();
                    Assert.assertEquals(i, ((Integer) received.getApplicationProperty("i")).intValue());
                }

                Wait.assertEquals(0L, queueOnServer1::getMessageCount);
                Wait.assertEquals(0L, queueOnServer2::getMessageCount);
            } finally {
                connection2.close();
            }
        } finally {
            connection1.close();
        }

        this.server_2.stop();
        this.server.stop();
    }
}
