package org.apache.activemq.artemis.tests.integration.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/ConsumerCloseTest.class */
public class ConsumerCloseTest extends ActiveMQTestBase {
    private ClientSessionFactory sf;
    private ActiveMQServer server;
    private ClientSession session;
    private SimpleString queue;
    private SimpleString address;
    private ServerLocator locator;

    @Test
    public void testCanNotUseAClosedConsumer() throws Exception {
        final ClientConsumer createConsumer = this.session.createConsumer(this.queue);
        createConsumer.close();
        Assert.assertTrue(createConsumer.isClosed());
        expectActiveMQException(ActiveMQExceptionType.OBJECT_CLOSED, new ActiveMQTestBase.ActiveMQAction() { // from class: org.apache.activemq.artemis.tests.integration.client.ConsumerCloseTest.1
            public void run() throws ActiveMQException {
                createConsumer.receive();
            }
        });
        expectActiveMQException(ActiveMQExceptionType.OBJECT_CLOSED, new ActiveMQTestBase.ActiveMQAction() { // from class: org.apache.activemq.artemis.tests.integration.client.ConsumerCloseTest.2
            public void run() throws ActiveMQException {
                createConsumer.receiveImmediate();
            }
        });
        expectActiveMQException(ActiveMQExceptionType.OBJECT_CLOSED, new ActiveMQTestBase.ActiveMQAction() { // from class: org.apache.activemq.artemis.tests.integration.client.ConsumerCloseTest.3
            public void run() throws ActiveMQException {
                createConsumer.setMessageHandler(new MessageHandler() { // from class: org.apache.activemq.artemis.tests.integration.client.ConsumerCloseTest.3.1
                    public void onMessage(ClientMessage clientMessage) {
                    }
                });
            }
        });
    }

    @Test
    public void testCloseWithManyMessagesInBufferAndSlowConsumer() throws Exception {
        ClientConsumerImpl createConsumer = this.session.createConsumer(this.queue);
        ClientProducer createProducer = this.session.createProducer(this.address);
        for (int i = 0; i < 100; i++) {
            createProducer.send(this.session.createMessage(false));
        }
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        createConsumer.setMessageHandler(new MessageHandler() { // from class: org.apache.activemq.artemis.tests.integration.client.ConsumerCloseTest.1MyHandler
            public void onMessage(ClientMessage clientMessage) {
                try {
                    countDownLatch.countDown();
                    countDownLatch2.await();
                } catch (Exception e) {
                }
            }
        });
        this.session.start();
        assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        while (createConsumer.getBufferSize() < 2 && System.currentTimeMillis() > currentTimeMillis) {
            Thread.sleep(10L);
        }
        countDownLatch2.countDown();
        long currentTimeMillis2 = System.currentTimeMillis();
        createConsumer.close();
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis2 <= 1500);
    }

    @Test
    public void testCloseWithScheduledRedelivery() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setRedeliveryDelay(50000L));
        ClientConsumerImpl createConsumer = this.session.createConsumer(this.queue);
        ClientProducer createProducer = this.session.createProducer(this.address);
        for (int i = 0; i < 100; i++) {
            createProducer.send(this.session.createMessage(false));
        }
        this.session.start();
        createConsumer.receive(5000L).acknowledge();
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        while (createConsumer.getBufferSize() < 2 && System.currentTimeMillis() > currentTimeMillis) {
            Thread.sleep(10L);
        }
        createConsumer.close();
        ClientConsumer createConsumer2 = this.session.createConsumer(this.queue);
        for (int i2 = 0; i2 < 99; i2++) {
            ClientMessage receive = createConsumer2.receive(1000L);
            assertNotNull("Expected message at i=" + i2, receive);
            receive.acknowledge();
        }
        assertNull(createConsumer2.receiveImmediate());
        long currentTimeMillis2 = System.currentTimeMillis();
        createConsumer2.close();
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis2 <= 1500);
    }

    @Test
    public void testCloseWithScheduledRedeliveryWithTX() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setRedeliveryDelay(1000L));
        ClientProducer createProducer = this.session.createProducer(this.address);
        for (int i = 0; i < 100; i++) {
            ClientMessage createMessage = this.session.createMessage(false);
            createMessage.putIntProperty("count", i);
            createProducer.send(createMessage);
        }
        this.session.close();
        this.session = addClientSession(this.sf.createSession(false, false));
        ClientConsumerImpl createConsumer = this.session.createConsumer(this.queue);
        this.session.start();
        createConsumer.receive(500L).acknowledge();
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        while (createConsumer.getBufferSize() < 2 && System.currentTimeMillis() > currentTimeMillis) {
            Thread.sleep(10L);
        }
        createConsumer.close();
        this.session.rollback();
        ClientConsumer createConsumer2 = this.session.createConsumer(this.queue);
        for (int i2 = 0; i2 < 99; i2++) {
            ClientMessage receive = createConsumer2.receive(1000L);
            assertNotNull("Expected message at i=" + i2, receive);
            receive.acknowledge();
        }
        assertNull(createConsumer2.receiveImmediate());
        ClientMessage receive2 = createConsumer2.receive(5000L);
        assertNotNull(receive2);
        assertEquals(0L, receive2.getIntProperty("count").intValue());
        receive2.acknowledge();
        this.session.commit();
        assertNull(createConsumer2.receiveImmediate());
        long currentTimeMillis2 = System.currentTimeMillis();
        createConsumer2.close();
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis2 <= 1500);
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
        this.server.start();
        this.address = RandomUtil.randomSimpleString();
        this.queue = RandomUtil.randomSimpleString();
        this.locator = createInVMNonHALocator();
        this.sf = createSessionFactory(this.locator);
        this.session = addClientSession(this.sf.createSession(false, true, true));
        this.session.createQueue(new QueueConfiguration(this.queue).setAddress(this.address).setDurable(false));
    }
}
