package org.wso2.mb.integration.tests.amqp.functional;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import javax.xml.xpath.XPathExpressionException;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.automation.engine.context.TestUserMode;
import org.wso2.mb.integration.common.clients.AndesClient;
import org.wso2.mb.integration.common.clients.AndesJMSPublisher;
import org.wso2.mb.integration.common.clients.configurations.AndesJMSConsumerClientConfiguration;
import org.wso2.mb.integration.common.clients.configurations.AndesJMSPublisherClientConfiguration;
import org.wso2.mb.integration.common.clients.exceptions.AndesClientConfigurationException;
import org.wso2.mb.integration.common.clients.exceptions.AndesClientException;
import org.wso2.mb.integration.common.clients.operations.utils.AndesClientConstants;
import org.wso2.mb.integration.common.clients.operations.utils.AndesClientUtils;
import org.wso2.mb.integration.common.clients.operations.utils.ExchangeType;
import org.wso2.mb.integration.common.utils.backend.MBIntegrationBaseTest;

/* loaded from: input_file:org/wso2/mb/integration/tests/amqp/functional/TransactionalPublishingTestCase.class */
/**
 * Integration tests for publishing over AMQP with transacted JMS sessions.
 *
 * <p>Each test creates publisher clients whose configuration has
 * {@code setTransactionalSession(true)} and checks what a consumer on the same
 * destination has received before and after {@code commit()} / {@code rollback()}.
 *
 * <p>Publisher clients are always stopped in a {@code finally} block so that a
 * failed assertion does not leave them running.
 *
 * <p>NOTE(review): in the queue tests the consumer client is only passed to
 * {@code AndesClientUtils.waitForMessagesAndShutdown}; if an earlier assertion
 * fails it is never explicitly stopped. Confirm whether {@code stopClient()} is
 * safe to call twice before adding it to the cleanup.
 */
public class TransactionalPublishingTestCase extends MBIntegrationBaseTest {

    /** Time, in milliseconds, to wait for messages to be (or not be) delivered. */
    private static final long WAIT_TIME_MILLIS = 10000L;

    /** Number of characters read from the 1MB input file to build a large message body. */
    private static final int LARGE_MESSAGE_CHAR_COUNT = 1048576;

    /** Number of messages sent inside a single transaction by the batch tests. */
    private static final int BATCH_MESSAGE_COUNT = 20;

    /**
     * Initializes the test context as the super tenant admin.
     *
     * @throws XPathExpressionException propagated from {@code init}
     */
    @BeforeClass
    public void prepare() throws XPathExpressionException {
        init(TestUserMode.SUPER_TENANT_ADMIN);
    }

    /**
     * Sends one message in a transacted session, verifies that the consumer has
     * received nothing before the commit, then commits and verifies that exactly
     * one message is received.
     */
    @Test(groups = {"wso2.mb", "queue", "transaction"}, description = "Send message and check whether message is received, and commit and check whether message is received")
    public void enqueueAndCheckCommitAndCheckTestCase() throws IOException, JMSException, AndesClientException, NamingException, AndesClientConfigurationException, InterruptedException, XPathExpressionException {
        final String queueName = "Transactional-enqueueAndCheckCommitAndCheckTestCase";

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setMaximumMessagesToReceived(1);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setTransactionalSession(true);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        consumerClient.startClient();

        AndesClient publisherClient = new AndesClient(publisherConfig, true);
        try {
            AndesJMSPublisher publisher = (AndesJMSPublisher) publisherClient.getPublishers().get(0);

            // Send without committing; nothing should reach the consumer yet.
            publisher.getSender().send(publisher.getSession().createTextMessage("transactional message"));
            TimeUnit.MILLISECONDS.sleep(WAIT_TIME_MILLIS);
            Assert.assertEquals(consumerClient.getReceivedMessageCount(), 0, "Message received! send messages are not committed hence no message should be received");

            // After the commit the single message must be delivered.
            publisher.getSession().commit();
            AndesClientUtils.waitForMessagesAndShutdown(consumerClient, WAIT_TIME_MILLIS);
            Assert.assertEquals(consumerClient.getReceivedMessageCount(), 1, "Expected message count not received after commit");
        } finally {
            publisherClient.stopClient();
        }
    }

    /**
     * Sends a message and rolls it back, then sends a second message and commits.
     * Verifies that only the committed message is received and that the first line
     * of the received-messages file equals the committed message text.
     */
    @Test(groups = {"wso2.mb", "queue", "transaction"}, description = "Test for rollback functionality")
    public void enqueueAndRollbackEnqueueAndCommitTestCase() throws AndesClientConfigurationException, IOException, JMSException, AndesClientException, NamingException, InterruptedException, XPathExpressionException {
        final String queueName = "Transactional-enqueueAndRollbackEnqueueAndCommitTestCase";

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setFilePathToWriteReceivedMessages(AndesClientConstants.FILE_PATH_TO_WRITE_RECEIVED_MESSAGES);
        consumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setTransactionalSession(true);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        consumerClient.startClient();

        AndesClient publisherClient = new AndesClient(publisherConfig, true);
        try {
            AndesJMSPublisher publisher = (AndesJMSPublisher) publisherClient.getPublishers().get(0);

            // Uncommitted message: must not be visible to the consumer.
            publisher.getSender().send(publisher.getSession().createTextMessage("rollback message"));
            TimeUnit.MILLISECONDS.sleep(WAIT_TIME_MILLIS);
            Assert.assertEquals(consumerClient.getReceivedMessageCount(), 0, "Message received! send messages are not committed hence no message should be received");

            // Discard the first message, then publish and commit the one we expect to receive.
            publisher.getSession().rollback();
            publisher.getSender().send(publisher.getSession().createTextMessage("transactional message"));
            publisher.getSession().commit();
            AndesClientUtils.waitForMessagesAndShutdown(consumerClient, WAIT_TIME_MILLIS);

            // try-with-resources closes the reader even if readLine() fails.
            String receivedLine;
            try (BufferedReader receivedMessages = new BufferedReader(new FileReader(AndesClientConstants.FILE_PATH_TO_WRITE_RECEIVED_MESSAGES))) {
                receivedLine = receivedMessages.readLine();
            }

            Assert.assertEquals(consumerClient.getReceivedMessageCount(), 1, "Expected message count not received after commit");
            Assert.assertEquals(receivedLine, "transactional message", "Message content has been modified.");
        } finally {
            publisherClient.stopClient();
        }
    }

    /**
     * Uses two transacted publishers, each paired with its own queue and consumer.
     * Verifies that neither consumer receives anything before a commit, and that
     * committing one publisher's session delivers only to that publisher's queue.
     */
    @Test(groups = {"wso2.mb", "queue", "transaction"}, description = "Test transactions with multiple publishers")
    public void multiplePublisherEnqueueAndCheckCommitAndCheckTestCase() throws AndesClientConfigurationException, IOException, JMSException, AndesClientException, NamingException, InterruptedException, XPathExpressionException {
        final String firstQueueName = "Transactional-multiplePublisherEnqueueAndCheckCommitAndCheck-1";
        final String secondQueueName = "Transactional-multiplePublisherEnqueueAndCheckCommitAndCheck-2";

        AndesJMSConsumerClientConfiguration firstConsumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, firstQueueName);
        firstConsumerConfig.setMaximumMessagesToReceived(1);
        firstConsumerConfig.setAsync(false);

        AndesJMSConsumerClientConfiguration secondConsumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, secondQueueName);
        secondConsumerConfig.setMaximumMessagesToReceived(1);
        secondConsumerConfig.setAsync(false);

        AndesJMSPublisherClientConfiguration firstPublisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, firstQueueName);
        firstPublisherConfig.setTransactionalSession(true);

        AndesJMSPublisherClientConfiguration secondPublisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, secondQueueName);
        secondPublisherConfig.setTransactionalSession(true);

        AndesClient firstConsumerClient = new AndesClient(firstConsumerConfig, true);
        AndesClient secondConsumerClient = new AndesClient(secondConsumerConfig, true);
        firstConsumerClient.startClient();
        secondConsumerClient.startClient();

        AndesClient firstPublisherClient = new AndesClient(firstPublisherConfig, true);
        AndesClient secondPublisherClient = new AndesClient(secondPublisherConfig, true);
        try {
            AndesJMSPublisher firstPublisher = (AndesJMSPublisher) firstPublisherClient.getPublishers().get(0);
            AndesJMSPublisher secondPublisher = (AndesJMSPublisher) secondPublisherClient.getPublishers().get(0);

            TextMessage firstMessage = firstPublisher.getSession().createTextMessage("message1");
            TextMessage secondMessage = secondPublisher.getSession().createTextMessage("message2");
            firstPublisher.getSender().send(firstMessage);
            secondPublisher.getSender().send(secondMessage);

            // Nothing has been committed on either session yet.
            TimeUnit.MILLISECONDS.sleep(WAIT_TIME_MILLIS);
            Assert.assertEquals(firstConsumerClient.getReceivedMessageCount(), 0, "Message received for Transactional-multiplePublisherEnqueueAndCheckCommitAndCheck-1 ! send messages are not committed hence no message should be received");
            Assert.assertEquals(secondConsumerClient.getReceivedMessageCount(), 0, "Message received for Transactional-multiplePublisherEnqueueAndCheckCommitAndCheck-2 ! send messages are not committed hence no message should be received");

            // Committing the first session must not release the second publisher's message.
            firstPublisher.getSession().commit();
            AndesClientUtils.waitForMessagesAndShutdown(firstConsumerClient, WAIT_TIME_MILLIS);
            Assert.assertEquals(firstConsumerClient.getReceivedMessageCount(), 1, "Expected message count not received after commit");
            Assert.assertEquals(secondConsumerClient.getReceivedMessageCount(), 0, "Message received for Transactional-multiplePublisherEnqueueAndCheckCommitAndCheck-2 ! send messages are not committed hence no message should be received");

            secondPublisher.getSession().commit();
            AndesClientUtils.waitForMessagesAndShutdown(secondConsumerClient, WAIT_TIME_MILLIS);
            Assert.assertEquals(secondConsumerClient.getReceivedMessageCount(), 1, "Expected message count not received after commit");
        } finally {
            // Nested finally: the second publisher is stopped even if stopping the first one fails.
            try {
                firstPublisherClient.stopClient();
            } finally {
                secondPublisherClient.stopClient();
            }
        }
    }

    /**
     * Uses two transacted publishers on the same queue. Each publisher sends a
     * message, rolls it back, then sends and commits a replacement. After each
     * commit the test checks the consumer's received count and the next line of
     * the received-messages file.
     */
    @Test(groups = {"wso2.mb", "queue", "transaction"}, description = "Test rollback functionality with multiple publishers")
    public void multiplePublisherEnqueueAndRollbackEnqueueAndCommitTestCase() throws AndesClientConfigurationException, IOException, JMSException, AndesClientException, NamingException, InterruptedException, XPathExpressionException {
        final String queueName = "Transactional-multiplePublisherEnqueueAndRollbackEnqueueAndCommit";

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setMaximumMessagesToReceived(2);
        consumerConfig.setFilePathToWriteReceivedMessages(AndesClientConstants.FILE_PATH_TO_WRITE_RECEIVED_MESSAGES);

        AndesJMSPublisherClientConfiguration firstPublisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        firstPublisherConfig.setTransactionalSession(true);

        AndesJMSPublisherClientConfiguration secondPublisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        secondPublisherConfig.setTransactionalSession(true);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        consumerClient.startClient();

        AndesClient firstPublisherClient = new AndesClient(firstPublisherConfig, true);
        AndesClient secondPublisherClient = new AndesClient(secondPublisherConfig, true);

        // The reader is opened before any message is sent and kept open so that
        // successive readLine() calls return successive received messages.
        try (BufferedReader receivedMessages = new BufferedReader(new FileReader(AndesClientConstants.FILE_PATH_TO_WRITE_RECEIVED_MESSAGES))) {
            AndesJMSPublisher firstPublisher = (AndesJMSPublisher) firstPublisherClient.getPublishers().get(0);
            AndesJMSPublisher secondPublisher = (AndesJMSPublisher) secondPublisherClient.getPublishers().get(0);

            TextMessage firstRollbackMessage = firstPublisher.getSession().createTextMessage("rollback message 1");
            TextMessage secondRollbackMessage = secondPublisher.getSession().createTextMessage("rollback message 2");
            firstPublisher.getSender().send(firstRollbackMessage);
            secondPublisher.getSender().send(secondRollbackMessage);

            TimeUnit.MILLISECONDS.sleep(WAIT_TIME_MILLIS);
            Assert.assertEquals(consumerClient.getReceivedMessageCount(), 0, "Message received for Transactional-multiplePublisherEnqueueAndRollbackEnqueueAndCommit ! send messages are not committed hence no message should be received");

            // First publisher: discard the pending message, then commit a replacement.
            firstPublisher.getSession().rollback();
            firstPublisher.getSender().send(firstPublisher.getSession().createTextMessage("transactional message 1"));
            firstPublisher.getSession().commit();
            TimeUnit.MILLISECONDS.sleep(WAIT_TIME_MILLIS);
            AndesClientUtils.flushPrintWriters();
            String firstReceivedLine = receivedMessages.readLine();
            Assert.assertEquals(consumerClient.getReceivedMessageCount(), 1, "Expected message count not received after commit");
            Assert.assertEquals(firstReceivedLine, "transactional message 1", "Message content has been modified.");

            // Second publisher: same rollback-then-commit sequence.
            secondPublisher.getSession().rollback();
            secondPublisher.getSender().send(secondPublisher.getSession().createTextMessage("transactional message 2"));
            secondPublisher.getSession().commit();
            AndesClientUtils.waitForMessagesAndShutdown(consumerClient, WAIT_TIME_MILLIS);
            AndesClientUtils.flushPrintWriters();
            String secondReceivedLine = receivedMessages.readLine();
            Assert.assertEquals(consumerClient.getReceivedMessageCount(), 2, "Expected message count not received after commit");
            Assert.assertEquals(secondReceivedLine, "transactional message 2", "Message content has been modified.");
        } finally {
            // Runs after the reader has been closed; both publishers are always attempted.
            try {
                firstPublisherClient.stopClient();
            } finally {
                secondPublisherClient.stopClient();
            }
        }
    }

    /**
     * Sends {@value #BATCH_MESSAGE_COUNT} messages, each carrying a
     * {@value #LARGE_MESSAGE_CHAR_COUNT}-character body, in one transaction and
     * commits. The test passes only if a {@link JMSException} is thrown
     * (see {@code expectedExceptions}); presumably the commit is rejected because
     * the batch exceeds a broker-side size limit — confirm against broker config.
     */
    @Test(groups = {"wso2.mb", "queue", "transaction"}, description = "Test the commit batch size limit check functionality", expectedExceptions = {JMSException.class})
    public void exceedCommitBatchSizeTest() throws IOException, JMSException, AndesClientException, NamingException, AndesClientConfigurationException, XPathExpressionException {
        final String queueName = "Transactional-exceedCommitBatchSizeTest";

        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        publisherConfig.setTransactionalSession(true);

        AndesJMSConsumerClientConfiguration consumerConfig = new AndesJMSConsumerClientConfiguration(getAMQPPort().intValue(), ExchangeType.QUEUE, queueName);
        consumerConfig.setMaximumMessagesToReceived(BATCH_MESSAGE_COUNT);

        AndesClient consumerClient = new AndesClient(consumerConfig, true);
        consumerClient.startClient();

        AndesClient publisherClient = new AndesClient(publisherConfig, true);
        try {
            AndesJMSPublisher publisher = (AndesJMSPublisher) publisherClient.getPublishers().get(0);

            // Fill the buffer from the input file. A single read() call is allowed to
            // return fewer characters than requested, so keep reading until the buffer
            // is full or the file ends.
            char[] payload = new char[LARGE_MESSAGE_CHAR_COUNT];
            try (BufferedReader input = new BufferedReader(new FileReader(AndesClientConstants.MESSAGE_CONTENT_INPUT_FILE_PATH_1MB))) {
                int filled = 0;
                while (filled < payload.length) {
                    int count = input.read(payload, filled, payload.length - filled);
                    if (count < 0) {
                        break;
                    }
                    filled += count;
                }
            }

            // The whole buffer is used as the body, so the message length is fixed
            // regardless of how many characters the file actually provided.
            TextMessage largeMessage = publisher.getSession().createTextMessage(new String(payload));
            for (int sent = 0; sent < BATCH_MESSAGE_COUNT; sent++) {
                publisher.getSender().send(largeMessage);
            }
            publisher.getSession().commit();
        } finally {
            // Stop the publisher first, then the consumer; the consumer is stopped
            // even if stopping the publisher fails.
            try {
                publisherClient.stopClient();
            } finally {
                consumerClient.stopClient();
            }
        }
    }

    /**
     * Publishes {@value #BATCH_MESSAGE_COUNT} messages to a topic in a transacted
     * session and commits, without starting any subscriber. The test has no
     * assertions; it passes if no exception is thrown.
     */
    @Test(groups = {"wso2.mb", "queue", "transaction"}, description = "Test topic publisher without subscribers")
    public void topicPublishingWithoutSubsribers() throws XPathExpressionException, IOException, AndesClientException, NamingException, JMSException {
        AndesJMSPublisherClientConfiguration publisherConfig = new AndesJMSPublisherClientConfiguration(getAMQPPort().intValue(), ExchangeType.TOPIC, "Transactional-pubwithoutSub");
        publisherConfig.setTransactionalSession(true);

        AndesClient publisherClient = new AndesClient(publisherConfig, true);
        try {
            AndesJMSPublisher publisher = (AndesJMSPublisher) publisherClient.getPublishers().get(0);
            TextMessage message = publisher.getSession().createTextMessage("Transactional-pubwithoutSub Message");
            for (int sent = 0; sent < BATCH_MESSAGE_COUNT; sent++) {
                publisher.getSender().send(message);
            }
            publisher.getSession().commit();
        } finally {
            publisherClient.stopClient();
        }
    }
}
