package org.wso2.mb.integration.tests.amqp.functional;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import javax.xml.xpath.XPathExpressionException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.andes.configuration.enums.AndesConfiguration;
import org.wso2.carbon.andes.stub.AndesAdminServiceBrokerManagerAdminException;
import org.wso2.carbon.authenticator.stub.LogoutAuthenticationExceptionException;
import org.wso2.carbon.automation.engine.context.TestUserMode;
import org.wso2.carbon.integration.common.utils.LoginLogoutClient;
import org.wso2.carbon.integration.common.utils.exceptions.AutomationUtilException;
import org.wso2.carbon.integration.common.utils.mgt.ServerConfigurationManager;
import org.wso2.mb.integration.common.clients.AndesClient;
import org.wso2.mb.integration.common.clients.AndesJMSConsumer;
import org.wso2.mb.integration.common.clients.AndesJMSPublisher;
import org.wso2.mb.integration.common.clients.configurations.AndesJMSConsumerClientConfiguration;
import org.wso2.mb.integration.common.clients.configurations.AndesJMSPublisherClientConfiguration;
import org.wso2.mb.integration.common.clients.exceptions.AndesClientConfigurationException;
import org.wso2.mb.integration.common.clients.exceptions.AndesClientException;
import org.wso2.mb.integration.common.clients.operations.clients.AndesAdminClient;
import org.wso2.mb.integration.common.clients.operations.utils.AndesClientUtils;
import org.wso2.mb.integration.common.clients.operations.utils.ExchangeType;
import org.wso2.mb.integration.common.clients.operations.utils.JMSAcknowledgeMode;
import org.wso2.mb.integration.common.utils.backend.ConfigurationEditor;
import org.wso2.mb.integration.common.utils.backend.MBIntegrationBaseTest;

/* loaded from: input_file:org/wso2/mb/integration/tests/amqp/functional/RedeliveryDelayTestCase.class */
public class RedeliveryDelayTestCase extends MBIntegrationBaseTest {
    /** Class-wide logger; shared and immutable, so it is declared static final. */
    private static final Log log = LogFactory.getLog(RedeliveryDelayTestCase.class);

    /** Value of the "AndesAckWaitTimeOut" system property captured in {@link #init()} before it is overridden. */
    private String defaultAndesAckWaitTimeOut;

    /** Value of the "AndesRedeliveryDelay" system property captured in {@link #init()} before it is overridden. */
    private String defaultAndesRedeliveryDelay;

    /**
     * Prepares the broker and the client-side system properties for the redelivery tests.
     *
     * <p>Initialises the base test as super tenant user, sets the AMQP maximum redelivery attempts
     * property in {@code broker.xml} to "1" and restarts the server, then remembers the current
     * values of the "AndesAckWaitTimeOut" and "AndesRedeliveryDelay" system properties before
     * overriding them with "0" and "10000".
     *
     * <p>NOTE(review): the saved defaults are presumably restored by a tear-down method that is
     * not visible from this part of the file - confirm.
     *
     * @throws XPathExpressionException declared by the base initialisation / server manager
     * @throws IOException              declared by the configuration and server manager calls
     * @throws AutomationUtilException  declared by the server configuration calls
     * @throws ConfigurationException   declared by the configuration editor calls
     */
    @BeforeClass(alwaysRun = true)
    public void init() throws XPathExpressionException, IOException, AutomationUtilException, ConfigurationException {
        super.init(TestUserMode.SUPER_TENANT_USER);
        this.serverManager = new ServerConfigurationManager(this.automationContext);

        // <carbon-home>/wso2/broker/conf/broker.xml
        String separator = File.separator;
        String brokerConfigPath = ServerConfigurationManager.getCarbonHome()
                + separator + "wso2"
                + separator + "broker"
                + separator + "conf"
                + separator + "broker.xml";

        ConfigurationEditor brokerConfig = new ConfigurationEditor(brokerConfigPath);
        brokerConfig.updateProperty(AndesConfiguration.TRANSPORTS_AMQP_MAXIMUM_REDELIVERY_ATTEMPTS, "1");
        brokerConfig.applyUpdatedConfigurationAndRestartServer(this.serverManager);

        // Capture the current values first, then override them for the duration of the class.
        this.defaultAndesAckWaitTimeOut = System.getProperty("AndesAckWaitTimeOut");
        this.defaultAndesRedeliveryDelay = System.getProperty("AndesRedeliveryDelay");
        System.setProperty("AndesAckWaitTimeOut", "0");
        System.setProperty("AndesRedeliveryDelay", "10000");
    }

    /**
     * Publishes ten text messages ("#0" .. "#9") to the queue "firstMessageInvalidOnlyQueue" and
     * consumes them through a {@link MessageListener} that withholds the acknowledgement for the
     * first delivery of "#0" only.
     *
     * <p>Expects eleven deliveries in total: the ten messages in publish order followed by one
     * more delivery, which is checked with {@code validateMessageContentAndDelay} against the
     * original "#0" entry.
     *
     * @throws AndesClientConfigurationException declared by the client configuration constructors
     * @throws XPathExpressionException          declared by the port lookup
     * @throws IOException                       declared by the client calls
     * @throws JMSException                      if a JMS operation fails
     * @throws AndesClientException              declared by the client constructors
     * @throws NamingException                   declared by the client constructors
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void firstMessageInvalidOnlyQueueMessageListenerTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        final int sendCount = 10;
        final String queueName = "firstMessageInvalidOnlyQueue";
        // Typed list: each entry is (message text, time of receipt).
        final ArrayList<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setNumberOfMessagesToSend(sendCount);
        publisherConfig.setPrintsPerMessageCount(sendCount / 10);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer consumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        consumer.getReceiver().setMessageListener(new MessageListener() {
            private boolean receivedFirstMessage = false;

            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    if (this.receivedFirstMessage || !"#0".equals(text)) {
                        message.acknowledge();
                    } else {
                        // First delivery of "#0": leave it unacknowledged.
                        this.receivedFirstMessage = true;
                    }
                    receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                    consumer.getReceivedMessageCount().incrementAndGet();
                } catch (JMSException e) {
                    throw new RuntimeException("Exception occurred when receiving messages.", e);
                }
            }
        });

        AndesJMSPublisher publisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = publisher.getSender();
        for (int i = 0; i < sendCount; i++) {
            sender.send(publisher.getSession().createTextMessage("#" + i));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        log.info("Received Messages : " + getMessageList(receivedMessages));

        // Check the total first so a shortfall is reported as an assertion failure
        // instead of an IndexOutOfBoundsException from the indexed checks below.
        Assert.assertEquals(receivedMessages.size(), sendCount + 1, "Message receiving failed.");
        for (int i = 0; i < sendCount; i++) {
            Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + i, "Invalid messages received. #" + i + " expected.");
        }
        validateMessageContentAndDelay(receivedMessages, 0, sendCount, "#0");
    }

    /**
     * Publishes ten text messages ("#0" .. "#9") to the queue
     * "firstMessageInvalidOnlyReceiverQueue" and consumes them with blocking
     * {@code receive()} calls on a separate thread that withholds the acknowledgement for the
     * first delivery of "#0" only.
     *
     * <p>Expects eleven deliveries in total: the ten messages in publish order followed by one
     * more delivery, which is checked with {@code validateMessageContentAndDelay} against the
     * original "#0" entry.
     *
     * @throws AndesClientConfigurationException declared by the client configuration constructors
     * @throws XPathExpressionException          declared by the port lookup
     * @throws IOException                       declared by the client calls
     * @throws JMSException                      if a JMS operation fails
     * @throws AndesClientException              declared by the client constructors
     * @throws NamingException                   declared by the client constructors
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void firstMessageInvalidOnlyQueueMessageReceiverTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        final int sendCount = 10;
        final String queueName = "firstMessageInvalidOnlyReceiverQueue";
        // Typed list: each entry is (message text, time of receipt).
        final ArrayList<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setNumberOfMessagesToSend(sendCount);
        publisherConfig.setPrintsPerMessageCount(sendCount / 10);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer consumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        final MessageConsumer receiver = consumer.getReceiver();
        new Thread() {
            private boolean receivedFirstMessage = false;

            @Override
            public void run() {
                while (receiver != null) {
                    try {
                        TextMessage textMessage = (TextMessage) receiver.receive();
                        if (textMessage == null) {
                            // receive() returns null once the consumer has been closed: stop polling.
                            return;
                        }
                        String text = textMessage.getText();
                        if (this.receivedFirstMessage || !"#0".equals(text)) {
                            textMessage.acknowledge();
                        } else {
                            // First delivery of "#0": leave it unacknowledged.
                            this.receivedFirstMessage = true;
                        }
                        receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                        consumer.getReceivedMessageCount().incrementAndGet();
                    } catch (JMSException e) {
                        throw new RuntimeException("Exception occurred when receiving messages.", e);
                    }
                }
            }
        }.start();

        AndesJMSPublisher publisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = publisher.getSender();
        for (int i = 0; i < sendCount; i++) {
            sender.send(publisher.getSession().createTextMessage("#" + i));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        log.info("Received Messages : " + getMessageList(receivedMessages));

        // Check the total first so a shortfall is reported as an assertion failure
        // instead of an IndexOutOfBoundsException from the indexed checks below.
        Assert.assertEquals(receivedMessages.size(), sendCount + 1, "Message receiving failed.");
        for (int i = 0; i < sendCount; i++) {
            Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + i, "Invalid messages received. #" + i + " expected.");
        }
        validateMessageContentAndDelay(receivedMessages, 0, sendCount, "#0");
    }

    /**
     * Publishes ten text messages ("#0" .. "#9") to the queue "multipleUnacknowledgeQueue" and
     * consumes them through a {@link MessageListener} that acknowledges a message only when its
     * text has already been recorded, i.e. every first delivery is left unacknowledged.
     *
     * <p>Expects twenty deliveries: the ten messages in publish order, then each of them once
     * more in the same order; every second delivery is checked with
     * {@code validateMessageContentAndDelay} against its first delivery.
     *
     * @throws AndesClientConfigurationException declared by the client configuration constructors
     * @throws XPathExpressionException          declared by the port lookup
     * @throws IOException                       declared by the client calls
     * @throws JMSException                      if a JMS operation fails
     * @throws AndesClientException              declared by the client constructors
     * @throws NamingException                   declared by the client constructors
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void allUnacknowledgeMessageListenerTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        final int sendCount = 10;
        final String queueName = "multipleUnacknowledgeQueue";
        // Typed list: each entry is (message text, time of receipt).
        final ArrayList<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setNumberOfMessagesToSend(sendCount);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer consumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        consumer.getReceiver().setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    // Acknowledge only deliveries whose text was seen before.
                    if (RedeliveryDelayTestCase.this.getMessageList(receivedMessages).contains(text)) {
                        message.acknowledge();
                    }
                    receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                    consumer.getReceivedMessageCount().incrementAndGet();
                } catch (JMSException e) {
                    throw new RuntimeException("Exception occurred when receiving messages.", e);
                }
            }
        });

        AndesJMSPublisher publisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = publisher.getSender();
        for (int i = 0; i < sendCount; i++) {
            sender.send(publisher.getSession().createTextMessage("#" + i));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        log.info("Received Messages : " + getMessageList(receivedMessages));

        // Check the total first so a shortfall is reported as an assertion failure
        // instead of an IndexOutOfBoundsException from the indexed checks below.
        Assert.assertEquals(receivedMessages.size(), sendCount * 2, "Message receiving failed.");
        for (int i = 0; i < sendCount * 2; i++) {
            if (i < sendCount) {
                Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + i, "Invalid messages received. #" + i + " expected.");
            } else {
                int originalIndex = i - sendCount;
                validateMessageContentAndDelay(receivedMessages, originalIndex, i, "#" + originalIndex);
            }
        }
    }

    /**
     * Publishes ten text messages ("#0" .. "#9") to the queue
     * "multipleUnacknowledgeReceiverQueue" and consumes them with blocking {@code receive()}
     * calls on a separate thread that acknowledges a message only when its text has already
     * been recorded, i.e. every first delivery is left unacknowledged.
     *
     * <p>Expects twenty deliveries: the ten messages in publish order, then each of them once
     * more in the same order; every second delivery is checked with
     * {@code validateMessageContentAndDelay} against its first delivery.
     *
     * @throws AndesClientConfigurationException declared by the client configuration constructors
     * @throws XPathExpressionException          declared by the port lookup
     * @throws IOException                       declared by the client calls
     * @throws JMSException                      if a JMS operation fails
     * @throws AndesClientException              declared by the client constructors
     * @throws NamingException                   declared by the client constructors
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void allUnacknowledgeMessageReceiverTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        final int sendCount = 10;
        final String queueName = "multipleUnacknowledgeReceiverQueue";
        // Typed list: each entry is (message text, time of receipt).
        final ArrayList<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setNumberOfMessagesToSend(sendCount);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer consumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        final MessageConsumer receiver = consumer.getReceiver();
        new Thread() {
            @Override
            public void run() {
                while (receiver != null) {
                    try {
                        TextMessage textMessage = (TextMessage) receiver.receive();
                        if (textMessage == null) {
                            // receive() returns null once the consumer has been closed: stop polling.
                            return;
                        }
                        String text = textMessage.getText();
                        // Acknowledge only deliveries whose text was seen before.
                        if (RedeliveryDelayTestCase.this.getMessageList(receivedMessages).contains(text)) {
                            textMessage.acknowledge();
                        }
                        receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                        consumer.getReceivedMessageCount().incrementAndGet();
                    } catch (JMSException e) {
                        throw new RuntimeException("Exception occurred when receiving messages.", e);
                    }
                }
            }
        }.start();

        AndesJMSPublisher publisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = publisher.getSender();
        for (int i = 0; i < sendCount; i++) {
            sender.send(publisher.getSession().createTextMessage("#" + i));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        log.info("Received Messages : " + getMessageList(receivedMessages));

        // Check the total first so a shortfall is reported as an assertion failure
        // instead of an IndexOutOfBoundsException from the indexed checks below.
        Assert.assertEquals(receivedMessages.size(), sendCount * 2, "Message receiving failed.");
        for (int i = 0; i < sendCount * 2; i++) {
            if (i < sendCount) {
                Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + i, "Invalid messages received. #" + i + " expected.");
            } else {
                int originalIndex = i - sendCount;
                validateMessageContentAndDelay(receivedMessages, originalIndex, i, "#" + originalIndex);
            }
        }
    }

    /**
     * Publishes ten text messages ("#0" .. "#9") to the queue "oneByOneUnacknowledgeQueue" and
     * consumes them through a {@link MessageListener} that leaves the first delivery of every
     * message whose number is a multiple of three ("#0", "#3", "#6", "#9") unacknowledged.
     *
     * <p>Expects fourteen deliveries: the ten messages in publish order followed by four more,
     * which are checked with {@code validateMessageContentAndDelay} for "#0", "#3", "#6" and
     * "#9" in that order.
     *
     * @throws AndesClientConfigurationException declared by the client configuration constructors
     * @throws XPathExpressionException          declared by the port lookup
     * @throws IOException                       declared by the client calls
     * @throws JMSException                      if a JMS operation fails
     * @throws AndesClientException              declared by the client constructors
     * @throws NamingException                   declared by the client constructors
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void oneByOneUnacknowledgeMessageListenerTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        final int sendCount = 10;
        final String queueName = "oneByOneUnacknowledgeQueue";
        // Typed list: each entry is (message text, time of receipt).
        final ArrayList<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setNumberOfMessagesToSend(sendCount);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer consumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        consumer.getReceiver().setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    // Numeric part after '#', e.g. "#6" -> 6.
                    int messageNumber = Integer.parseInt(text.split("#")[1]);
                    if (messageNumber % 3 != 0 || RedeliveryDelayTestCase.this.getMessageList(receivedMessages).contains(text)) {
                        message.acknowledge();
                    }
                    receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                    consumer.getReceivedMessageCount().incrementAndGet();
                } catch (JMSException e) {
                    throw new RuntimeException("Exception occurred when receiving messages.", e);
                }
            }
        });

        AndesJMSPublisher publisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = publisher.getSender();
        for (int i = 0; i < sendCount; i++) {
            sender.send(publisher.getSession().createTextMessage("#" + i));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        log.info("Received Messages : " + getMessageList(receivedMessages));

        // Check the total first so a shortfall is reported as an assertion failure
        // instead of an IndexOutOfBoundsException from the indexed checks below.
        Assert.assertEquals(receivedMessages.size(), sendCount + 4, "Message receiving failed.");
        for (int i = 0; i < sendCount; i++) {
            Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + i, "Invalid messages received. #" + i + " expected.");
        }
        validateMessageContentAndDelay(receivedMessages, 0, sendCount, "#0");
        validateMessageContentAndDelay(receivedMessages, 1, sendCount + 1, "#3");
        validateMessageContentAndDelay(receivedMessages, 2, sendCount + 2, "#6");
        validateMessageContentAndDelay(receivedMessages, 3, sendCount + 3, "#9");
    }

    /**
     * Publishes ten text messages ("#0" .. "#9") to the queue
     * "oneByOneUnacknowledgeReceiverQueue" and consumes them with blocking {@code receive()}
     * calls on a separate thread that leaves the first delivery of every message whose number is
     * a multiple of three ("#0", "#3", "#6", "#9") unacknowledged.
     *
     * <p>Expects fourteen deliveries: the ten messages in publish order followed by four more,
     * which are checked with {@code validateMessageContentAndDelay} for "#0", "#3", "#6" and
     * "#9" in that order.
     *
     * @throws AndesClientConfigurationException declared by the client configuration constructors
     * @throws XPathExpressionException          declared by the port lookup
     * @throws IOException                       declared by the client calls
     * @throws JMSException                      if a JMS operation fails
     * @throws AndesClientException              declared by the client constructors
     * @throws NamingException                   declared by the client constructors
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void oneByOneUnacknowledgeMessageReceiverTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        final int sendCount = 10;
        final String queueName = "oneByOneUnacknowledgeReceiverQueue";
        // Typed list: each entry is (message text, time of receipt).
        final ArrayList<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setNumberOfMessagesToSend(sendCount);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer consumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        final MessageConsumer receiver = consumer.getReceiver();
        new Thread() {
            @Override
            public void run() {
                while (receiver != null) {
                    try {
                        TextMessage textMessage = (TextMessage) receiver.receive();
                        if (textMessage == null) {
                            // receive() returns null once the consumer has been closed: stop polling.
                            return;
                        }
                        String text = textMessage.getText();
                        // Numeric part after '#', e.g. "#6" -> 6.
                        int messageNumber = Integer.parseInt(text.split("#")[1]);
                        if (messageNumber % 3 != 0 || RedeliveryDelayTestCase.this.getMessageList(receivedMessages).contains(text)) {
                            textMessage.acknowledge();
                        }
                        receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                        consumer.getReceivedMessageCount().incrementAndGet();
                    } catch (JMSException e) {
                        throw new RuntimeException("Exception occurred when receiving messages.", e);
                    }
                }
            }
        }.start();

        AndesJMSPublisher publisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = publisher.getSender();
        for (int i = 0; i < sendCount; i++) {
            sender.send(publisher.getSession().createTextMessage("#" + i));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        log.info("Received Messages : " + getMessageList(receivedMessages));

        // Check the total first so a shortfall is reported as an assertion failure
        // instead of an IndexOutOfBoundsException from the indexed checks below.
        Assert.assertEquals(receivedMessages.size(), sendCount + 4, "Message receiving failed.");
        for (int i = 0; i < sendCount; i++) {
            Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + i, "Invalid messages received. #" + i + " expected.");
        }
        validateMessageContentAndDelay(receivedMessages, 0, sendCount, "#0");
        validateMessageContentAndDelay(receivedMessages, 1, sendCount + 1, "#3");
        validateMessageContentAndDelay(receivedMessages, 2, sendCount + 2, "#6");
        validateMessageContentAndDelay(receivedMessages, 3, sendCount + 3, "#9");
    }

    /**
     * Publishes ten text messages ("#0" .. "#9") to the queue "firstFewUnacknowledgeQueue" and
     * consumes them through a {@link MessageListener} that leaves the first delivery of the
     * messages numbered below four ("#0" .. "#3") unacknowledged.
     *
     * <p>Expects fourteen deliveries: the ten messages in publish order followed by four more,
     * which are checked with {@code validateMessageContentAndDelay} for "#0", "#1", "#2" and
     * "#3" in that order.
     *
     * @throws AndesClientConfigurationException declared by the client configuration constructors
     * @throws XPathExpressionException          declared by the port lookup
     * @throws IOException                       declared by the client calls
     * @throws JMSException                      if a JMS operation fails
     * @throws AndesClientException              declared by the client constructors
     * @throws NamingException                   declared by the client constructors
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void firstFewUnacknowledgeMessageListenerTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        final int sendCount = 10;
        final String queueName = "firstFewUnacknowledgeQueue";
        // Typed list: each entry is (message text, time of receipt).
        final ArrayList<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setNumberOfMessagesToSend(sendCount);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer consumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        consumer.getReceiver().setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    // Numeric part after '#', e.g. "#6" -> 6.
                    int messageNumber = Integer.parseInt(text.split("#")[1]);
                    if (messageNumber >= 4 || RedeliveryDelayTestCase.this.getMessageList(receivedMessages).contains(text)) {
                        message.acknowledge();
                    }
                    receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                    consumer.getReceivedMessageCount().incrementAndGet();
                } catch (JMSException e) {
                    throw new RuntimeException("Exception occurred when receiving messages.", e);
                }
            }
        });

        AndesJMSPublisher publisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = publisher.getSender();
        for (int i = 0; i < sendCount; i++) {
            sender.send(publisher.getSession().createTextMessage("#" + i));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        log.info("Received Messages : " + getMessageList(receivedMessages));

        // Check the total first so a shortfall is reported as an assertion failure
        // instead of an IndexOutOfBoundsException from the indexed checks below.
        Assert.assertEquals(receivedMessages.size(), sendCount + 4, "Message receiving failed.");
        for (int i = 0; i < sendCount; i++) {
            Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + i, "Invalid messages received. #" + i + " expected.");
        }
        validateMessageContentAndDelay(receivedMessages, 0, sendCount, "#0");
        validateMessageContentAndDelay(receivedMessages, 1, sendCount + 1, "#1");
        validateMessageContentAndDelay(receivedMessages, 2, sendCount + 2, "#2");
        validateMessageContentAndDelay(receivedMessages, 3, sendCount + 3, "#3");
    }

    /**
     * Publishes ten text messages ("#0" .. "#9") to the queue
     * "firstFewUnacknowledgeReceiverQueue" and consumes them with blocking {@code receive()}
     * calls on a separate thread that leaves the first delivery of the messages numbered below
     * four ("#0" .. "#3") unacknowledged.
     *
     * <p>Expects fourteen deliveries: the ten messages in publish order followed by four more,
     * which are checked with {@code validateMessageContentAndDelay} for "#0", "#1", "#2" and
     * "#3" in that order.
     *
     * @throws AndesClientConfigurationException declared by the client configuration constructors
     * @throws XPathExpressionException          declared by the port lookup
     * @throws IOException                       declared by the client calls
     * @throws JMSException                      if a JMS operation fails
     * @throws AndesClientException              declared by the client constructors
     * @throws NamingException                   declared by the client constructors
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void firstFewUnacknowledgeMessageReceiverTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        final int sendCount = 10;
        final String queueName = "firstFewUnacknowledgeReceiverQueue";
        // Typed list: each entry is (message text, time of receipt).
        final ArrayList<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setNumberOfMessagesToSend(sendCount);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer consumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        final MessageConsumer receiver = consumer.getReceiver();
        new Thread() {
            @Override
            public void run() {
                while (receiver != null) {
                    try {
                        TextMessage textMessage = (TextMessage) receiver.receive();
                        if (textMessage == null) {
                            // receive() returns null once the consumer has been closed: stop polling.
                            return;
                        }
                        String text = textMessage.getText();
                        // Numeric part after '#', e.g. "#6" -> 6.
                        int messageNumber = Integer.parseInt(text.split("#")[1]);
                        if (messageNumber >= 4 || RedeliveryDelayTestCase.this.getMessageList(receivedMessages).contains(text)) {
                            textMessage.acknowledge();
                        }
                        receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                        consumer.getReceivedMessageCount().incrementAndGet();
                    } catch (JMSException e) {
                        throw new RuntimeException("Exception occurred when receiving messages.", e);
                    }
                }
            }
        }.start();

        AndesJMSPublisher publisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = publisher.getSender();
        for (int i = 0; i < sendCount; i++) {
            sender.send(publisher.getSession().createTextMessage("#" + i));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        log.info("Received Messages : " + getMessageList(receivedMessages));

        // Check the total first so a shortfall is reported as an assertion failure
        // instead of an IndexOutOfBoundsException from the indexed checks below.
        Assert.assertEquals(receivedMessages.size(), sendCount + 4, "Message receiving failed.");
        for (int i = 0; i < sendCount; i++) {
            Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + i, "Invalid messages received. #" + i + " expected.");
        }
        validateMessageContentAndDelay(receivedMessages, 0, sendCount, "#0");
        validateMessageContentAndDelay(receivedMessages, 1, sendCount + 1, "#1");
        validateMessageContentAndDelay(receivedMessages, 2, sendCount + 2, "#2");
        validateMessageContentAndDelay(receivedMessages, 3, sendCount + 3, "#3");
    }

    /**
     * Publishes ten text messages ("#0" .. "#9") to the queue "unacknowledgeMiddleMessageQueue"
     * and consumes them through a {@link MessageListener} that leaves only the first delivery of
     * "#7" unacknowledged.
     *
     * <p>Expects eleven deliveries: the ten messages in publish order followed by one more,
     * which is checked with {@code validateMessageContentAndDelay(list, 6, 10, "#7")}.
     *
     * <p>NOTE(review): the original-message index passed to the validator is 6 although "#7" is
     * recorded at index 7; the argument is kept as in the original - confirm against the
     * validator's implementation.
     *
     * @throws AndesClientConfigurationException declared by the client configuration constructors
     * @throws XPathExpressionException          declared by the port lookup
     * @throws IOException                       declared by the client calls
     * @throws JMSException                      if a JMS operation fails
     * @throws AndesClientException              declared by the client constructors
     * @throws NamingException                   declared by the client constructors
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void unacknowledgeMiddleMessageMessageListenerTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        final int sendCount = 10;
        final String queueName = "unacknowledgeMiddleMessageQueue";
        // Typed list: each entry is (message text, time of receipt).
        final ArrayList<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setNumberOfMessagesToSend(sendCount);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer consumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        consumer.getReceiver().setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    if (!text.equals("#7") || RedeliveryDelayTestCase.this.getMessageList(receivedMessages).contains(text)) {
                        message.acknowledge();
                    }
                    receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                    consumer.getReceivedMessageCount().incrementAndGet();
                } catch (JMSException e) {
                    throw new RuntimeException("Exception occurred when receiving messages.", e);
                }
            }
        });

        AndesJMSPublisher publisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = publisher.getSender();
        for (int i = 0; i < sendCount; i++) {
            sender.send(publisher.getSession().createTextMessage("#" + i));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        log.info("Received Messages : " + getMessageList(receivedMessages));

        // Check the total first so a shortfall is reported as an assertion failure
        // instead of an IndexOutOfBoundsException from the indexed checks below.
        Assert.assertEquals(receivedMessages.size(), sendCount + 1, "Message receiving failed.");
        for (int i = 0; i < sendCount; i++) {
            Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + i, "Invalid messages received. #" + i + " expected.");
        }
        validateMessageContentAndDelay(receivedMessages, 6, sendCount, "#7");
    }

    /**
     * Checks redelivery when a synchronous receiver leaves the first delivery of "#7"
     * unacknowledged.
     * <p>
     * Ten text messages ("#0" to "#9") are published to the queue
     * "unacknowledgeMiddleMessageReceiverQueue". A background thread consumes them with
     * {@code receive()} and acknowledges every message except the first delivery of "#7".
     * The test then asserts that the first ten received messages arrive in publish order,
     * that the eleventh received message is "#7" again, and that eleven messages were
     * received in total.
     *
     * @throws AndesClientConfigurationException declared by the client configuration/creation calls
     * @throws XPathExpressionException          declared by the calls used to build the clients
     * @throws IOException                       declared by the client/shutdown calls
     * @throws JMSException                      if sending a message fails
     * @throws AndesClientException              declared by the client calls
     * @throws NamingException                   declared by the client creation calls
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void unacknowledgeMiddleMessageMessageReceiverTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        // Each entry pairs the message text with the local time at which it was received.
        final List<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, "unacknowledgeMiddleMessageReceiverQueue");
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, "unacknowledgeMiddleMessageReceiverQueue");
        publisherConfig.setNumberOfMessagesToSend(10L);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer andesJMSConsumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        final MessageConsumer receiver = andesJMSConsumer.getReceiver();
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        TextMessage received = (TextMessage) receiver.receive();
                        if (received == null) {
                            // receive() returns null when the consumer is closed concurrently;
                            // stop quietly instead of dereferencing null.
                            return;
                        }
                        String text = received.getText();
                        // Hold back the acknowledgement only for the first delivery of "#7".
                        if (!text.equals("#7") || RedeliveryDelayTestCase.this.getMessageList(receivedMessages).contains(text)) {
                            received.acknowledge();
                        }
                        receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                        andesJMSConsumer.getReceivedMessageCount().incrementAndGet();
                    } catch (JMSException e) {
                        throw new RuntimeException("Exception occurred when receiving messages.", e);
                    }
                }
            }
        }.start();

        AndesJMSPublisher andesJMSPublisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = andesJMSPublisher.getSender();
        for (int i = 0; i < 10; i++) {
            sender.send(andesJMSPublisher.getSession().createTextMessage("#" + Integer.toString(i)));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 10000L);
        this.log.info("Received Messages : " + getMessageList(receivedMessages));

        // The first ten deliveries must match the publish order.
        for (int i = 0; i < 10; i++) {
            Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + Integer.toString(i), "Invalid messages received. #" + Integer.toString(i) + " expected.");
        }

        // NOTE(review): index 6 holds "#6", not "#7", so the preceding message's receive time
        // is used as the delay baseline - confirm this is intended.
        validateMessageContentAndDelay(receivedMessages, 6, 10, "#7");
        Assert.assertEquals(receivedMessages.size(), 10 + 1, "Message receiving failed.");
    }

    /**
     * Checks redelivery when a message listener leaves the first delivery of every
     * hundredth message unacknowledged.
     * <p>
     * One thousand text messages ("#0" to "#999") are published to the queue
     * "oneByOneUnacknowledgeMessageListenerForMultiple". The listener acknowledges every
     * message except the first delivery of those whose number is a multiple of 100. The test
     * asserts that the first thousand deliveries arrive in publish order, that the ten
     * held-back messages are delivered again afterwards in order, and that 1010 messages
     * were received in total.
     *
     * @throws AndesClientConfigurationException declared by the client configuration/creation calls
     * @throws XPathExpressionException          declared by the calls used to build the clients
     * @throws IOException                       declared by the client/shutdown calls
     * @throws JMSException                      if sending a message fails
     * @throws AndesClientException              declared by the client calls
     * @throws NamingException                   declared by the client creation calls
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void oneByOneUnacknowledgeMessageListenerForMultipleMessagesTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        // Message text paired with the local receive time, in arrival order.
        final List<ImmutablePair<String, Calendar>> deliveries = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration subscriberSettings = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, "oneByOneUnacknowledgeMessageListenerForMultiple");
        subscriberSettings.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        subscriberSettings.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherSettings = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, "oneByOneUnacknowledgeMessageListenerForMultiple");
        publisherSettings.setNumberOfMessagesToSend(1000L);

        AndesClient subscriberClient = new AndesClient(subscriberSettings, true);
        final AndesJMSConsumer subscriber = (AndesJMSConsumer) subscriberClient.getConsumers().get(0);
        MessageListener holdBackListener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage incoming = (TextMessage) message;
                    int sequence = Integer.parseInt(incoming.getText().split("#")[1]);
                    // Only the first sighting of a multiple of 100 is left unacknowledged.
                    boolean heldBack = sequence % 100 == 0
                            && !RedeliveryDelayTestCase.this.getMessageList(deliveries).contains(incoming.getText());
                    if (!heldBack) {
                        message.acknowledge();
                    }
                    deliveries.add(ImmutablePair.of(incoming.getText(), Calendar.getInstance()));
                    subscriber.getReceivedMessageCount().incrementAndGet();
                } catch (JMSException e) {
                    throw new RuntimeException("Exception occurred when receiving messages.", e);
                }
            }
        };
        subscriber.getReceiver().setMessageListener(holdBackListener);

        AndesClient publisherClient = new AndesClient(publisherSettings, true);
        AndesJMSPublisher publisher = (AndesJMSPublisher) publisherClient.getPublishers().get(0);
        MessageProducer producer = publisher.getSender();
        for (int seq = 0; seq < 1000; seq++) {
            producer.send(publisher.getSession().createTextMessage("#" + seq));
        }

        AndesClientUtils.waitForMessagesAndShutdown(subscriberClient, 20000L);
        this.log.info("Received Messages : " + getMessageList(deliveries));

        // The first thousand deliveries must follow the publish order.
        for (int seq = 0; seq < 1000; seq++) {
            String expected = "#" + seq;
            Assert.assertEquals(deliveries.get(seq).getLeft(), expected, "Invalid messages received. " + expected + " expected.");
        }

        // Redeliveries occupy indices 1000..1009; the baseline indices are 0, 99, 199, ..., 899.
        validateMessageContentAndDelay(deliveries, 0, 1000, "#0");
        for (int hundred = 1; hundred <= 9; hundred++) {
            validateMessageContentAndDelay(deliveries, hundred * 100 - 1, 1000 + hundred, "#" + (hundred * 100));
        }
        Assert.assertEquals(deliveries.size(), 1000 + 10, "Message receiving failed.");
    }

    /**
     * Checks redelivery when a synchronous receiver leaves the first delivery of every
     * hundredth message unacknowledged.
     * <p>
     * One thousand text messages ("#0" to "#999") are published to the queue
     * "oneByOneUnacknowledgeMessageReceiverForMultipleQueue". A background thread consumes
     * them with {@code receive()} and acknowledges every message except the first delivery
     * of those whose number is a multiple of 100. The test asserts that the first thousand
     * deliveries arrive in publish order, that the ten held-back messages are delivered again
     * afterwards in order, and that 1010 messages were received in total.
     *
     * @throws AndesClientConfigurationException declared by the client configuration/creation calls
     * @throws XPathExpressionException          declared by the calls used to build the clients
     * @throws IOException                       declared by the client/shutdown calls
     * @throws JMSException                      if sending a message fails
     * @throws AndesClientException              declared by the client calls
     * @throws NamingException                   declared by the client creation calls
     */
    @Test(groups = {"wso2.mb", "queue"})
    public void oneByOneUnacknowledgeMessageReceiverForMultipleMessagesTestCase() throws AndesClientConfigurationException, XPathExpressionException, IOException, JMSException, AndesClientException, NamingException {
        // Each entry pairs the message text with the local time at which it was received.
        final List<ImmutablePair<String, Calendar>> receivedMessages = new ArrayList<ImmutablePair<String, Calendar>>();

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, "oneByOneUnacknowledgeMessageReceiverForMultipleQueue");
        consumerConfig.setAcknowledgeMode(JMSAcknowledgeMode.PER_MESSAGE_ACKNOWLEDGE);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, "oneByOneUnacknowledgeMessageReceiverForMultipleQueue");
        publisherConfig.setNumberOfMessagesToSend(1000L);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        final AndesJMSConsumer andesJMSConsumer = (AndesJMSConsumer) consumerClient.getConsumers().get(0);
        final MessageConsumer receiver = andesJMSConsumer.getReceiver();
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        TextMessage received = (TextMessage) receiver.receive();
                        if (received == null) {
                            // receive() returns null when the consumer is closed concurrently;
                            // stop quietly instead of dereferencing null.
                            return;
                        }
                        String text = received.getText();
                        // Hold back the acknowledgement only for the first delivery of each
                        // message whose number is a multiple of 100.
                        if (Integer.parseInt(text.split("#")[1]) % 100 != 0 || RedeliveryDelayTestCase.this.getMessageList(receivedMessages).contains(text)) {
                            received.acknowledge();
                        }
                        receivedMessages.add(ImmutablePair.of(text, Calendar.getInstance()));
                        andesJMSConsumer.getReceivedMessageCount().incrementAndGet();
                    } catch (JMSException e) {
                        throw new RuntimeException("Exception occurred when receiving messages.", e);
                    }
                }
            }
        }.start();

        AndesJMSPublisher andesJMSPublisher = (AndesJMSPublisher) new AndesClient(publisherConfig, true).getPublishers().get(0);
        MessageProducer sender = andesJMSPublisher.getSender();
        for (int i = 0; i < 1000; i++) {
            sender.send(andesJMSPublisher.getSession().createTextMessage("#" + Integer.toString(i)));
        }

        AndesClientUtils.waitForMessagesAndShutdown(consumerClient, 20000L);
        this.log.info("Received Messages : " + getMessageList(receivedMessages));

        // The first thousand deliveries must match the publish order.
        for (int i = 0; i < 1000; i++) {
            Assert.assertEquals(receivedMessages.get(i).getLeft(), "#" + Integer.toString(i), "Invalid messages received. #" + Integer.toString(i) + " expected.");
        }

        // NOTE(review): apart from "#0", each baseline index (99, 199, ...) holds the message
        // just before the held-back one ("#99" for "#100", ...) - confirm this is intended.
        validateMessageContentAndDelay(receivedMessages, 0, 1000, "#0");
        for (int k = 1; k <= 9; k++) {
            validateMessageContentAndDelay(receivedMessages, k * 100 - 1, 1000 + k, "#" + Integer.toString(k * 100));
        }
        Assert.assertEquals(receivedMessages.size(), 1000 + 10, "Message receiving failed.");
    }

    /**
     * Cleans up after all tests in this class.
     * <p>
     * Resets the "AndesAckWaitTimeOut" and "AndesRedeliveryDelay" system properties to the
     * values kept in {@code defaultAndesAckWaitTimeOut} and
     * {@code defaultAndesRedeliveryDelay} (clearing a property when the kept value is blank),
     * deletes the queues used by the tests, logs out, and restores the server configuration.
     * <p>
     * Logout and configuration restore run in {@code finally} blocks so that a failure while
     * logging in or deleting a queue does not leave the server with the modified
     * configuration.
     *
     * @throws IOException                                  declared by the admin/server calls
     * @throws AutomationUtilException                      declared by the login/restore calls
     * @throws LogoutAuthenticationExceptionException       if logging out fails
     * @throws AndesAdminServiceBrokerManagerAdminException if a queue cannot be deleted
     */
    @AfterClass
    public void tearDown() throws IOException, AutomationUtilException, LogoutAuthenticationExceptionException, AndesAdminServiceBrokerManagerAdminException {
        if (StringUtils.isBlank(this.defaultAndesAckWaitTimeOut)) {
            System.clearProperty("AndesAckWaitTimeOut");
        } else {
            System.setProperty("AndesAckWaitTimeOut", this.defaultAndesAckWaitTimeOut);
        }
        if (StringUtils.isBlank(this.defaultAndesRedeliveryDelay)) {
            System.clearProperty("AndesRedeliveryDelay");
        } else {
            System.setProperty("AndesRedeliveryDelay", this.defaultAndesRedeliveryDelay);
        }

        // Deleted in this order, matching the original sequence of calls.
        final String[] queuesToDelete = {
            "firstMessageInvalidOnlyQueue",
            "firstMessageInvalidOnlyReceiverQueue",
            "multipleUnacknowledgeQueue",
            "multipleUnacknowledgeReceiverQueue",
            "oneByOneUnacknowledgeQueue",
            "oneByOneUnacknowledgeReceiverQueue",
            "firstFewUnacknowledgeQueue",
            "firstFewUnacknowledgeReceiverQueue",
            "unacknowledgeMiddleMessageQueue",
            "unacknowledgeMiddleMessageReceiverQueue",
            "oneByOneUnacknowledgeMessageListenerForMultiple",
            "oneByOneUnacknowledgeMessageReceiverForMultipleQueue"
        };

        try {
            LoginLogoutClient loginLogoutClient = new LoginLogoutClient(((MBIntegrationBaseTest) this).automationContext);
            AndesAdminClient andesAdminClient = new AndesAdminClient(((MBIntegrationBaseTest) this).backendURL, loginLogoutClient.login());
            try {
                for (String queueName : queuesToDelete) {
                    andesAdminClient.deleteQueue(queueName);
                }
            } finally {
                // Always end the admin session, even if a queue deletion failed.
                loginLogoutClient.logout();
            }
        } finally {
            // Always put the server configuration back, whatever happened above.
            ((MBIntegrationBaseTest) this).serverManager.restoreToLastConfiguration(true);
        }
    }

    /**
     * Asserts that a redelivered message has the expected content and was received no earlier
     * than ten seconds after a baseline message.
     * <p>
     * The baseline timestamp is copied before the ten seconds are added, so the entries of
     * {@code list} are left unmodified.
     *
     * @param list received messages, each paired with the time at which it was received
     * @param i    index in {@code list} of the entry whose timestamp is the baseline
     * @param i2   index in {@code list} of the redelivered message
     * @param str  expected text of the message at index {@code i2}
     */
    private void validateMessageContentAndDelay(List<ImmutablePair<String, Calendar>> list, int i, int i2, String str) {
        String redeliveredContent = list.get(i2).getLeft();
        Assert.assertEquals(redeliveredContent, str, "Invalid messages received.");

        Calendar baseline = list.get(i).getRight();
        this.log.info("Original message timestamp for " + redeliveredContent + " : " + baseline.getTimeInMillis());

        // Work on a copy: Calendar.add mutates in place and would otherwise shift the
        // timestamp stored in the caller's list.
        Calendar earliestRedelivery = (Calendar) baseline.clone();
        earliestRedelivery.add(Calendar.SECOND, 10);
        this.log.info("Minimum redelivered timestamp for " + redeliveredContent + " : " + earliestRedelivery.getTimeInMillis());

        Calendar redeliveredAt = list.get(i2).getRight();
        this.log.info("Timestamp of redelivered for " + redeliveredContent + " message : " + redeliveredAt.getTimeInMillis());
        Assert.assertTrue(earliestRedelivery.compareTo(redeliveredAt) <= 0, "Message received before the redelivery delay");
    }

    /**
     * Extracts the message texts from the received-message entries.
     *
     * @param list received messages, each paired with a timestamp
     * @return a new list holding the left element of every entry, in the same order
     */
    public List<String> getMessageList(List<ImmutablePair<String, Calendar>> list) {
        List<String> contents = new ArrayList<String>();
        for (ImmutablePair<String, Calendar> entry : list) {
            contents.add(entry.getLeft());
        }
        return contents;
    }
}
