package org.wso2.carbon.integration.test.inputflow;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.automation.engine.context.TestUserMode;
import org.wso2.carbon.automation.extensions.servers.jmsserver.controller.JMSBrokerController;
import org.wso2.carbon.automation.extensions.servers.jmsserver.controller.config.JMSBrokerConfiguration;
import org.wso2.carbon.automation.extensions.servers.jmsserver.controller.config.JMSBrokerConfigurationProvider;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.integration.common.utils.mgt.ServerConfigurationManager;
import org.wso2.carbon.integration.test.client.JMSPublisherClient;
import org.wso2.carbon.integration.test.client.Wso2EventServer;
import org.wso2.cep.integration.common.utils.CEPIntegrationTest;

/* loaded from: input_file:org/wso2/carbon/integration/test/inputflow/JMSTestCase.class */
/**
 * Integration tests for the JMS (ActiveMQ) event receiver.
 *
 * <p>Every scenario follows the same flow: deploy an event stream, a JMS event receiver and a
 * WSO2Event publisher from a sample directory, start a local {@link Wso2EventServer}, publish
 * messages to a JMS topic through {@link JMSPublisherClient}, undeploy the artifacts and finally
 * compare the events collected by the event server against an expected list.
 *
 * <p>The tests are chained with {@code dependsOnMethods}; they all use the same event server port
 * and the same stream id, so they are not written to run concurrently.
 */
public class JMSTestCase extends CEPIntegrationTest {
    private static final Log log = LogFactory.getLog(JMSTestCase.class);

    private static final String GERONIMO_J2EE_MANAGEMENT = "geronimo-j2ee-management_1.1_spec-1.0.1.jar";
    private static final String SENSOR_STREAM_JSON = "org.wso2.event.sensor.stream_1.0.0.json";
    private static final String SENSOR_STREAM = "org.wso2.event.sensor.stream:1.0.0";
    private static final String SENSOR_STREAM_NAME = "org.wso2.event.sensor.stream";
    private static final String SENSOR_STREAM_VERSION = "1.0.0";
    private static final String JMS_RECEIVER_MAP = "jmsReceiverMap.xml";
    private static final String WSO2_EVENT_PUBLISHER = "wso2EventPublisher.xml";
    private static final String ACTIVEMQ_CORE = "activemq-core-5.7.0.jar";

    /** Classpath directory of the jars copied into the server; resource names always use '/'. */
    private static final String JAR_RESOURCE_DIR = "/artifacts/CEP/jar/";

    /** Port the local {@link Wso2EventServer} is created with in every scenario. */
    private static final int EVENT_SERVER_PORT = 8461;

    /**
     * Pause used between the asynchronous steps of a scenario.
     * NOTE(review): the value is borrowed from an unrelated ActiveMQ constant, which looks like a
     * decompiler substitution for a literal delay. It is kept as-is so timing does not change;
     * confirm the intended value and replace it with a literal.
     */
    private static final long SETTLE_MILLIS = LeaseDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;

    private JMSBrokerController activeMqBroker = null;
    private ServerConfigurationManager serverManager = null;

    /**
     * Starts the ActiveMQ broker, copies the JMS client jars into the server's component lib
     * directory, restarts the server and re-creates the admin service clients.
     *
     * @throws Exception if the parent initialisation fails; broker/jar/restart failures are
     *                   reported as {@link RemoteException} with the original cause attached
     */
    @BeforeClass(alwaysRun = true)
    public void init() throws Exception {
        super.init(TestUserMode.SUPER_TENANT_ADMIN);
        try {
            this.serverManager = new ServerConfigurationManager(this.cepServer);
            setupActiveMQBroker();
        } catch (MalformedURLException e) {
            throw new RemoteException("Malformed URL exception thrown when initializing ActiveMQ broker", e);
        }
        try {
            copyJarToComponentLib(ACTIVEMQ_CORE);
            copyJarToComponentLib(GERONIMO_J2EE_MANAGEMENT);
            this.serverManager.restartGracefully();

            // The restart invalidates the previous session, so the clients are rebuilt.
            String sessionCookie = getSessionCookie();
            this.eventReceiverAdminServiceClient =
                    this.configurationUtil.getEventReceiverAdminServiceClient(this.backendURL, sessionCookie);
            this.eventStreamManagerAdminServiceClient =
                    this.configurationUtil.getEventStreamManagerAdminServiceClient(this.backendURL, sessionCookie);
            this.eventPublisherAdminServiceClient =
                    this.configurationUtil.getEventPublisherAdminServiceClient(this.backendURL, sessionCookie);
            Thread.sleep(45000L);
        } catch (IOException e) {
            throw new RemoteException("IOException when initializing ActiveMQ broker", e);
        } catch (URISyntaxException e) {
            throw new RemoteException("URISyntaxException when initializing ActiveMQ broker", e);
        } catch (Exception e) {
            throw new RemoteException("Exception caught when restarting server", e);
        }
    }

    @Test(groups = {"wso2.cep"}, description = "Testing activemq jms receiver with Map formatted event with default mapping.")
    public void jmsMapTestWithDefaultMappingScenario() throws Exception {
        runScenario("inputflows" + File.separator + "sample0009", JMS_RECEIVER_MAP,
                "topicMap", "csv", "topicMap.csv", SETTLE_MILLIS,
                expectedSensorEvents(19900813115534L, false, 601, 3, 90.34344d, 20.44345d, 2.3f, 20.44345d));
    }

    @Test(groups = {"wso2.cep"}, description = "Testing activemq jms receiver with Map formatted event with custom mapping", dependsOnMethods = {"jmsMapTestWithDefaultMappingScenario"})
    public void jmsMapTestWithCustomMappingScenario() throws Exception {
        runScenario("inputflows" + File.separator + "sample0010", JMS_RECEIVER_MAP,
                "topicMap", "csv", "topicMap.csv", SETTLE_MILLIS,
                expectedSensorEvents(19900813115534L, false, 501, 3, 90.34344d, 20.44345d, 2.3f, 20.44345d));
    }

    @Test(groups = {"wso2.cep"}, description = "Testing activemq jms receiver with JSON formatted event with default mapping", dependsOnMethods = {"jmsMapTestWithCustomMappingScenario"})
    public void jmsJSONTestWithDefaultMappingScenario() throws Exception {
        runScenario("inputflows" + File.separator + "sample0011", "jmsReceiverJSON.xml",
                "topicJSON", "json", "topicJSON.txt", SETTLE_MILLIS,
                expectedSensorEvents(19900813115534L, false, 701, 3, 90.34344d, 20.44345d, 2.3f, 20.44345d));
    }

    @Test(groups = {"wso2.cep"}, description = "Testing activemq jms receiver with XML formatted event with default mapping", dependsOnMethods = {"jmsJSONTestWithDefaultMappingScenario"})
    public void jmsXmlTestWithDefaultMappingScenario() throws Exception {
        runScenario("inputflows" + File.separator + "sample0011", "jmsReceiverXML.xml",
                "topicXML", "xml", "topicXML.txt", 2000L,
                expectedSensorEvents(199008131245L, true, 801, 2, 4.504343d, 1.23434d, 6.6f, 20.44345d));
    }

    @Test(groups = {"wso2.cep"}, description = "Testing activemq jms receiver with Text formatted event with default mapping", dependsOnMethods = {"jmsXmlTestWithDefaultMappingScenario"})
    public void jmsTextTestWithDefaultMappingScenario() throws Exception {
        runScenario("inputflows" + File.separator + "sample0011", "jmsReceiverText.xml",
                "topicText", "text", "topicText.txt", 2000L,
                expectedSensorEvents(19900813115534L, false, 901, 3, 20.44345d, 5.443435d, 8.9f, 1.23434d));
    }

    /**
     * Same flow as the other scenarios, except that the receiver is re-configured with
     * {@code jmsPropertiesReceiver.xml} after deployment and before the messages are published.
     * Problems while starting or stopping the event server are only logged here.
     */
    @Test(groups = {"wso2.cep"}, description = "Testing jms receiver with jms properties", dependsOnMethods = {"jmsTextTestWithDefaultMappingScenario"})
    public void jmsPropertiesTestWithDefaultMappingScenario() throws Exception {
        String samplePath = "inputflows" + File.separator + "sample0022";
        int expectedReceiverCount = deployArtifacts(samplePath, "jmsReceiver.xml");

        Wso2EventServer eventServer = new Wso2EventServer(samplePath, EVENT_SERVER_PORT, true);
        try {
            try {
                new Thread(eventServer).start();
                Thread.sleep(SETTLE_MILLIS);
            } catch (Throwable t) {
                log.error("Could not start the agent server ", t);
            }

            this.eventReceiverAdminServiceClient.editEventReceiverConfiguration(
                    getXMLArtifactConfiguration(samplePath, "jmsPropertiesReceiver.xml"), "jmsReceiver");
            // Editing must leave exactly the one receiver deployed above active.
            Assert.assertEquals(this.eventReceiverAdminServiceClient.getActiveEventReceiverCount(), expectedReceiverCount);
            Thread.sleep(10000L);

            JMSPublisherClient.publish("topicMap", "csv", samplePath, "topicMap.csv");
            Thread.sleep(SETTLE_MILLIS);
            undeployArtifacts("jmsReceiver.xml");
            Thread.sleep(2000L);

            assertReceivedEvents(eventServer,
                    expectedSensorEvents(19900813115534L, false, 601, 3, 90.34344d, 20.44345d, 2.3f, 20.44345d));
        } finally {
            // Always release the event server, even when a step before the assertions fails.
            try {
                eventServer.stop();
            } catch (Throwable t) {
                log.error("Could not stop the agent server ", t);
            }
        }
    }

    /**
     * Stops the broker, removes the jars added by {@link #init()}, restores the server
     * configuration and runs the parent clean-up.
     *
     * <p>The server is restored even if stopping the broker fails, and the parent clean-up runs
     * even if restoring the server fails.
     *
     * @throws Exception the failure of any clean-up step
     */
    @AfterClass(alwaysRun = true)
    public void destroy() throws Exception {
        try {
            try {
                Thread.sleep(SETTLE_MILLIS);
                if (this.activeMqBroker != null) {
                    this.activeMqBroker.stop();
                }
                Thread.sleep(SETTLE_MILLIS);
            } finally {
                restoreServerLibraries();
            }
        } finally {
            super.cleanup();
        }
    }

    /** Runs the common deploy / publish / undeploy / verify flow for one sample directory. */
    private void runScenario(String samplePath, String receiverFile, String topic, String format,
                             String dataFile, long postPublishMillis, List<Event> expected) throws Exception {
        deployArtifacts(samplePath, receiverFile);

        Wso2EventServer eventServer = new Wso2EventServer(samplePath, EVENT_SERVER_PORT, true);
        new Thread(eventServer).start();
        try {
            Thread.sleep(SETTLE_MILLIS);
            JMSPublisherClient.publish(topic, format, samplePath, dataFile);
            Thread.sleep(postPublishMillis);
            undeployArtifacts(receiverFile);
            Thread.sleep(2000L);
            assertReceivedEvents(eventServer, expected);
        } finally {
            // The server is started before this try, so it must be stopped on every exit path.
            eventServer.stop();
        }
    }

    /**
     * Deploys the sensor stream, the given receiver and the WSO2Event publisher, asserting after
     * each step that the corresponding count grew by one.
     *
     * @return the active receiver count expected after deployment (count before + 1)
     */
    private int deployArtifacts(String samplePath, String receiverFile) throws Exception {
        int streamsBefore = this.eventStreamManagerAdminServiceClient.getEventStreamCount();
        int receiversBefore = this.eventReceiverAdminServiceClient.getActiveEventReceiverCount();
        int publishersBefore = this.eventPublisherAdminServiceClient.getActiveEventPublisherCount();

        this.eventStreamManagerAdminServiceClient.addEventStreamAsString(
                getJSONArtifactConfiguration(samplePath, SENSOR_STREAM_JSON));
        Assert.assertEquals(this.eventStreamManagerAdminServiceClient.getEventStreamCount(), streamsBefore + 1);

        this.eventReceiverAdminServiceClient.addEventReceiverConfiguration(
                getXMLArtifactConfiguration(samplePath, receiverFile));
        Assert.assertEquals(this.eventReceiverAdminServiceClient.getActiveEventReceiverCount(), receiversBefore + 1);

        this.eventPublisherAdminServiceClient.addEventPublisherConfiguration(
                getXMLArtifactConfiguration(samplePath, WSO2_EVENT_PUBLISHER));
        Assert.assertEquals(this.eventPublisherAdminServiceClient.getActiveEventPublisherCount(), publishersBefore + 1);

        return receiversBefore + 1;
    }

    /** Removes the sensor stream, the given receiver file and the WSO2Event publisher file. */
    private void undeployArtifacts(String receiverFile) throws Exception {
        this.eventStreamManagerAdminServiceClient.removeEventStream(SENSOR_STREAM_NAME, SENSOR_STREAM_VERSION);
        this.eventReceiverAdminServiceClient.removeInactiveEventReceiverConfiguration(receiverFile);
        this.eventPublisherAdminServiceClient.removeInactiveEventPublisherConfiguration(WSO2_EVENT_PUBLISHER);
    }

    /**
     * Checks the number and content of the events collected by the event server. Time stamps of
     * the received events are reset to 0 before the comparison. Any problem is logged and turned
     * into a test failure that keeps the original throwable as its cause.
     */
    private void assertReceivedEvents(Wso2EventServer eventServer, List<Event> expected) {
        final long expectedCount = expected.size();
        try {
            Assert.assertEquals(eventServer.getMsgCount(), expectedCount, "Incorrect number of messages consumed!");
            List<Event> received = eventServer.getPreservedEventList();
            for (Event event : received) {
                event.setTimeStamp(0L);
            }
            Assert.assertEquals(received, expected, "Mapping is incorrect!");
        } catch (Throwable t) {
            log.error("Exception thrown: " + t.getMessage(), t);
            Assert.fail("Exception: " + t.getMessage(), t);
        }
    }

    /**
     * Builds {@code count} expected events for the sensor stream. The events are identical except
     * for the third meta-data value, which runs from {@code firstId} upwards in steps of one.
     * The value types (Long, Boolean, Integer, String / Double, Double / Float, Double) matter
     * because the result is compared to the received events with {@code equals}.
     */
    private static List<Event> expectedSensorEvents(long metaLong, boolean metaFlag, int firstId, int count,
                                                    double correlationA, double correlationB,
                                                    float payloadFloat, double payloadDouble) {
        List<Event> events = new ArrayList<Event>(count);
        for (int offset = 0; offset < count; offset++) {
            Event event = new Event();
            event.setStreamId(SENSOR_STREAM);
            event.setMetaData(new Object[]{Long.valueOf(metaLong), Boolean.valueOf(metaFlag),
                    Integer.valueOf(firstId + offset), "temperature"});
            event.setCorrelationData(new Object[]{Double.valueOf(correlationA), Double.valueOf(correlationB)});
            event.setPayloadData(new Object[]{Float.valueOf(payloadFloat), Double.valueOf(payloadDouble)});
            events.add(event);
        }
        return events;
    }

    /**
     * Copies a jar from the test classpath into the server's component lib directory.
     *
     * @throws IOException if the jar is not on the classpath (instead of a bare NPE)
     */
    private void copyJarToComponentLib(String jarName) throws Exception {
        // Resource names are '/'-separated on every platform; File.separator breaks on Windows.
        String resourcePath = JAR_RESOURCE_DIR + jarName;
        java.net.URL jarUrl = getClass().getResource(resourcePath);
        if (jarUrl == null) {
            throw new IOException("Required test resource not found on classpath: " + resourcePath);
        }
        this.serverManager.copyToComponentLib(new File(jarUrl.toURI()));
    }

    /** Removes the jars added in {@link #init()} and restores the last server configuration. */
    private void restoreServerLibraries() throws Exception {
        if (this.serverManager == null) {
            return;
        }
        this.serverManager.removeFromComponentLib(ACTIVEMQ_CORE);
        this.serverManager.removeFromComponentLib(GERONIMO_J2EE_MANAGEMENT);
        this.serverManager.restoreToLastConfiguration();
    }

    /** Creates the broker controller and starts the broker unless one is already running. */
    private void setupActiveMQBroker() {
        this.activeMqBroker = new JMSBrokerController(BrokerService.DEFAULT_BROKER_NAME, getJMSBrokerConfiguration());
        if (!JMSBrokerController.isBrokerStarted()) {
            Assert.assertTrue(this.activeMqBroker.start(), "ActiveMQ Broker starting failed");
        }
    }

    /** Returns the broker configuration supplied by {@link JMSBrokerConfigurationProvider}. */
    private JMSBrokerConfiguration getJMSBrokerConfiguration() {
        return JMSBrokerConfigurationProvider.getInstance().getBrokerConfiguration();
    }
}
