package org.wso2.mb.integration.tests.amqp.functional;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.naming.NamingException;
import javax.xml.xpath.XPathExpressionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.automation.engine.context.TestUserMode;
import org.wso2.mb.integration.common.clients.AndesClient;
import org.wso2.mb.integration.common.clients.configurations.AndesJMSConsumerClientConfiguration;
import org.wso2.mb.integration.common.clients.configurations.AndesJMSPublisherClientConfiguration;
import org.wso2.mb.integration.common.clients.exceptions.AndesClientConfigurationException;
import org.wso2.mb.integration.common.clients.exceptions.AndesClientException;
import org.wso2.mb.integration.common.clients.operations.utils.AndesClientUtils;
import org.wso2.mb.integration.common.clients.operations.utils.ExchangeType;
import org.wso2.mb.integration.common.utils.backend.MBIntegrationBaseTest;

/* loaded from: input_file:org/wso2/mb/integration/tests/amqp/functional/QueueMessageExpireTestCase.class */
/**
 * Integration tests that publish a mix of expiring and non-expiring JMS messages to a queue
 * and verify the published and received message counts.
 *
 * <p>Each test starts two publishers on the same queue: one sends
 * {@link #NON_EXPIRING_MESSAGE_COUNT} messages without an expiry time, the other sends
 * {@link #EXPIRING_MESSAGE_COUNT} messages with the JMS expiry time set to
 * {@link #EXPIRY_TIME}. The tests then assert that both publishers sent their full quota and
 * that the consumers received exactly {@link #NON_EXPIRING_MESSAGE_COUNT} messages in total.
 */
public class QueueMessageExpireTestCase extends MBIntegrationBaseTest {
    private static final Log log = LogFactory.getLog(QueueMessageExpireTestCase.class);

    /** Total number of messages published per test, across both publishers. */
    private static final long SEND_COUNT = 1000L;

    /** Messages published without an expiry time; also the total the consumers must receive. */
    private static final long NON_EXPIRING_MESSAGE_COUNT = SEND_COUNT / 2;

    /** Messages published with an expiry time. */
    private static final long EXPIRING_MESSAGE_COUNT = SEND_COUNT - NON_EXPIRING_MESSAGE_COUNT;

    /** Value handed to {@code setJMSMessageExpiryTime}; presumably milliseconds - TODO confirm. */
    private static final long EXPIRY_TIME = 1L;

    /**
     * Second argument of {@code AndesClientUtils.waitForMessagesAndShutdown}; presumably a wait
     * time in milliseconds - TODO confirm against the helper.
     */
    private static final long WAIT_AND_SHUTDOWN_TIME = 10000L;

    /** Upper bound on how long the multi-consumer test polls for messages. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 600000L;

    /** Pause between two polls of the consumers' received-message counters. */
    private static final long POLL_INTERVAL_SECONDS = 10L;

    /**
     * Initialises the base test for the super tenant user.
     *
     * @throws XPathExpressionException propagated from the base class initialisation
     */
    @BeforeClass(alwaysRun = true)
    public void init() throws XPathExpressionException {
        super.init(TestUserMode.SUPER_TENANT_USER);
    }

    /**
     * Runs one synchronous consumer and two publishers (one non-expiring, one expiring) against
     * the queue {@code queueWithExpiration}, then checks the sent and received counts.
     */
    @Test(groups = {"wso2.mb"}, description = "Single queue send-receive test case with 50% expired messages")
    public void performSingleQueueExpirySendReceiveTestCase() throws AndesClientConfigurationException, NamingException, JMSException, IOException, CloneNotSupportedException, AndesClientException, XPathExpressionException {
        String queueName = "queueWithExpiration";

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setMaximumMessagesToReceived(NON_EXPIRING_MESSAGE_COUNT);
        consumerConfig.setPrintsPerMessageCount(NON_EXPIRING_MESSAGE_COUNT / 10);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration plainPublisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        plainPublisherConfig.setNumberOfMessagesToSend(NON_EXPIRING_MESSAGE_COUNT);
        plainPublisherConfig.setPrintsPerMessageCount(NON_EXPIRING_MESSAGE_COUNT / 10);

        AndesJMSPublisherClientConfiguration expiringPublisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        expiringPublisherConfig.setNumberOfMessagesToSend(EXPIRING_MESSAGE_COUNT);
        expiringPublisherConfig.setPrintsPerMessageCount(EXPIRING_MESSAGE_COUNT / 10);
        expiringPublisherConfig.setJMSMessageExpiryTime(EXPIRY_TIME);

        // The consumer is started before the publishers, as in the original ordering.
        AndesClient consumer = new AndesClient(consumerConfig, true);
        consumer.startClient();
        AndesClient plainPublisher = new AndesClient(plainPublisherConfig, true);
        plainPublisher.startClient();
        AndesClient expiringPublisher = new AndesClient(expiringPublisherConfig, true);
        expiringPublisher.startClient();

        AndesClientUtils.waitForMessagesAndShutdown(consumer, WAIT_AND_SHUTDOWN_TIME);

        Assert.assertEquals(plainPublisher.getSentMessageCount(), NON_EXPIRING_MESSAGE_COUNT, "Message send failed for publisher without expiration.");
        Assert.assertEquals(expiringPublisher.getSentMessageCount(), EXPIRING_MESSAGE_COUNT, "Message send failed for publisher with expiration.");
        Assert.assertEquals(consumer.getReceivedMessageCount(), NON_EXPIRING_MESSAGE_COUNT, "Message receiving failed.");
    }

    /**
     * Runs two synchronous consumers and two publishers (one non-expiring, one expiring) against
     * the queue {@code queueWithExpiryAndManyConsumers}. Polls the consumers until their combined
     * received count reaches {@link #NON_EXPIRING_MESSAGE_COUNT} or
     * {@link #RECEIVE_TIMEOUT_MILLIS} elapses, shuts the consumers down, and then checks the sent
     * and received counts.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    @Test(groups = {"wso2.mb"}, description = "send messages to a queue which has two consumers with jms expiration")
    public void performManyQueueExpirySendReceiveTestCase() throws AndesClientConfigurationException, JMSException, NamingException, IOException, AndesClientException, XPathExpressionException, InterruptedException {
        String queueName = "queueWithExpiryAndManyConsumers";
        // Each consumer is capped at half of the non-expiring messages.
        long perConsumerCount = NON_EXPIRING_MESSAGE_COUNT / 2;

        AndesJMSConsumerClientConfiguration firstConsumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        firstConsumerConfig.setMaximumMessagesToReceived(perConsumerCount);
        firstConsumerConfig.setPrintsPerMessageCount(perConsumerCount / 10);
        firstConsumerConfig.setAsync(false);

        AndesJMSConsumerClientConfiguration secondConsumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        secondConsumerConfig.setMaximumMessagesToReceived(perConsumerCount);
        secondConsumerConfig.setPrintsPerMessageCount(perConsumerCount / 10);
        secondConsumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration plainPublisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        plainPublisherConfig.setNumberOfMessagesToSend(NON_EXPIRING_MESSAGE_COUNT);
        plainPublisherConfig.setPrintsPerMessageCount(NON_EXPIRING_MESSAGE_COUNT / 10);

        AndesJMSPublisherClientConfiguration expiringPublisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        expiringPublisherConfig.setPrintsPerMessageCount(EXPIRING_MESSAGE_COUNT / 10);
        expiringPublisherConfig.setNumberOfMessagesToSend(EXPIRING_MESSAGE_COUNT);
        expiringPublisherConfig.setJMSMessageExpiryTime(EXPIRY_TIME);

        AndesClient firstConsumer = new AndesClient(firstConsumerConfig, true);
        firstConsumer.startClient();
        AndesClient secondConsumer = new AndesClient(secondConsumerConfig, true);
        secondConsumer.startClient();

        AndesClient plainPublisher;
        AndesClient expiringPublisher;
        long firstReceived;
        long secondReceived;
        try {
            plainPublisher = new AndesClient(plainPublisherConfig, true);
            plainPublisher.startClient();
            expiringPublisher = new AndesClient(expiringPublisherConfig, true);
            expiringPublisher.startClient();

            firstReceived = firstConsumer.getReceivedMessageCount();
            secondReceived = secondConsumer.getReceivedMessageCount();
            // nanoTime is monotonic, so wall-clock adjustments cannot distort the timeout.
            long startNanos = System.nanoTime();
            // The count is tested before the timeout, so a run that completes on the last poll
            // is not reported as a timeout.
            while (firstReceived + secondReceived < NON_EXPIRING_MESSAGE_COUNT) {
                long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                if (elapsedMillis > RECEIVE_TIMEOUT_MILLIS) {
                    log.error("Expected number of messages didn't receive after " + RECEIVE_TIMEOUT_MILLIS + " milliseconds. Therefore, no longer waiting for new messages.");
                    break;
                }
                TimeUnit.SECONDS.sleep(POLL_INTERVAL_SECONDS);
                firstReceived = firstConsumer.getReceivedMessageCount();
                secondReceived = secondConsumer.getReceivedMessageCount();
            }
        } finally {
            // Shut the consumers down even when a publisher fails to start or the wait is
            // interrupted, so they do not stay attached to the queue.
            AndesClientUtils.shutdownClient(firstConsumer);
            AndesClientUtils.shutdownClient(secondConsumer);
        }

        long totalReceived = firstReceived + secondReceived;
        Assert.assertEquals(plainPublisher.getSentMessageCount(), NON_EXPIRING_MESSAGE_COUNT, "Message send failed for publisher without expiration.");
        Assert.assertEquals(expiringPublisher.getSentMessageCount(), EXPIRING_MESSAGE_COUNT, "Message send failed for publisher with expiration");
        Assert.assertEquals(totalReceived, NON_EXPIRING_MESSAGE_COUNT, "Message receiving failed. Expected " + NON_EXPIRING_MESSAGE_COUNT + " but received " + totalReceived);
    }
}
