package org.apache.activemq.artemis.tests.integration.paging;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.persistence.XmlImportExportTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.class */
public class PageAckScanTest extends ActiveMQTestBase {
    // Class logger; not referenced by any method in this class.
    private static final Logger logger = Logger.getLogger(PageAckScanTest.class);
    // Name used for both the ANYCAST address and the queue created in setUp().
    private static final String ADDRESS = "MessagesExpiredPagingTest";
    // Broker instance created and started in setUp().
    ActiveMQServer server;
    // Same value (10240) that setUp() passes as a literal for the max address size.
    protected static final int PAGE_MAX = 10240;
    // Page size in bytes configured in setUp(); also used as the message body length by the tests.
    protected static final int PAGE_SIZE = 1024;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest$CompareI.class */
    /**
     * Scoring function handed to {@code PageSubscription.scanAck}: maps a paged reference to the
     * difference between its message's {@code "i"} property and a fixed target value, so the
     * result is zero exactly when the message carries the target value.
     */
    public class CompareI implements ToIntFunction<PagedReference> {
        /** Target value the {@code "i"} property is measured against. */
        final int i;

        CompareI(int i) {
            this.i = i;
        }

        /**
         * @param pagedReference reference whose message must carry an integer {@code "i"} property
         * @return the message's {@code "i"} value minus the target value
         */
        @Override
        public int applyAsInt(PagedReference pagedReference) {
            final int actual = pagedReference.getMessage().getIntProperty("i").intValue();
            return actual - i;
        }
    }

    /**
     * Starts a broker configured for paging and creates the ANYCAST address and queue named
     * {@link #ADDRESS}.
     *
     * <p>The configuration turns off non-transactional journal sync and sets the message expiry
     * scan period to -1. All existing address settings are cleared and replaced by a single
     * catch-all ("#") match: 1024-byte pages, 10240-byte max size, PAGE full policy, and no
     * auto-creation of addresses or queues.
     *
     * @throws Exception if the base set-up, the broker start, or the address/queue creation fails
     */
    @Before
    public void setUp() throws Exception {
        super.setUp();

        final Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
        config.setMessageExpiryScanPeriod(-1L);
        this.server = createServer(true, config, PAGE_SIZE, 10240L);

        // Replace whatever settings the base class installed with one catch-all paging rule.
        this.server.getAddressSettingsRepository().clear();
        final AddressSettings pagingSettings = new AddressSettings()
            .setPageSizeBytes(PAGE_SIZE)
            .setMaxSizeBytes(10240L)
            .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
            .setAutoCreateAddresses(false)
            .setAutoCreateQueues(false);
        this.server.getAddressSettingsRepository().addMatch("#", pagingSettings);

        this.server.start();

        // Auto-creation is disabled above, so the address and queue are created explicitly.
        final AddressInfo addressInfo = new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST);
        this.server.addAddressInfo(addressInfo);
        final QueueConfiguration queueConfig = new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST);
        this.server.createQueue(queueConfig);
    }

    /** Runs the scan-and-ack scenario over a connection factory created for the "CORE" protocol. */
    @Test
    public void testScanCore() throws Exception {
        final String protocol = "CORE";
        testScan(protocol, XmlImportExportTest.CONSUMER_TIMEOUT, 1000, 100, PAGE_SIZE);
    }

    /** Runs the scan-and-ack scenario over a connection factory created for the "AMQP" protocol. */
    @Test
    public void testScanAMQP() throws Exception {
        final String protocol = "AMQP";
        testScan(protocol, XmlImportExportTest.CONSUMER_TIMEOUT, 1000, 100, PAGE_SIZE);
    }

    /**
     * Pages 20 messages into {@link #ADDRESS}, submits five {@code scanAck} requests against the
     * queue's page subscription, and then checks what a fresh consumer still receives.
     *
     * <p>Assertions made, in order:
     * <ul>
     *   <li>the latch (initial count 4) reaches zero within 5 minutes;</li>
     *   <li>the fourth {@code scanAck} callback has run exactly twice at that point;</li>
     *   <li>the supplier passed with target 333 ends up invoked exactly once;</li>
     *   <li>a consumer receives exactly 18 messages, none with {@code "i"} equal to 11 or 15.</li>
     * </ul>
     *
     * @param str protocol name passed to {@code CFUtil.createConnectionFactory}
     * @param i   not used by this method
     * @param i2  not used by this method
     * @param i3  not used by this method
     * @param i4  number of {@code '*'} characters in each message body
     * @throws Exception on any JMS, broker, or assertion failure
     */
    public void testScan(String str, int i, int i2, int i3, int i4) throws Exception {
        ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:61616");

        StringBuilder bodyBuilder = new StringBuilder();
        for (int n = 0; n < i4; n++) {
            bodyBuilder.append('*');
        }
        String body = bodyBuilder.toString();

        Queue queue = this.server.locateQueue(ADDRESS);
        // Force paging up front so every message sent below goes through the paging store.
        queue.getPagingStore().startPaging();

        // Producer phase: 20 messages tagged i = 0..19, committed in a single transaction.
        try (Connection connection = connectionFactory.createConnection()) {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createQueue(ADDRESS));
            for (int n = 0; n < 20; n++) {
                TextMessage message = session.createTextMessage(body);
                message.setIntProperty("i", n);
                producer.send(message);
            }
            session.commit();
        }

        // NOTE(review): the roles of the scanAck arguments (retry check, comparator, completion
        // callback, not-found callback) are inferred from the assertions below - confirm against
        // the PageSubscription.scanAck contract.
        AtomicInteger errors = new AtomicInteger(0);
        ReusableLatch latch = new ReusableLatch(4);
        Runnable done = latch::countDown;
        Runnable notFound = () -> {
            errors.incrementAndGet();
            done.run();
        };
        AtomicInteger retries = new AtomicInteger(0);

        PageSubscription subscription = queue.getPageSubscription();
        // 15 and 11 exist among the sent messages; 99 and -30 do not.
        subscription.scanAck(() -> false, new CompareI(15), done, notFound);
        subscription.scanAck(() -> false, new CompareI(11), done, notFound);
        subscription.scanAck(() -> false, new CompareI(99), done, notFound);
        subscription.scanAck(() -> false, new CompareI(-30), done, notFound);
        // 333 does not exist either; this is the only request whose supplier returns true.
        subscription.scanAck(() -> {
            retries.incrementAndGet();
            return true;
        }, new CompareI(333), done, notFound);

        Assert.assertTrue(latch.await(5L, TimeUnit.MINUTES));
        Assert.assertEquals(2L, errors.get());
        Wait.assertEquals(1, retries::get);

        // Consumer phase: everything except the two scanned-and-acked messages must still arrive.
        try (Connection connection = connectionFactory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue jmsQueue = session.createQueue(ADDRESS);
            connection.start();
            MessageConsumer consumer = session.createConsumer(jmsQueue);
            for (int n = 0; n < 18; n++) {
                // receive() is declared to return Message, so the cast is required to compile.
                TextMessage received = (TextMessage) consumer.receive(5000L);
                Assert.assertNotNull(received);
                int value = received.getIntProperty("i");
                Assert.assertTrue(value != 11 && value != 15);
            }
            Assert.assertNull(consumer.receiveNoWait());
        }
    }
}
