package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.class */
/**
 * Checks that a queue created with {@code purgeOnNoConsumers} enabled holds no messages once its
 * consumer has been closed, and that it still holds none after the server is stopped and
 * started again.
 */
public class AmqpPurgeOnNoConsumersTest extends AmqpClientTestSupport {

    /** Name used for both the address and the queue in every test of this class. */
    private static final String QUEUE_NAME = "purgeQueue";

    @Override
    protected String getConfiguredProtocols() {
        return "AMQP,OPENWIRE,CORE";
    }

    /**
     * Sends five messages, accepts four of them through an AMQP receiver, closes the receiver and
     * expects the queue (including the message that was never accepted) to be empty afterwards.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test(timeout = 60000)
    public void testQueueReceiverReadMessage() throws Exception {
        createPurgeOnNoConsumersQueue();

        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpReceiver receiver = connection.createSession().createReceiver(QUEUE_NAME);

        QueueImpl queueView = getProxyToQueue(QUEUE_NAME);
        assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
        assertEquals(0L, queueView.getMessageCount());

        sendMessages(QUEUE_NAME, 5, (RoutingType) null, true);
        Wait.assertEquals(5L, queueView::getMessageCount);

        receiver.flow(5);
        // Only four of the five messages are accepted; the fifth is left on the queue on purpose
        // so that closing the receiver has something to purge.
        for (int i = 0; i < 4; i++) {
            AmqpMessage received = receiver.receive(5L, TimeUnit.SECONDS);
            // Check for null before touching the message so a receive timeout fails the test
            // with a clear assertion instead of a NullPointerException.
            assertNotNull(received);
            received.accept();
        }

        // Failures while closing are allowed to propagate and fail the test.
        receiver.close();

        Wait.assertEquals(0L, queueView::getMessageCount);
        assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());

        connection.close();

        assertQueueEmptyAfterRestart();
    }

    /**
     * Commits ten messages through a transacted JMS session, receives all of them, then closes the
     * consumer and rolls the session back; the queue is expected to be empty afterwards.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test(timeout = 60000)
    public void testPurgeQueueCoreRollback() throws Exception {
        createPurgeOnNoConsumersQueue();

        // Looked up before the connection is opened so the reference is still in scope after the
        // try-with-resources block has closed the connection.
        QueueImpl queueView = getProxyToQueue(QUEUE_NAME);

        // try-with-resources guarantees the connection is closed even when an assertion fails.
        try (Connection connection = new ActiveMQConnectionFactory("tcp://localhost:5672").createConnection()) {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
            MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));

            for (int i = 0; i < 10; i++) {
                producer.send(session.createTextMessage("hello " + i));
            }
            session.commit();

            Wait.assertEquals(10L, queueView::getMessageCount);

            connection.start();
            for (int i = 0; i < 10; i++) {
                // receive(long) is declared to return Message, so the cast is required.
                TextMessage received = (TextMessage) consumer.receive(1000L);
                assertNotNull(received);
                assertEquals("hello " + i, received.getText());
            }

            consumer.close();
            session.rollback();
        }

        Wait.assertEquals(0L, queueView::getMessageCount);

        assertQueueEmptyAfterRestart();
    }

    /**
     * Registers the anycast address {@link #QUEUE_NAME} and creates a queue of the same name on it
     * with a maximum of one consumer and {@code purgeOnNoConsumers} switched on.
     */
    private void createPurgeOnNoConsumersQueue() throws Exception {
        SimpleString name = new SimpleString(QUEUE_NAME);
        server.addAddressInfo(new AddressInfo(name, RoutingType.ANYCAST));
        server.createQueue(new QueueConfiguration(name)
                .setRoutingType(RoutingType.ANYCAST)
                .setMaxConsumers(1)
                .setPurgeOnNoConsumers(true)
                .setAutoCreateAddress(false));
    }

    /**
     * Stops and starts the server, then asserts that the queue has a message count of zero and
     * that its paging store reports an address size of zero.
     */
    private void assertQueueEmptyAfterRestart() throws Exception {
        server.stop();
        server.start();

        QueueImpl queueView = getProxyToQueue(QUEUE_NAME);
        assertEquals(0L, queueView.getMessageCount());
        assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
    }
}
