package org.apache.activemq.artemis.tests.integration.openwire.amq;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.IllegalStateException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import java.io.IOException;
import java.lang.Thread;
import java.net.Socket;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.class */
/**
 * OpenWire consumer tests: closing a consumer concurrently with its message listener,
 * DUPS_OK acknowledgement, and redelivery of messages whose transaction was rolled back,
 * abandoned by a session close, or cut off by a dropped connection.
 */
public class JMSConsumer2Test extends BasicOpenWireTest {

    /**
     * Preloads a queue with 2000 messages and consumes them through a listener that hands every
     * message to a worker pool. The 590th task closes the consumer from a worker thread while
     * other messages are still outstanding; every 200th task acknowledges its message.
     * The test passes when the close completes within 20 seconds and no unexpected exception
     * was recorded by a worker or by the default uncaught-exception handler.
     *
     * @throws Exception on any JMS or interruption failure
     */
    @Test
    public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch closeDone = new CountDownLatch(1);

        this.connection.start();
        Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);

        // Preload the queue before the consumer exists.
        sendMessages(session, (Destination) destination, 2000);

        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
        final Map<Thread, Throwable> exceptions = Collections.synchronizedMap(new HashMap<>());

        /** Counts one delivery; closes the consumer on the 590th and acks on every 200th. */
        final class AckAndClose implements Runnable {
            private final Message message;

            AckAndClose(Message message) {
                this.message = message;
            }

            @Override
            public void run() {
                try {
                    int count = counter.incrementAndGet();
                    if (count == 590) {
                        // Close from a worker thread, i.e. not the thread running the listener.
                        consumer.close();
                        closeDone.countDown();
                    }
                    if (count % 200 == 0) {
                        // Ack only every 200th message so some stay outstanding.
                        this.message.acknowledge();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // jakarta.jms.IllegalStateException is tolerated (presumably raised by an
                    // acknowledge racing with the close - confirm); anything else fails the test.
                    if (!(e instanceof IllegalStateException)) {
                        exceptions.put(Thread.currentThread(), e);
                    }
                }
            }
        }

        final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
        // The default handler is JVM-wide state: remember it so it can be restored afterwards.
        final Thread.UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler(exceptions::put);
        try {
            consumer.setMessageListener(message -> executor.execute(new AckAndClose(message)));

            assertTrue(closeDone.await(20L, TimeUnit.SECONDS));
            // Give late failures a moment to surface before checking.
            Thread.sleep(1000L);
            assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
        } finally {
            // Release the pool and the global handler even when an assertion fails.
            executor.shutdown();
            Thread.setDefaultUncaughtExceptionHandler(previousHandler);
        }
    }

    /**
     * Receives four messages in a DUPS_OK_ACKNOWLEDGE session, closes the consumer and verifies
     * that a fresh consumer on the same session gets nothing, i.e. nothing is redelivered.
     *
     * @throws Exception on any JMS failure
     */
    @Test
    public void testDupsOkConsumer() throws Exception {
        final int messageCount = 4;

        this.connection.start();
        Session session = this.connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
        ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        MessageConsumer consumer = session.createConsumer(destination);

        sendMessages(session, (Destination) destination, messageCount);

        for (int received = 0; received < messageCount; received++) {
            assertNotNull(consumer.receive(1000L));
        }
        assertNull(consumer.receive(1000L));
        consumer.close();

        MessageConsumer secondConsumer = session.createConsumer(destination);
        assertNull(secondConsumer.receive(1000L));
    }

    /**
     * Receives two messages in a transacted session and closes that session without committing.
     * A second transacted consumer must then receive both messages flagged as redelivered with
     * a {@code JMSXDeliveryCount} of 2, and nothing more after it commits.
     *
     * @throws Exception on any JMS failure
     */
    @Test
    public void testRedispatchOfUncommittedTx() throws Exception {
        this.connection.start();
        Session firstSession = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        ActiveMQDestination destination = createDestination(firstSession, ActiveMQDestination.QUEUE_TYPE);

        sendMessages((Connection) this.connection, (Destination) destination, 2);

        MessageConsumer firstConsumer = firstSession.createConsumer(destination);
        assertNotNull(firstConsumer.receive(1000L));
        Message firstDelivery = firstConsumer.receive(5000L);
        assertNotNull(firstDelivery);
        assertFalse("redelivered flag set", firstDelivery.getJMSRedelivered());

        // Open the second consumer before the first session goes away, then abandon the tx.
        Session secondSession = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer secondConsumer = secondSession.createConsumer(destination);
        firstSession.close();

        Message redelivered = secondConsumer.receive(3000L);
        assertNotNull(redelivered);
        assertTrue("redelivered flag set", redelivered.getJMSRedelivered());
        assertEquals(2L, redelivered.getLongProperty("JMSXDeliveryCount"));

        redelivered = secondConsumer.receive(1000L);
        assertNotNull(redelivered);
        assertTrue(redelivered.getJMSRedelivered());
        assertEquals(2L, redelivered.getLongProperty("JMSXDeliveryCount"));

        secondSession.commit();
        assertNull(secondConsumer.receive(500L));
        secondSession.close();
    }

    /**
     * Receives two messages in a transacted session, then rolls the session back and closes it.
     * A second transacted consumer must receive both messages flagged as redelivered with
     * a {@code JMSXDeliveryCount} of 2, and nothing more after it commits.
     *
     * @throws Exception on any JMS failure
     */
    @Test
    public void testRedispatchOfRolledbackTx() throws Exception {
        this.connection.start();
        Session firstSession = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        ActiveMQDestination destination = createDestination(firstSession, ActiveMQDestination.QUEUE_TYPE);

        sendMessages((Connection) this.connection, (Destination) destination, 2);

        MessageConsumer firstConsumer = firstSession.createConsumer(destination);
        assertNotNull(firstConsumer.receive(1000L));
        assertNotNull(firstConsumer.receive(1000L));

        Session secondSession = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer secondConsumer = secondSession.createConsumer(destination);

        firstSession.rollback();
        firstSession.close();

        Message redelivered = secondConsumer.receive(1000L);
        assertNotNull(redelivered);
        assertTrue(redelivered.getJMSRedelivered());
        assertEquals(2L, redelivered.getLongProperty("JMSXDeliveryCount"));

        redelivered = secondConsumer.receive(1000L);
        assertNotNull(redelivered);
        assertTrue(redelivered.getJMSRedelivered());
        assertEquals(2L, redelivered.getLongProperty("JMSXDeliveryCount"));

        secondSession.commit();
        assertNull(secondConsumer.receive(500L));
        secondSession.close();
    }

    /**
     * Sends one message, then consumes it through a listener on a transacted session that never
     * commits. After the queue reports an acknowledge attempt (or the wait gives up), the
     * connection's socket is closed underneath the client. A brand-new connection must then
     * be able to receive the message again.
     *
     * @throws Exception on any JMS or interruption failure
     */
    @Test
    public void testRedeliveryOnServerConnectionFailWithPendingAckInLocalTx() throws Exception {
        this.connection.start();
        sendMessages((Connection) this.connection, (Destination) new ActiveMQQueue(this.queueName), 1);
        this.connection.close();

        // Reconnect with the factory settings this scenario uses.
        this.factory.setWatchTopicAdvisories(false);
        this.factory.setNonBlockingRedelivery(true);
        this.connection = this.factory.createConnection();
        this.connection.start();

        Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        MessageConsumer consumer = session.createConsumer(destination);

        CountDownLatch delivered = new CountDownLatch(1);
        consumer.setMessageListener(message -> delivered.countDown());
        assertTrue(delivered.await(1L, TimeUnit.SECONDS));

        final Queue queue = this.server.locateQueue(new SimpleString(this.queueName));
        // NOTE(review): the outcome of this wait is not asserted, so a timeout does not fail
        // the test - confirm whether that is intended.
        Wait.waitFor(() -> queue.getAcknowledgeAttempts() > 0);

        // Kill the transport underneath the client; the transaction is never committed.
        try {
            ((Socket) this.connection.getTransport().narrow(Socket.class)).close();
        } catch (IOException e) {
            e.printStackTrace();
        }

        try {
            this.connection.close();
        } catch (Exception ignored) {
            // Best effort: the socket was just closed, so a failure here is not the subject
            // of this test.
        }

        this.connection = this.factory.createConnection();
        this.connection.start();
        Session verifySession = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer verifyConsumer = verifySession.createConsumer(destination);
        assertNotNull(verifyConsumer.receive(2000L));
        verifySession.commit();
        this.connection.close();
    }
}
