package org.wso2.carbon.esb.jms.transport.test;

import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.AXIOMUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.automation.extensions.servers.jmsserver.client.JMSQueueMessageProducer;
import org.wso2.carbon.automation.extensions.servers.jmsserver.controller.config.JMSBrokerConfigurationProvider;
import org.wso2.esb.integration.common.utils.CarbonLogReader;
import org.wso2.esb.integration.common.utils.ESBIntegrationTest;
import org.wso2.esb.integration.common.utils.Utils;

/* loaded from: input_file:org/wso2/carbon/esb/jms/transport/test/EI1622JMSInboundMessagePollingConsumerTest.class */
/**
 * Integration tests checking how a JMS inbound endpoint suspends polling.
 *
 * <p>Each test deploys an inbound endpoint named {@value #ENDPOINT_NAME} that listens on the
 * {@value #QUEUE_NAME} queue, pushes one SOAP message to that queue and then waits for the
 * expected "polling suspended" line to appear in the Carbon log.
 *
 * <p>Both tests reuse the same endpoint name and queue, so every test undeploys the endpoint in a
 * {@code finally} block; otherwise a failed assertion would leave the artifact deployed for the
 * next test.
 */
public class EI1622JMSInboundMessagePollingConsumerTest extends ESBIntegrationTest {
    private static final String ENDPOINT_NAME = "jms_inbound";
    private static final String QUEUE_NAME = "JMSMS";

    /** Value of {@code transport.jms.RetriesBeforeSuspension} used by the non-zero test. */
    private static final int RETRIES_BEFORE_SUSPENSION = 2;
    /** Value of {@code transport.jms.PollingSuspensionPeriod} used by both endpoint configurations. */
    private static final int POLLING_SUSPENSION_PERIOD = 3000;

    // Timeouts handed to the log reader; presumably seconds - confirm against CarbonLogReader.
    private static final int DEPLOYMENT_LOG_TIMEOUT = 60;
    private static final int SUSPENSION_LOG_TIMEOUT = 180;

    private static final String SOAP_MESSAGE = "<?xml version='1.0' encoding='UTF-8'?><soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">   <soapenv:Header/>   <soapenv:Body>      <ser:placeOrder>         <ser:order>            <xsd:price>100</xsd:price>            <xsd:quantity>2000</xsd:quantity>            <xsd:symbol>WSO2</xsd:symbol>         </ser:order>      </ser:placeOrder>   </soapenv:Body></soapenv:Envelope>";

    private CarbonLogReader carbonLogReader = null;

    /**
     * Creates and starts the Carbon log reader shared by all tests in this class.
     *
     * @throws Exception if the log reader cannot be created or started
     */
    @BeforeClass(alwaysRun = true)
    protected void init() throws Exception {
        this.carbonLogReader = new CarbonLogReader();
        this.carbonLogReader.start();
    }

    /**
     * Deploys the endpoint with a retry limit of {@value #RETRIES_BEFORE_SUSPENSION} and expects the
     * temporary-suspension message in the log.
     *
     * @throws Exception if deployment, message publishing or log reading fails
     */
    @Test(groups = {"wso2.esb"}, description = "Check whether polling is suspended.")
    public void testPollingWithSuspensionLimit() throws Exception {
        try {
            pushMessageToQue(addEndpoint());
            Assert.assertTrue(
                    Utils.checkForLog(
                            this.carbonLogReader,
                            "Suspending polling as the pollingSuspensionLimit of " + RETRIES_BEFORE_SUSPENSION
                                    + " reached. Polling will be re-started after " + POLLING_SUSPENSION_PERIOD
                                    + " milliseconds",
                            SUSPENSION_LOG_TIMEOUT),
                    "JMS Polling suspension is not enabled.");
        } finally {
            // Clean up even when the assertion fails so the next test starts from a clean state.
            undeployEndpoint();
        }
    }

    /**
     * Deploys the endpoint with a retry limit of zero and expects the permanent-suspension message
     * in the log.
     *
     * @throws Exception if deployment, message publishing or log reading fails
     */
    @Test(groups = {"wso2.esb"}, description = "Check whether polling is permanently suspended when limit is zero.")
    public void testPollingWithSuspensionLimitAsZero() throws Exception {
        try {
            pushMessageToQue(addEndpointWithSuspensionLimitZero());
            Assert.assertTrue(
                    Utils.checkForLog(this.carbonLogReader, "Polling is suspended permanently", SUSPENSION_LOG_TIMEOUT),
                    "JMS Polling is not permanently suspended though the suspension limit is 0.");
        } finally {
            // Clean up even when the assertion fails so the next test starts from a clean state.
            undeployEndpoint();
        }
    }

    /**
     * Deploys the given inbound endpoint, waits for its initialization log line, and then pushes a
     * single SOAP message to the {@value #QUEUE_NAME} queue.
     *
     * <p>The log buffer is cleared before deployment and again before the message is sent, so that
     * later log checks only see lines produced after the message was pushed.
     *
     * @param oMElement inbound endpoint configuration to deploy
     * @throws Exception if deployment is not confirmed in the log or the message cannot be sent
     */
    private void pushMessageToQue(OMElement oMElement) throws Exception {
        JMSQueueMessageProducer producer =
                new JMSQueueMessageProducer(JMSBrokerConfigurationProvider.getInstance().getBrokerConfiguration());
        try {
            this.carbonLogReader.clearLogs();
            Utils.deploySynapseConfiguration(
                    oMElement, ENDPOINT_NAME, Utils.ArtifactType.INBOUND_ENDPOINT.getDirName(), false);
            Assert.assertTrue(
                    this.carbonLogReader.checkForLog(
                            "Initializing Inbound Endpoint: " + ENDPOINT_NAME, DEPLOYMENT_LOG_TIMEOUT),
                    "Deployment failed.");
            this.carbonLogReader.clearLogs();
            producer.connect(QUEUE_NAME);
            producer.pushMessage(SOAP_MESSAGE);
            this.log.info("Message pushed to the JMS Queue");
        } finally {
            // Single disconnect on every path (success or failure).
            producer.disconnect();
        }
    }

    /**
     * Stops the log reader and removes the inbound endpoint as a final safety net.
     *
     * @throws Exception if stopping the log reader or undeploying fails
     */
    @AfterClass(alwaysRun = true)
    public void destroy() throws Exception {
        try {
            // init() may have failed before the reader was assigned; alwaysRun still brings us here.
            if (this.carbonLogReader != null) {
                this.carbonLogReader.stop();
            }
        } finally {
            undeployEndpoint();
        }
    }

    /** Removes the {@value #ENDPOINT_NAME} inbound endpoint artifact without restarting the server. */
    private void undeployEndpoint() {
        Utils.undeploySynapseConfiguration(ENDPOINT_NAME, Utils.ArtifactType.INBOUND_ENDPOINT.getDirName(), false);
    }

    /**
     * Builds the inbound endpoint configuration with a retry limit of
     * {@value #RETRIES_BEFORE_SUSPENSION}.
     *
     * @return the parsed inbound endpoint element
     * @throws Exception if the configuration cannot be parsed
     */
    private OMElement addEndpoint() throws Exception {
        return buildInboundEndpoint(RETRIES_BEFORE_SUSPENSION);
    }

    /**
     * Builds the inbound endpoint configuration with a retry limit of zero.
     *
     * @return the parsed inbound endpoint element
     * @throws Exception if the configuration cannot be parsed
     */
    private OMElement addEndpointWithSuspensionLimitZero() throws Exception {
        return buildInboundEndpoint(0);
    }

    /**
     * Builds the JMS inbound endpoint configuration shared by all tests; only the
     * {@code transport.jms.RetriesBeforeSuspension} value varies.
     *
     * @param retriesBeforeSuspension value written to {@code transport.jms.RetriesBeforeSuspension}
     * @return the parsed inbound endpoint element
     * @throws Exception if the configuration cannot be parsed
     */
    private static OMElement buildInboundEndpoint(int retriesBeforeSuspension) throws Exception {
        String config = "<inboundEndpoint xmlns=\"http://ws.apache.org/ns/synapse\"\n"
                + "                 name=\"" + ENDPOINT_NAME + "\"\n"
                + "                 sequence=\"request\"\n"
                + "                 onError=\"faultSeq\"\n"
                + "                 protocol=\"jms\"\n"
                + "                 suspend=\"false\">\n"
                + "   <parameters>\n"
                + "      <parameter name=\"interval\">1000</parameter>\n"
                + "      <parameter name=\"sequential\">true</parameter>\n"
                + "      <parameter name=\"coordination\">true</parameter>\n"
                + "      <parameter name=\"java.naming.factory.initial\">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>\n"
                + "      <parameter name=\"java.naming.provider.url\">tcp://localhost:61616</parameter>\n"
                + "      <parameter name=\"transport.jms.ConnectionFactoryJNDIName\">QueueConnectionFactory</parameter>\n"
                + "      <parameter name=\"transport.jms.ConnectionFactoryType\">queue</parameter>\n"
                + "      <parameter name=\"transport.jms.Destination\">" + QUEUE_NAME + "</parameter>\n"
                + "      <parameter name=\"transport.jms.SessionTransacted\">true</parameter>\n"
                + "      <parameter name=\"transport.jms.SessionAcknowledgement\">CLIENT_ACKNOWLEDGE</parameter>\n"
                + "      <parameter name=\"transport.jms.CacheLevel\">3</parameter>\n"
                + "      <parameter name=\"transport.jms.SubscriptionDurable\">false</parameter>\n"
                + "      <parameter name=\"transport.jms.RetriesBeforeSuspension\">" + retriesBeforeSuspension + "</parameter>\n"
                + "      <parameter name=\"transport.jms.PollingSuspensionPeriod\">" + POLLING_SUSPENSION_PERIOD + "</parameter>\n"
                + "   </parameters>\n"
                + "</inboundEndpoint>";
        return AXIOMUtil.stringToOM(config);
    }
}
