package org.wso2.carbon.integration.test.storm;

import java.rmi.RemoteException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.security.SecurityAdminMBean;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.automation.engine.context.AutomationContext;
import org.wso2.carbon.automation.engine.context.TestUserMode;
import org.wso2.carbon.automation.engine.context.beans.Instance;
import org.wso2.carbon.event.stream.stub.types.EventStreamAttributeDto;
import org.wso2.carbon.event.stream.stub.types.EventStreamDefinitionDto;
import org.wso2.carbon.integration.common.utils.LoginLogoutClient;
import org.wso2.carbon.integration.test.client.AnalyticStatClient;
import org.wso2.carbon.integration.test.client.StockQuoteClient;
import org.wso2.carbon.integration.test.client.Wso2EventServer;
import org.wso2.cep.integration.common.utils.CEPIntegrationTest;

/* loaded from: input_file:org/wso2/carbon/integration/test/storm/StormTestCase.class */
public class StormTestCase extends CEPIntegrationTest {
    private static Log log = LogFactory.getLog(StormTestCase.class);
    private AutomationContext automationContext;
    private Map<String, Instance> instanceMap;
    private Map<String, AutomationContext> contextMap;

    @BeforeClass(alwaysRun = true)
    public void init() throws Exception {
        super.init();
        this.automationContext = new AutomationContext("CEP", TestUserMode.SUPER_TENANT_ADMIN);
        this.instanceMap = this.automationContext.getProductGroup().getInstanceMap();
        this.contextMap = new HashMap();
        if (this.instanceMap != null && this.instanceMap.size() > 0) {
            Iterator<Map.Entry<String, Instance>> it = this.instanceMap.entrySet().iterator();
            while (it.hasNext()) {
                String key = it.next().getKey();
                this.contextMap.put(key, new AutomationContext("CEP", key, TestUserMode.SUPER_TENANT_ADMIN));
                log.info(key);
            }
        }
        log.info("Cluster instance loading");
    }

    @Test(groups = {"wso2.cep"}, description = "Test CEP Storm integration for single query setup one member ")
    public void testSingleQueryTopologyOneMember() throws Exception {
        configureNode(this.contextMap.get("cep001"));
        Wso2EventServer wso2EventServer = new Wso2EventServer("StormTestCase", 8621, true);
        new Thread(wso2EventServer).start();
        Thread.sleep(LeaseDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
        AnalyticStatClient.publish((String) this.contextMap.get("cep001").getInstance().getHosts().get(MulticastDiscoveryAgent.DEFAULT_HOST_STR), String.valueOf(8411), SecurityAdminMBean.OPERATION_ADMIN, SecurityAdminMBean.OPERATION_ADMIN, 100);
        StockQuoteClient.publish((String) this.contextMap.get("cep001").getInstance().getHosts().get(MulticastDiscoveryAgent.DEFAULT_HOST_STR), String.valueOf(8411), SecurityAdminMBean.OPERATION_ADMIN, SecurityAdminMBean.OPERATION_ADMIN, 100);
        Thread.sleep(BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC);
        Assert.assertTrue(wso2EventServer.getMsgCount() > 0);
        wso2EventServer.stop();
    }

    @Test(groups = {"wso2.cep"}, description = "Test CEP Storm integration for single query setup")
    public void testSingleQueryTopology() throws Exception {
        configureNode(this.contextMap.get("cep002"));
        configureNode(this.contextMap.get("cep003"));
        Wso2EventServer wso2EventServer = new Wso2EventServer("StormTestCase", 8621, true);
        new Thread(wso2EventServer).start();
        Thread.sleep(LeaseDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
        AnalyticStatClient.publish((String) this.contextMap.get("cep002").getInstance().getHosts().get(MulticastDiscoveryAgent.DEFAULT_HOST_STR), (String) this.contextMap.get("cep002").getInstance().getPorts().get("thrift_receiver"), SecurityAdminMBean.OPERATION_ADMIN, SecurityAdminMBean.OPERATION_ADMIN, 100);
        StockQuoteClient.publish((String) this.contextMap.get("cep002").getInstance().getHosts().get(MulticastDiscoveryAgent.DEFAULT_HOST_STR), (String) this.contextMap.get("cep002").getInstance().getPorts().get("thrift_receiver"), SecurityAdminMBean.OPERATION_ADMIN, SecurityAdminMBean.OPERATION_ADMIN, 100);
        Thread.sleep(BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC);
        Assert.assertTrue(wso2EventServer.getMsgCount() > 0);
        wso2EventServer.stop();
    }

    private void configureNode(AutomationContext automationContext) throws Exception {
        String backEndUrl = automationContext.getContextUrls().getBackEndUrl();
        String login = new LoginLogoutClient(automationContext).login();
        this.eventReceiverAdminServiceClient = this.configurationUtil.getEventReceiverAdminServiceClient(backEndUrl, login);
        this.eventPublisherAdminServiceClient = this.configurationUtil.getEventPublisherAdminServiceClient(backEndUrl, login);
        this.eventProcessorAdminServiceClient = this.configurationUtil.getEventProcessorAdminServiceClient(backEndUrl, login);
        this.eventStreamManagerAdminServiceClient = this.configurationUtil.getEventStreamManagerAdminServiceClient(backEndUrl, login);
        log.info("Adding stream definitions");
        defineStreams();
        log.info("Adding event receiver analyticsWso2EventReceiver");
        addEventReceiver("analyticsWso2EventReceiver.xml");
        log.info("Adding event receiver stockQuoteWso2EventReceiver");
        addEventReceiver("stockQuoteWso2EventReceiver.xml");
        log.info("Adding event publisher fortuneCompanyWSO2EventPublisher");
        addEventPublisher("fortuneCompanyWSO2EventPublisher.xml");
        log.info("Adding execution plan");
        addExecutionPlan("PreprocessStats.siddhiql");
    }

    private void addExecutionPlan(String str) throws Exception {
        int activeExecutionPlanConfigurationCount = this.eventProcessorAdminServiceClient.getActiveExecutionPlanConfigurationCount();
        this.eventProcessorAdminServiceClient.addExecutionPlan(getExecutionPlanFromFile("StormTestCase", str));
        Thread.sleep(3000L);
        Assert.assertEquals(this.eventProcessorAdminServiceClient.getActiveExecutionPlanConfigurationCount(), activeExecutionPlanConfigurationCount + 1);
    }

    private void addEventReceiver(String str) throws Exception {
        int activeEventReceiverCount = this.eventReceiverAdminServiceClient.getActiveEventReceiverCount();
        this.eventReceiverAdminServiceClient.addEventReceiverConfiguration(getXMLArtifactConfiguration("StormTestCase", str));
        Thread.sleep(3000L);
        Assert.assertEquals(this.eventReceiverAdminServiceClient.getActiveEventReceiverCount(), activeEventReceiverCount + 1);
    }

    private void addEventPublisher(String str) throws Exception {
        int eventPublisherCount = this.eventPublisherAdminServiceClient.getEventPublisherCount();
        this.eventPublisherAdminServiceClient.addEventPublisherConfiguration(getXMLArtifactConfiguration("StormTestCase", str));
        Thread.sleep(3000L);
        Assert.assertEquals(this.eventPublisherAdminServiceClient.getActiveEventPublisherCount(), eventPublisherCount + 1);
    }

    private void defineStreams() throws RemoteException, InterruptedException {
        log.info("Adding analytic statistic stream definition");
        int eventStreamCount = this.eventStreamManagerAdminServiceClient.getEventStreamCount();
        EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto();
        EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
        eventStreamAttributeDto.setAttributeName("ipAdd");
        eventStreamAttributeDto.setAttributeType("string");
        EventStreamAttributeDto eventStreamAttributeDto2 = new EventStreamAttributeDto();
        eventStreamAttributeDto2.setAttributeName("index");
        eventStreamAttributeDto2.setAttributeType("long");
        EventStreamAttributeDto eventStreamAttributeDto3 = new EventStreamAttributeDto();
        eventStreamAttributeDto3.setAttributeName(Stomp.Headers.Message.TIMESTAMP);
        eventStreamAttributeDto3.setAttributeType("long");
        EventStreamAttributeDto eventStreamAttributeDto4 = new EventStreamAttributeDto();
        eventStreamAttributeDto4.setAttributeName("nanoTime");
        eventStreamAttributeDto4.setAttributeType("long");
        EventStreamAttributeDto[] eventStreamAttributeDtoArr = {eventStreamAttributeDto, eventStreamAttributeDto2, eventStreamAttributeDto3, eventStreamAttributeDto4};
        EventStreamAttributeDto eventStreamAttributeDto5 = new EventStreamAttributeDto();
        eventStreamAttributeDto5.setAttributeName("userID");
        eventStreamAttributeDto5.setAttributeType("string");
        EventStreamAttributeDto eventStreamAttributeDto6 = new EventStreamAttributeDto();
        eventStreamAttributeDto6.setAttributeName("searchTerms");
        eventStreamAttributeDto6.setAttributeType("string");
        eventStreamDefinitionDto.setName(AnalyticStatClient.STREAM_NAME1);
        eventStreamDefinitionDto.setVersion("1.3.0");
        eventStreamDefinitionDto.setMetaData(eventStreamAttributeDtoArr);
        eventStreamDefinitionDto.setCorrelationData((EventStreamAttributeDto[]) null);
        eventStreamDefinitionDto.setPayloadData(new EventStreamAttributeDto[]{eventStreamAttributeDto5, eventStreamAttributeDto6});
        eventStreamDefinitionDto.setDescription(Stomp.EMPTY);
        eventStreamDefinitionDto.setNickName(Stomp.EMPTY);
        this.eventStreamManagerAdminServiceClient.addEventStreamAsDTO(eventStreamDefinitionDto);
        Thread.sleep(1000L);
        Assert.assertEquals(this.eventStreamManagerAdminServiceClient.getEventStreamCount(), eventStreamCount + 1);
        log.info("Adding stock-quote stream definition");
        EventStreamAttributeDto eventStreamAttributeDto7 = new EventStreamAttributeDto();
        eventStreamAttributeDto7.setAttributeName("price");
        eventStreamAttributeDto7.setAttributeType("int");
        EventStreamAttributeDto eventStreamAttributeDto8 = new EventStreamAttributeDto();
        eventStreamAttributeDto8.setAttributeName("symbol");
        eventStreamAttributeDto8.setAttributeType("string");
        EventStreamAttributeDto[] eventStreamAttributeDtoArr2 = {eventStreamAttributeDto7, eventStreamAttributeDto8};
        EventStreamDefinitionDto eventStreamDefinitionDto2 = new EventStreamDefinitionDto();
        eventStreamDefinitionDto2.setName(StockQuoteClient.STREAM_NAME1);
        eventStreamDefinitionDto2.setVersion("1.3.0");
        eventStreamDefinitionDto2.setMetaData((EventStreamAttributeDto[]) null);
        eventStreamDefinitionDto2.setCorrelationData((EventStreamAttributeDto[]) null);
        eventStreamDefinitionDto2.setPayloadData(eventStreamAttributeDtoArr2);
        eventStreamDefinitionDto2.setDescription(Stomp.EMPTY);
        eventStreamDefinitionDto2.setNickName(Stomp.EMPTY);
        this.eventStreamManagerAdminServiceClient.addEventStreamAsDTO(eventStreamDefinitionDto2);
        Thread.sleep(1000L);
        Assert.assertEquals(this.eventStreamManagerAdminServiceClient.getEventStreamCount(), eventStreamCount + 2);
        log.info("Adding fortune company stream definition");
        EventStreamAttributeDto eventStreamAttributeDto9 = new EventStreamAttributeDto();
        eventStreamAttributeDto9.setAttributeName("price");
        eventStreamAttributeDto9.setAttributeType("int");
        EventStreamAttributeDto eventStreamAttributeDto10 = new EventStreamAttributeDto();
        eventStreamAttributeDto10.setAttributeName("symbol");
        eventStreamAttributeDto10.setAttributeType("string");
        EventStreamAttributeDto eventStreamAttributeDto11 = new EventStreamAttributeDto();
        eventStreamAttributeDto11.setAttributeName("count");
        eventStreamAttributeDto11.setAttributeType("long");
        EventStreamAttributeDto[] eventStreamAttributeDtoArr3 = {eventStreamAttributeDto9, eventStreamAttributeDto10, eventStreamAttributeDto11};
        EventStreamDefinitionDto eventStreamDefinitionDto3 = new EventStreamDefinitionDto();
        eventStreamDefinitionDto3.setName("fortuneCompanyStream");
        eventStreamDefinitionDto3.setVersion("1.0.0");
        eventStreamDefinitionDto3.setMetaData((EventStreamAttributeDto[]) null);
        eventStreamDefinitionDto3.setCorrelationData((EventStreamAttributeDto[]) null);
        eventStreamDefinitionDto3.setPayloadData(eventStreamAttributeDtoArr3);
        eventStreamDefinitionDto3.setDescription(Stomp.EMPTY);
        eventStreamDefinitionDto3.setNickName(Stomp.EMPTY);
        this.eventStreamManagerAdminServiceClient.addEventStreamAsDTO(eventStreamDefinitionDto3);
        Thread.sleep(1000L);
        Assert.assertEquals(this.eventStreamManagerAdminServiceClient.getEventStreamCount(), eventStreamCount + 3);
    }

    @AfterClass(alwaysRun = true)
    public void clean() throws Exception {
        this.eventPublisherAdminServiceClient.removeActiveEventPublisherConfiguration("fortuneCompanyWSO2EventPublisher");
        this.eventProcessorAdminServiceClient.removeActiveExecutionPlan("PreprocessStats");
        this.eventReceiverAdminServiceClient.removeActiveEventReceiverConfiguration("analyticsWso2EventReceiver");
        this.eventReceiverAdminServiceClient.removeActiveEventReceiverConfiguration("stockQuoteWso2EventReceiver");
        this.eventStreamManagerAdminServiceClient.removeEventStream(AnalyticStatClient.STREAM_NAME1, "1.3.0");
        this.eventStreamManagerAdminServiceClient.removeEventStream(StockQuoteClient.STREAM_NAME1, "1.3.0");
        this.eventStreamManagerAdminServiceClient.removeEventStream("fortuneCompanyStream", "1.0.0");
    }

    @AfterClass(alwaysRun = true)
    public void destroy() throws Exception {
        super.cleanup();
    }
}
