package org.wso2.carbon.esb.jms.inbound.transport.test;

import javax.xml.stream.XMLStreamException;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.AXIOMUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.automation.extensions.servers.jmsserver.controller.config.JMSBrokerConfigurationProvider;
import org.wso2.carbon.integration.common.admin.client.LogViewerClient;
import org.wso2.carbon.logging.view.stub.types.carbon.LogEvent;
import org.wso2.esb.integration.common.utils.ESBIntegrationTest;
import org.wso2.esb.integration.common.utils.clients.jmsclient.JMSQueueMessageProducer;
import org.wso2.esb.integration.common.utils.servers.ActiveMQServer;

/* loaded from: input_file:org/wso2/carbon/esb/jms/inbound/transport/test/JMSInboundMessageSupportTestCase.class */
public class JMSInboundMessageSupportTestCase extends ESBIntegrationTest {
    private JMSQueueMessageProducer jmsQueueMessageProducer;
    private LogViewerClient logViewerClient;
    private LogEvent[] logEvents;
    private String endpointName;
    private String queueName;
    private long startTime;
    private ActiveMQServer activeMQServer = new ActiveMQServer();
    private int numberOfMessages = 3;
    private int numberOfMessagesReceived = 0;

    @BeforeClass(alwaysRun = true)
    protected void init() throws Exception {
        this.activeMQServer.startJMSBroker();
        super.init();
        this.jmsQueueMessageProducer = new JMSQueueMessageProducer(JMSBrokerConfigurationProvider.getInstance().getBrokerConfiguration());
        this.logViewerClient = new LogViewerClient(this.contextUrls.getBackEndUrl(), getSessionCookie());
    }

    @Test(groups = {"wso2.esb"}, description = "JMS Inbound Endpoint Polling BytesMessages from a Queue")
    public void testBytesMessageType() throws Exception {
        this.endpointName = "BytesMessageEndpoint";
        this.queueName = "BytesMessageQueue";
        this.logViewerClient.clearLogs();
        try {
            this.jmsQueueMessageProducer.connect(this.queueName);
            for (int i = 0; i < this.numberOfMessages; i++) {
                this.jmsQueueMessageProducer.sendBytesMessage("<?xml version='1.0' encoding='UTF-8'?><soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">   <soapenv:Header/>   <soapenv:Body>      <ser:placeOrder>         <ser:order>            <xsd:price>100</xsd:price>            <xsd:quantity>2000</xsd:quantity>            <xsd:symbol>JMSBytes</xsd:symbol>         </ser:order>      </ser:placeOrder>   </soapenv:Body></soapenv:Envelope>".getBytes());
                this.log.info("BytesMessage " + i + " published to the JMS Queue");
            }
            addInboundEndpoint(getEndpointConfig(this.endpointName, this.queueName));
            this.startTime = System.currentTimeMillis();
            while (this.numberOfMessagesReceived < this.numberOfMessages && System.currentTimeMillis() - this.startTime < ThreadPoolUtils.DEFAULT_SHUTDOWN_AWAIT_TERMINATION) {
                this.numberOfMessagesReceived = 0;
                this.logEvents = this.logViewerClient.getAllRemoteSystemLogs();
                for (LogEvent logEvent : this.logEvents) {
                    if (logEvent.getMessage().contains("<xsd:symbol>JMSBytes</xsd:symbol>")) {
                        this.numberOfMessagesReceived++;
                    }
                }
            }
            deleteInboundEndpointFromName(this.endpointName);
            Assert.assertEquals(this.numberOfMessagesReceived, this.numberOfMessages, "JMS Inbound Endpoint couldn't consume BytesMessages from Queue");
        } finally {
            this.jmsQueueMessageProducer.disconnect();
        }
    }

    @Test(groups = {"wso2.esb"}, description = "JMS Inbound Endpoint Polling MapMessages from a Queue")
    public void testMapMessageType() throws Exception {
        this.endpointName = "MapMessageEndpoint";
        this.queueName = "MapMessageQueue";
        this.logViewerClient.clearLogs();
        try {
            this.jmsQueueMessageProducer.connect(this.queueName);
            for (int i = 0; i < 3; i++) {
                this.jmsQueueMessageProducer.sendMapMessage();
                this.log.info("MapMessage " + i + " published to the JMS Queue");
            }
            addInboundEndpoint(getEndpointConfig(this.endpointName, this.queueName));
            this.startTime = System.currentTimeMillis();
            while (this.numberOfMessagesReceived < this.numberOfMessages && System.currentTimeMillis() - this.startTime < ThreadPoolUtils.DEFAULT_SHUTDOWN_AWAIT_TERMINATION) {
                this.numberOfMessagesReceived = 0;
                this.logEvents = this.logViewerClient.getAllRemoteSystemLogs();
                for (LogEvent logEvent : this.logEvents) {
                    if (logEvent.getMessage().contains("JMSMap xmlns=\"http://axis.apache.org/axis2/java/transports/jms/map-payload\"")) {
                        this.numberOfMessagesReceived++;
                    }
                }
            }
            deleteInboundEndpointFromName(this.endpointName);
            Assert.assertEquals(this.numberOfMessagesReceived, this.numberOfMessages, "JMS Inbound Endpoint couldn't consume MapMessages from Queue");
        } finally {
            this.jmsQueueMessageProducer.disconnect();
        }
    }

    @AfterClass(alwaysRun = true)
    public void destroy() throws Exception {
        super.cleanup();
        this.activeMQServer.stopJMSBroker();
    }

    private OMElement getEndpointConfig(String str, String str2) throws XMLStreamException {
        return AXIOMUtil.stringToOM("<inboundEndpoint xmlns=\"http://ws.apache.org/ns/synapse\"\n                 name=\"" + str + "\"\n                 sequence=\"requestHandlerSeq\"\n                 onError=\"inFault\"\n                 protocol=\"jms\"\n                 suspend=\"false\">\n    <parameters>\n        <parameter name=\"interval\">1000</parameter>\n        <parameter name=\"transport.jms.Destination\">" + str2 + "</parameter>\n        <parameter name=\"transport.jms.CacheLevel\">1</parameter>\n        <parameter name=\"transport.jms.ConnectionFactoryJNDIName\">QueueConnectionFactory</parameter>\n        <parameter name=\"java.naming.factory.initial\">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>\n        <parameter name=\"java.naming.provider.url\">tcp://localhost:61616</parameter>\n        <parameter name=\"transport.jms.SessionAcknowledgement\">AUTO_ACKNOWLEDGE</parameter>\n        <parameter name=\"transport.jms.SessionTransacted\">false</parameter>\n        <parameter name=\"transport.jms.ConnectionFactoryType\">queue</parameter>\n    </parameters>\n</inboundEndpoint>");
    }
}
