package org.apache.activemq.artemis.tests.integration.cluster.failover;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Exercises paging on the live server while a replicated backup is repeatedly started,
 * synchronized and stopped.
 *
 * <p>A set of {@link Worker} threads each force their queue into paging, send and consume a
 * small batch of messages and wait for paging to stop, in a loop. Meanwhile the test thread
 * cycles the backup server. At the end every worker must have finished without error and each
 * paging store must report at most one file.
 */
public class PageCleanupWhileReplicaCatchupTest extends FailoverTestBase {
    private static final Logger logger = Logger.getLogger(PageCleanupWhileReplicaCatchupTest.class);

    /** Page size, in bytes, used for the address settings and the server configuration. */
    private static final int PAGE_SIZE = 1024;

    /** Maximum address size, in bytes, used for the address settings and the server configuration. */
    private static final int PAGE_MAX = 2048;

    /** Number of worker threads (and therefore queues) created by the test. */
    private static final int NUMBER_OF_WORKERS = 20;

    /** Number of times the backup server is started, synchronized and stopped. */
    private static final int NUMBER_OF_RESTARTS = 25;

    /** Number of messages each worker sends and then consumes per loop iteration. */
    private static final int MESSAGES_PER_ROUND = 10;

    /** Receive timeout, in milliseconds, for each consumed message. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    /** Cleared by the test thread to make every {@link Worker} leave its loop. */
    volatile boolean running = true;

    /**
     * Thread that repeatedly starts paging on one queue, sends and consumes a batch of messages,
     * and waits until the queue's paging store is no longer paging.
     *
     * <p>Any failure is printed to standard output and kept in {@link #throwable} so the test
     * thread can rethrow it after joining.
     */
    class Worker extends Thread {
        final String queueName;
        final Queue queue;
        volatile Throwable throwable;

        /**
         * @param queueName name of an already created queue on the live server
         */
        Worker(String queueName) {
            super("Worker on queue " + queueName + " for test on PageCleanupWhileReplicaCatchupTest");
            this.queueName = queueName;
            this.queue = liveServer.getServer().locateQueue(this.queueName);
        }

        @Override
        public void run() {
            // try-with-resources guarantees the connection is closed on failures as well,
            // not only when the loop ends normally.
            try (Connection connection = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616").createConnection()) {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                connection.start();
                jakarta.jms.Queue jmsQueue = session.createQueue(queueName);
                MessageConsumer consumer = session.createConsumer(jmsQueue);
                MessageProducer producer = session.createProducer(jmsQueue);
                while (running) {
                    queue.getPagingStore().startPaging();
                    for (int i = 0; i < MESSAGES_PER_ROUND; i++) {
                        producer.send(session.createTextMessage("hello " + i));
                    }
                    Wait.assertTrue(queue.getPagingStore()::isPaging);
                    for (int i = 0; i < MESSAGES_PER_ROUND; i++) {
                        Assert.assertNotNull(consumer.receive(RECEIVE_TIMEOUT_MILLIS));
                    }
                    Wait.assertFalse("Waiting for !Paging on " + queueName + " with folder " + queue.getPagingStore().getFolder(), queue.getPagingStore()::isPaging);
                }
            } catch (Throwable failure) {
                // Throwable (not Exception) so assertion errors are captured too; the test
                // thread inspects this field once the worker has been joined.
                failure.printStackTrace(System.out);
                this.throwable = failure;
            }
        }
    }

    /** Disables the automatic backup start so the test controls the backup lifecycle itself. */
    @Override
    @Before
    public void setUp() throws Exception {
        this.startBackupServer = false;
        super.setUp();
    }

    /** Uses the replicated live/backup configuration provided by the base class. */
    @Override
    public void createConfigs() throws Exception {
        createReplicatedConfigs();
    }

    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(boolean live) {
        return getNettyAcceptorTransportConfiguration(live);
    }

    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(boolean live) {
        return getNettyConnectorTransportConfiguration(live);
    }

    /**
     * Creates a server with the PAGE address-full policy and the small page and address sizes
     * defined by {@link #PAGE_SIZE} and {@link #PAGE_MAX} registered for {@code ADDRESS}.
     */
    protected ActiveMQServer createInVMFailoverServer(boolean realFiles, Configuration configuration, NodeManager nodeManager, int id) {
        HashMap<String, AddressSettings> settings = new HashMap<>();
        AddressSettings pagingSettings = new AddressSettings()
            .setMaxSizeBytes(PAGE_MAX)
            .setPageSizeBytes(PAGE_SIZE)
            .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
        settings.put(ADDRESS.toString(), pagingSettings);
        return createInVMFailoverServer(realFiles, configuration, PAGE_SIZE, PAGE_MAX, settings, nodeManager, id);
    }

    /**
     * Runs the workers while cycling the backup server, then rethrows any worker failure and
     * verifies that no paging store is left with more than one file.
     */
    @Test(timeout = 120000)
    public void testPageCleanup() throws Throwable {
        Worker[] workers = new Worker[NUMBER_OF_WORKERS];
        try {
            for (int i = 0; i < NUMBER_OF_WORKERS; i++) {
                String name = "WORKER_" + i;
                liveServer.getServer().addAddressInfo(new AddressInfo(name).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
                liveServer.getServer().createQueue(new QueueConfiguration(name).setRoutingType(RoutingType.ANYCAST).setDurable(true));
                workers[i] = new Worker(name);
                workers[i].start();
            }
            for (int i = 0; i < NUMBER_OF_RESTARTS; i++) {
                logger.debug("Starting replica " + i);
                backupServer.start();
                Wait.assertTrue(backupServer.getServer()::isReplicaSync);
                backupServer.stop();
            }
        } finally {
            // Always signal the workers to stop, so a failure above does not leave
            // already started worker threads looping forever.
            running = false;
        }
        for (Worker worker : workers) {
            worker.join();
        }
        Throwable failure = null;
        for (Worker worker : workers) {
            if (worker.throwable != null) {
                // Run a cleanup and report the resulting paging state as extra diagnostics
                // for the failed worker before the failure is rethrown.
                worker.queue.getPagingStore().getCursorProvider().scheduleCleanup();
                Thread.sleep(2000L);
                worker.queue.getPagingStore().getCursorProvider().cleanup();
                System.out.println("PagingStore(" + worker.queueName + ")::isPaging() = " + worker.queue.getPagingStore().isPaging() + " after test failure " + worker.throwable.getMessage());
                failure = worker.throwable;
            }
        }
        if (failure != null) {
            throw failure;
        }
        for (Worker worker : workers) {
            // getPagingStore() is declared to return the PagingStore interface, so the
            // implementation type needs an explicit cast.
            PagingStoreImpl storeImpl = (PagingStoreImpl) worker.queue.getPagingStore();
            Assert.assertTrue("Store impl " + worker.queueName + " had more files than expected on " + storeImpl.getFolder(), storeImpl.getNumberOfFiles() <= 1);
        }
    }
}
