package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.JMSException;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.class */
public class AmqpReceiverTest extends AmqpClientTestSupport {
    @Test(timeout = 60000)
    public void testCreateQueueReceiver() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver(getQueueName());
        assertNotNull(getProxyToQueue(getQueueName()));
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testCreateTopicReceiver() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver(getTopicName());
        assertNotNull(getProxyToQueue(getQueueName()));
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
        AmqpClient createAmqpClient = createAmqpClient();
        createAmqpClient.setValidator(new AmqpValidator() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpReceiverTest.1
            public void inspectOpenedResource(Receiver receiver) {
                if (receiver.getRemoteSource() == null) {
                    markAsInvalid("Link opened with null source.");
                }
                if (AmqpSupport.findFilter(receiver.getRemoteSource().getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
                    markAsInvalid("Broker did not return the NoLocal Filter on Attach");
                }
            }
        });
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        addConnection.createSession().createReceiver(getQueueName(), (String) null, true);
        addConnection.getStateInspector().assertValid();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testCreateQueueReceiverWithJMSSelector() throws Exception {
        AmqpClient createAmqpClient = createAmqpClient();
        createAmqpClient.setValidator(new AmqpValidator() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpReceiverTest.2
            public void inspectOpenedResource(Receiver receiver) {
                if (receiver.getRemoteSource() == null) {
                    markAsInvalid("Link opened with null source.");
                }
                if (AmqpSupport.findFilter(receiver.getRemoteSource().getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS) == null) {
                    markAsInvalid("Broker did not return the JMS Filter on Attach");
                }
            }
        });
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        addConnection.createSession().createReceiver(getQueueName(), "JMSPriority > 8");
        addConnection.getStateInspector().assertValid();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testInvalidFilter() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            addConnection.createSession().createReceiver(getQueueName(), "null = 'f''", true);
            fail("should throw exception");
        } catch (Exception e) {
            assertTrue(e.getCause() instanceof JMSException);
        }
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testSenderSettlementModeSettledIsHonored() throws Exception {
        doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
    }

    @Test(timeout = 60000)
    public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
        doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED);
    }

    @Test(timeout = 60000)
    public void testSenderSettlementModeMixedIsHonored() throws Exception {
        doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED);
    }

    public void doTestSenderSettlementModeIsHonored(SenderSettleMode senderSettleMode) throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver("queue://" + getTestName(), senderSettleMode, ReceiverSettleMode.FIRST);
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        assertNotNull(proxyToQueue);
        assertEquals(0L, proxyToQueue.getMessageCount());
        assertEquals(1L, this.server.getTotalConsumerCount());
        assertEquals(senderSettleMode, createReceiver.getEndpoint().getRemoteSenderSettleMode());
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testReceiverSettlementModeSetToFirst() throws Exception {
        doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST);
    }

    @Test(timeout = 60000)
    public void testReceiverSettlementModeSetToSecond() throws Exception {
        doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND);
    }

    private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode receiverSettleMode) throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver("queue://" + getTestName(), SenderSettleMode.MIXED, receiverSettleMode);
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        assertNotNull(proxyToQueue);
        assertEquals(0L, proxyToQueue.getMessageCount());
        assertEquals(1L, this.server.getTotalConsumerCount());
        assertEquals(ReceiverSettleMode.FIRST, createReceiver.getEndpoint().getRemoteReceiverSettleMode());
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testClientIdIsSetInSubscriptionList() throws Exception {
        this.server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        addConnection.setContainerId("testClient");
        addConnection.connect();
        try {
            try {
                AmqpSession createSession = addConnection.createSession();
                Source source = new Source();
                source.setDurable(TerminusDurability.UNSETTLED_STATE);
                source.setCapabilities(new Symbol[]{Symbol.getSymbol("topic")});
                source.setAddress("mytopic");
                createSession.createReceiver(source, "testSub");
                assertNotNull(this.server.locateQueue(new SimpleString("testClient.testSub:mytopic")));
                addConnection.close();
            } catch (Exception e) {
                e.printStackTrace();
                addConnection.close();
            }
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testLinkDetachSentWhenQueueDeleted() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            AmqpReceiver createReceiver = addConnection.createSession().createReceiver(getQueueName());
            this.server.destroyQueue(new SimpleString(getQueueName()), (SecurityAuth) null, false, true);
            createReceiver.getClass();
            Wait.assertTrue("Receiver should have closed", createReceiver::isClosed);
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setAutoCreateQueues(false);
        addressSettings.setAutoCreateAddresses(false);
        this.server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", addressSettings);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            Exception exc = null;
            try {
                addConnection.createSession().createSender("AnAddressThatDoesNotExist");
                fail("Creating a sender here on an address that doesn't exist should fail");
            } catch (Exception e) {
                exc = e;
            }
            assertNotNull(exc);
            assertTrue(exc.getMessage().contains("amqp:not-found"));
            assertTrue(exc.getMessage().contains("target address does not exist"));
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
        AmqpClient createAmqpClient = createAmqpClient();
        createAmqpClient.setValidator(new AmqpValidator() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpReceiverTest.3
            public void inspectOpenedResource(Receiver receiver) {
                if (receiver.getRemoteSource() == null) {
                    markAsInvalid("Link opened with null source.");
                }
                if (AmqpSupport.findFilter(receiver.getRemoteSource().getFilter(), AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) {
                    markAsInvalid("Broker should not return unsupported filter on attach.");
                }
            }
        });
        HashMap hashMap = new HashMap();
        hashMap.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER);
        Source source = new Source();
        source.setAddress(getQueueName());
        source.setFilter(hashMap);
        source.setDurable(TerminusDurability.NONE);
        source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        AmqpSession createSession = addConnection.createSession();
        assertEquals(0L, this.server.getTotalConsumerCount());
        createSession.createReceiver(source);
        assertEquals(1L, this.server.getTotalConsumerCount());
        addConnection.getStateInspector().assertValid();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testReceiverCloseSendsRemoteClose() throws Exception {
        AmqpClient createAmqpClient = createAmqpClient();
        assertNotNull(createAmqpClient);
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        createAmqpClient.setValidator(new AmqpValidator() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpReceiverTest.4
            public void inspectClosedResource(Session session) {
            }

            public void inspectDetachedResource(Receiver receiver) {
                markAsInvalid("Broker should not detach receiver linked to closed session.");
            }

            public void inspectClosedResource(Receiver receiver) {
                atomicBoolean.set(true);
            }
        });
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        assertNotNull(addConnection);
        AmqpSession createSession = addConnection.createSession();
        assertNotNull(createSession);
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
        assertNotNull(createReceiver);
        createReceiver.close();
        assertTrue("Did not process remote close as expected", atomicBoolean.get());
        addConnection.getStateInspector().assertValid();
        addConnection.close();
    }
}
