package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* Integration tests for AMQP sender links; the Parameterized runner executes every test once per persistCache value. */
public class AmqpSenderTest extends AmqpClientTestSupport {

    @Parameterized.Parameter(0)
    public boolean persistCache;

    /**
     * Supplies the parameter rows for the {@link Parameterized} runner.
     *
     * @return two single-element rows, {@code true} and {@code false}, bound to {@code persistCache}
     */
    @Parameterized.Parameters(name = "persistentCache={0}")
    public static Collection<Object[]> parameters() {
        final Object[][] rows = {{true}, {false}};
        return Arrays.asList(rows);
    }

    /**
     * Applies the parameterized {@code persistCache} value to the server's
     * configuration through {@code setPersistIDCache}.
     *
     * @param activeMQServer the server whose configuration is adjusted
     */
    @Override
    public void addConfiguration(ActiveMQServer activeMQServer) {
        final boolean persistIdCache = persistCache;
        activeMQServer.getConfiguration().setPersistIDCache(persistIdCache);
    }

    /**
     * Overridden with an empty body: this test class registers no additional acceptors.
     *
     * @param activeMQServer the server under test (unused)
     */
    @Override
    protected void addAdditionalAcceptors(ActiveMQServer activeMQServer) throws Exception {
        // Intentionally empty.
    }

    /** Runs the sender-settle-mode check with {@link SenderSettleMode#SETTLED}. */
    @Test(timeout = 60000)
    public void testSenderSettlementModeSettledIsHonored() throws Exception {
        final SenderSettleMode requestedMode = SenderSettleMode.SETTLED;
        doTestSenderSettlementModeIsHonored(requestedMode);
    }

    /** Runs the sender-settle-mode check with {@link SenderSettleMode#UNSETTLED}. */
    @Test(timeout = 60000)
    public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
        final SenderSettleMode requestedMode = SenderSettleMode.UNSETTLED;
        doTestSenderSettlementModeIsHonored(requestedMode);
    }

    /** Runs the sender-settle-mode check with {@link SenderSettleMode#MIXED}. */
    @Test(timeout = 60000)
    public void testSenderSettlementModeMixedIsHonored() throws Exception {
        final SenderSettleMode requestedMode = SenderSettleMode.MIXED;
        doTestSenderSettlementModeIsHonored(requestedMode);
    }

    /**
     * Opens a sender link requesting the given sender settle mode (with receiver
     * settle mode {@code FIRST}), checks that the remote end reports the same
     * sender settle mode, and then sends a single text message.
     *
     * @param senderSettleMode the sender settle mode to request and expect back
     * @throws Exception if any client or broker operation fails
     */
    public void doTestSenderSettlementModeIsHonored(SenderSettleMode senderSettleMode) throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpSession session = connection.createSession();
        String address = "queue://" + getTestName();
        AmqpSender sender = session.createSender(address, senderSettleMode, ReceiverSettleMode.FIRST);

        // The queue must exist and be empty before anything is sent.
        Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);
        assertEquals(0L, queueView.getMessageCount());

        SenderSettleMode remoteMode = sender.getEndpoint().getRemoteSenderSettleMode();
        assertEquals(senderSettleMode, remoteMode);

        AmqpMessage message = new AmqpMessage();
        message.setText("Test-Message");
        sender.send(message);

        sender.close();
        connection.close();
    }

    /** Requests receiver settle mode {@code FIRST} and expects {@code FIRST} back. */
    @Test(timeout = 60000)
    public void testReceiverSettlementModeSetToFirst() throws Exception {
        final ReceiverSettleMode requestedMode = ReceiverSettleMode.FIRST;
        doTestReceiverSettlementModeForcedToFirst(requestedMode);
    }

    /** Requests receiver settle mode {@code SECOND} and still expects {@code FIRST} back. */
    @Test(timeout = 60000)
    public void testReceiverSettlementModeSetToSecond() throws Exception {
        final ReceiverSettleMode requestedMode = ReceiverSettleMode.SECOND;
        doTestReceiverSettlementModeForcedToFirst(requestedMode);
    }

    /**
     * Opens an unsettled sender link requesting the given receiver settle mode and
     * asserts that the remote end always reports {@code FIRST}.
     *
     * @param receiverSettleMode the receiver settle mode to request
     * @throws Exception if any client or broker operation fails
     */
    private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode receiverSettleMode) throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpSession session = connection.createSession();
        String address = "queue://" + getTestName();
        AmqpSender sender = session.createSender(address, SenderSettleMode.UNSETTLED, receiverSettleMode);

        // The queue must exist and be empty.
        Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);
        assertEquals(0L, queueView.getMessageCount());

        ReceiverSettleMode remoteMode = sender.getEndpoint().getRemoteReceiverSettleMode();
        assertEquals(ReceiverSettleMode.FIRST, remoteMode);

        sender.close();
        connection.close();
    }

    /**
     * Sends 1000 messages on a non-presettled sender, waits for all of them to
     * reach the queue, and then verifies that every delivery was remotely settled.
     */
    @Test(timeout = 60000)
    public void testUnsettledSender() throws Exception {
        final int messageCount = 1000;
        final CountDownLatch settledLatch = new CountDownLatch(messageCount);

        AmqpConnection connection = addConnection(createAmqpClient().connect());
        connection.setStateInspector(new AmqpValidator() {
            @Override
            public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
                // Count one settlement per delivery update that arrives remotely settled.
                if (delivery.remotelySettled()) {
                    settledLatch.countDown();
                }
            }
        });

        AmqpSession session = connection.createSession();
        AmqpSender sender = session.createSender(getQueueName(), false);
        for (int sequence = 1; sequence <= messageCount; sequence++) {
            AmqpMessage message = new AmqpMessage();
            message.setText("Test-Message: " + sequence);
            sender.send(message);
        }

        Queue queueView = getProxyToQueue(getQueueName());
        Wait.assertTrue("All messages should arrive", () -> queueView.getMessageCount() == messageCount);

        sender.close();

        // NOTE(review): this 5-minute wait is longer than the 60 s test timeout above,
        // so the timeout would fire before this assertion message could be reported.
        boolean allSettled = settledLatch.await(5L, TimeUnit.MINUTES);
        assertTrue("Remote should have settled all deliveries", allSettled);

        connection.close();
    }

    /**
     * Sends 2000 messages over a JMS AMQP connection with forced async sends,
     * alternating between non-persistent and persistent delivery, then consumes
     * them and verifies that they arrive in the order sent.
     *
     * @throws Exception if any JMS operation fails
     */
    @Test(timeout = 60000)
    public void testMixDurableAndNonDurable() throws Exception {
        final int messageCount = 2000;
        final String brokerUri = getBrokerAmqpConnectionURI().toString() + "?jms.forceAsyncSend=true";

        // try-with-resources so the connection is released even when an assertion fails.
        try (Connection connection = CFUtil.createConnectionFactory("AMQP", brokerUri).createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Queue queue = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(queue);

            // Starts non-persistent and flips on every message.
            boolean durable = false;
            for (int i = 1; i <= messageCount; i++) {
                Message message = session.createMessage();
                message.setIntProperty("i", i);
                producer.setDeliveryMode(durable
                    ? jakarta.jms.DeliveryMode.PERSISTENT
                    : jakarta.jms.DeliveryMode.NON_PERSISTENT);
                durable = !durable;
                producer.send(message);
            }

            connection.start();
            MessageConsumer consumer = session.createConsumer(queue);
            for (int i = 1; i <= messageCount; i++) {
                // Bind the received message so its sequence property can be checked.
                Message received = consumer.receive(10000L);
                Assert.assertNotNull(received);
                Assert.assertEquals(i, received.getIntProperty("i"));
            }

            Assert.assertNull(consumer.receiveNoWait());
        }
    }

    /**
     * Sends 1000 messages on a presettled sender and waits until the queue
     * reports that all of them have arrived.
     */
    @Test(timeout = 60000)
    public void testPresettledSender() throws Exception {
        final int messageCount = 1000;

        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpSession session = connection.createSession();
        AmqpSender sender = session.createSender(getQueueName(), true);

        for (int sequence = 1; sequence <= messageCount; sequence++) {
            AmqpMessage message = new AmqpMessage();
            message.setText("Test-Message: " + sequence);
            sender.send(message);
        }

        Queue queueView = getProxyToQueue(getQueueName());
        Wait.assertTrue("All messages should arrive", () -> queueView.getMessageCount() == messageCount);

        sender.close();
        connection.close();
    }

    /**
     * Sends ten presettled messages that all carry the same duplicate-detection
     * ID and verifies that exactly one of them is delivered to the receiver.
     *
     * @throws Exception if any client or broker operation fails
     */
    @Test(timeout = 60000)
    public void testDuplicateDetection() throws Exception {
        final int messageCount = 10;
        final String duplicateIdKey = org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString();

        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpSession session = connection.createSession();
        AmqpSender sender = session.createSender(getQueueName(), true);

        AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.setPresettle(true);
        receiver.flow(10);
        Assert.assertNull("somehow the queue had messages from a previous test", receiver.receiveNoWait());

        for (int i = 1; i <= messageCount; i++) {
            AmqpMessage message = new AmqpMessage();
            message.setApplicationProperty(duplicateIdKey, "123");
            sender.send(message);
        }

        // Without checking the first receive, the test would also pass if nothing
        // at all were delivered; exactly one message is expected to get through.
        AmqpMessage firstDelivery = receiver.receive(5L, TimeUnit.SECONDS);
        Assert.assertNotNull("Expected one message to get past duplicate detection", firstDelivery);
        Assert.assertNull(receiver.receiveNoWait());

        sender.close();
        connection.close();
    }

    /**
     * Verifies how duplicate detection interacts with a transacted JMS session:
     * a send that is rolled back can be re-sent and committed under the same
     * duplicate-detection ID, while a later send reusing that ID after the
     * commit makes the next commit fail.
     *
     * @throws Exception if any JMS or broker operation fails unexpectedly
     */
    @Test(timeout = 60000)
    public void testDuplicateDetectionRollback() throws Exception {
        final String duplicateIdKey = org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString();

        // NOTE(review): the broker URI is hard-coded here, whereas
        // testMixDurableAndNonDurable derives it from getBrokerAmqpConnectionURI() - confirm intent.
        // Resources are closed in reverse order (session, then connection), and any
        // failure while closing is attached as a suppressed exception instead of
        // replacing the original test failure.
        try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {

            jakarta.jms.Queue queue = session.createQueue(getQueueName());
            MessageProducer producer = session.createProducer(queue);

            TextMessage message = session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);
            message.setStringProperty(duplicateIdKey, "123");

            // First attempt is rolled back, so the same message is sent again and committed.
            producer.send(message);
            session.rollback();
            producer.send(message);
            session.commit();

            connection.start();
            MessageConsumer consumer = session.createConsumer(queue);
            Assert.assertNotNull(consumer.receive(5000L));
            Assert.assertNull(consumer.receiveNoWait());
            session.commit();

            Queue serverQueue = this.server.locateQueue(getQueueName());
            Wait.assertEquals(0L, serverQueue::getMessageCount);

            // A new message reusing the committed ID: the commit is expected to throw.
            TextMessage duplicate = session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);
            duplicate.setStringProperty(duplicateIdKey, "123");
            producer.send(duplicate);

            boolean commitFailed = false;
            try {
                session.commit();
            } catch (Exception expected) {
                // Expected outcome; only the fact that the commit failed is checked.
                commitFailed = true;
            }
            Assert.assertTrue(commitFailed);
        }
    }

    /**
     * Observes the credit updates seen by a sender link: the first update must
     * carry 1000 credits, no second update may arrive while only 699 messages
     * have been sent, and sending the 700th message must trigger a second update
     * that again carries 1000 credits. Any further credit update is treated as
     * an error by the validator.
     *
     * @throws Exception if any client or broker operation fails
     */
    @Test(timeout = 60000)
    public void testSenderCreditReplenishment() throws Exception {
        final AtomicInteger creditUpdates = new AtomicInteger();
        final CountDownLatch initialCredit = new CountDownLatch(1);
        final CountDownLatch refreshedCredit = new CountDownLatch(1);

        AmqpClient client = createAmqpClient(this.guestUser, this.guestPass);
        client.setValidator(new AmqpValidator() {
            @Override
            public void inspectCredit(Sender sender) {
                int update = creditUpdates.incrementAndGet();
                switch (update) {
                    case 1:
                        Assert.assertEquals("Unexpected initial credit", 1000L, sender.getCredit());
                        initialCredit.countDown();
                        break;
                    case 2:
                        Assert.assertEquals("Unexpected replenished credit", 1000L, sender.getCredit());
                        refreshedCredit.countDown();
                        break;
                    default:
                        throw new IllegalStateException("Unexpected additional flow: " + update);
                }
            }
        });

        AmqpConnection connection = addConnection(client.connect());
        try {
            AmqpSender sender = connection.createSession().createSender(getQueueName());
            assertTrue("Expected credit did not arrive", initialCredit.await(3000L, TimeUnit.MILLISECONDS));

            // Stop one message short of the send that is expected to trigger a refresh.
            for (int i = 1; i <= 699; i++) {
                AmqpMessage message = new AmqpMessage();
                message.setText("Test-Message: " + i);
                sender.send(message);
            }
            assertFalse("Expected credit not to have been refreshed yet", refreshedCredit.await(50L, TimeUnit.MILLISECONDS));

            AmqpMessage triggerMessage = new AmqpMessage();
            triggerMessage.setText("Test-Message: 700");
            sender.send(triggerMessage);
            assertTrue("Expected credit refresh did not occur", refreshedCredit.await(3000L, TimeUnit.MILLISECONDS));

            connection.close();
        } finally {
            // Validate exactly once on every path, including failures.
            connection.getStateInspector().assertValid();
        }
    }
}
