package org.wso2.carbon.integration.test.storm;

import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.automation.engine.context.TestUserMode;
import org.wso2.carbon.automation.engine.frameworkutils.FrameworkPathUtil;
import org.wso2.carbon.automation.test.utils.common.TestConfigurationProvider;
import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
import org.wso2.carbon.integration.common.utils.mgt.ServerConfigurationManager;
import org.wso2.cep.integration.common.utils.CEPIntegrationTest;

/* loaded from: input_file:org/wso2/carbon/integration/test/storm/StormManagerServiceTestCase.class */
/**
 * Integration tests that talk to the {@link StormManagerService} Thrift endpoint of a CEP server.
 *
 * <p>Before the tests run, {@link #init()} replaces the server's {@code event-processor.xml} and
 * {@code axis2.xml} with the copies under {@link #RESOURCE_LOCATION} and restarts the server.
 * Each test then registers CEP publishers and Storm receivers through the Thrift client and
 * checks the endpoint strings returned by the matching lookup calls.
 */
public class StormManagerServiceTestCase extends CEPIntegrationTest {
    private static final String EVENT_PROCESSING_FILE = "event-processor.xml";
    private static final String AXIS2_XML_FILE = "axis2.xml";
    private static final String RESOURCE_LOCATION = TestConfigurationProvider.getResourceLocation() + File.separator + "artifacts" + File.separator + "CEP" + File.separator + "StormTestCase" + File.separator + "ManagerServiceTestCase";
    private static final String CARBON_HOME = FrameworkPathUtil.getCarbonHome();

    // NOTE(review): the three aliases below borrow constants from unrelated ActiveMQ classes
    // (this looks like a decompiler artifact). Presumably they stand for "localhost", ":" and a
    // few seconds respectively -- confirm the values and replace with plain literals.
    /** Host the Thrift client connects to. */
    private static final String MANAGER_HOST = BrokerService.DEFAULT_BROKER_NAME;
    /** Port the Thrift client connects to. */
    private static final int MANAGER_PORT = 8904;
    /** Separator between the two parts of an endpoint string returned by the service. */
    private static final String ENDPOINT_SEPARATOR = Stomp.Headers.SEPERATOR;
    /** Pause (milliseconds) after the graceful restart before the tests start. */
    private static final long POST_RESTART_WAIT_MILLIS = LeaseDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;

    private static final Log log = LogFactory.getLog(StormManagerServiceTestCase.class);
    private ServerConfigurationManager serverConfigManager;

    /**
     * Swaps in the test configuration files, restarts the server and waits
     * {@link #POST_RESTART_WAIT_MILLIS} before returning.
     *
     * @throws Exception if the base initialization, configuration replacement, restart or
     *                   the wait fails
     */
    @BeforeClass(alwaysRun = true)
    public void init() throws Exception {
        log.info("Initializing CEP server to act as storm manager");
        super.init(TestUserMode.SUPER_TENANT_ADMIN);
        this.serverConfigManager = new ServerConfigurationManager(this.cepServer);
        log.info("Replacing event-processor.xml");
        this.serverConfigManager.applyConfigurationWithoutRestart(new File(RESOURCE_LOCATION + File.separator + EVENT_PROCESSING_FILE), new File(CARBON_HOME + File.separator + "repository" + File.separator + "conf" + File.separator + EVENT_PROCESSING_FILE), true);
        log.info("Replacing axis2.xml");
        this.serverConfigManager.applyConfigurationWithoutRestart(new File(RESOURCE_LOCATION + File.separator + AXIS2_XML_FILE), new File(CARBON_HOME + File.separator + "repository" + File.separator + "conf" + File.separator + "axis2" + File.separator + AXIS2_XML_FILE), true);
        log.info("Restarting CEP server");
        this.serverConfigManager.restartGracefully();
        Thread.sleep(POST_RESTART_WAIT_MILLIS);
        log.info("Initialization completed");
    }

    /**
     * Opens a socket transport to {@link #MANAGER_HOST}:{@link #MANAGER_PORT}.
     * The caller owns the returned transport and must close it.
     *
     * @return an already opened transport
     * @throws TTransportException if the connection cannot be opened
     */
    private TSocket openManagerTransport() throws TTransportException {
        TSocket transport = new TSocket(MANAGER_HOST, MANAGER_PORT);
        transport.open();
        return transport;
    }

    /**
     * Wraps an open transport in a binary-protocol {@link StormManagerService.Client}.
     *
     * @param transport transport the client sends its calls over
     * @return a client bound to {@code transport}
     */
    private StormManagerService.Client newClient(TSocket transport) {
        return new StormManagerService.Client(new TBinaryProtocol(transport));
    }

    /**
     * Splits an endpoint string on {@link #ENDPOINT_SEPARATOR} and checks its first two parts.
     *
     * @param endpoint     endpoint string returned by the service
     * @param expectedHost expected first part
     * @param expectedPort expected second part
     */
    private static void assertEndpoint(String endpoint, String expectedHost, String expectedPort) {
        String[] parts = endpoint.split(ENDPOINT_SEPARATOR);
        Assert.assertEquals(parts[0], expectedHost);
        Assert.assertEquals(parts[1], expectedPort);
    }

    /**
     * Returns the second part of an endpoint string split on {@link #ENDPOINT_SEPARATOR}.
     *
     * @param endpoint endpoint string returned by the service
     * @return the part after the first separator
     */
    private static String portOf(String endpoint) {
        return endpoint.split(ENDPOINT_SEPARATOR)[1];
    }

    /**
     * Restores the previous server configuration and runs the base-class cleanup.
     *
     * @throws Exception if restoring the configuration or the base cleanup fails
     */
    @AfterClass(alwaysRun = true)
    public void destroy() throws Exception {
        try {
            // init() may have failed before the manager was created; alwaysRun still calls us.
            if (this.serverConfigManager != null) {
                this.serverConfigManager.restoreToLastConfiguration();
            }
        } finally {
            // Run the base cleanup even when restoring the configuration throws.
            super.cleanup();
        }
    }

    @Test(groups = {"wso2.cep"}, description = "Single Storm Publisher and CEP publisher for a single execution plan on same host")
    public void singleReceiverAndPublisherInSameHost() throws Exception {
        TSocket transport = openManagerTransport();
        try {
            StormManagerService.Client client = newClient(transport);

            client.registerCEPPublisher(1234, "Test1ExecutionPlan", "127.0.0.1", 15000);
            assertEndpoint(client.getCEPPublisher(1234, "Test1ExecutionPlan", "127.0.0.1"), "127.0.0.1", "15000");

            client.registerStormReceiver(1234, "Test1ExecutionPlan", "127.0.0.1", 16000);
            assertEndpoint(client.getStormReceiver(1234, "Test1ExecutionPlan", "127.0.0.1"), "127.0.0.1", "16000");
        } finally {
            transport.close();
        }
    }

    @Test(groups = {"wso2.cep"}, description = "Single Receiver and Publisher for a single execution plan on different hosts")
    public void singleReceiverAndPublisherInDifferentHosts() throws Exception {
        TSocket transport = openManagerTransport();
        try {
            StormManagerService.Client client = newClient(transport);

            client.registerCEPPublisher(1234, "Test2ExecutionPlan", "127.0.0.1", 15000);
            client.registerCEPPublisher(1234, "Test2ExecutionPlan", "127.10.0.1", 15001);
            assertEndpoint(client.getCEPPublisher(1234, "Test2ExecutionPlan", "127.0.0.1"), "127.0.0.1", "15000");
            assertEndpoint(client.getCEPPublisher(1234, "Test2ExecutionPlan", "127.10.0.1"), "127.10.0.1", "15001");

            client.registerStormReceiver(1234, "Test2ExecutionPlan", "127.0.0.1", 16000);
            client.registerStormReceiver(1234, "Test2ExecutionPlan", "127.10.0.1", 16001);
            assertEndpoint(client.getStormReceiver(1234, "Test2ExecutionPlan", "127.0.0.1"), "127.0.0.1", "16000");
            assertEndpoint(client.getStormReceiver(1234, "Test2ExecutionPlan", "127.10.0.1"), "127.10.0.1", "16001");
        } finally {
            transport.close();
        }
    }

    @Test(groups = {"wso2.cep"}, description = "Multiple storm receivers and multiple CEP publishers for a single execution plan on same host")
    public void twoReceiversAndTwoPublishersInSameHost() throws Exception {
        TSocket transport = openManagerTransport();
        try {
            StormManagerService.Client client = newClient(transport);

            client.registerCEPPublisher(4321, "Test3ExecutionPlan", "127.0.0.1", 25000);
            client.registerCEPPublisher(4321, "Test3ExecutionPlan", "127.0.0.1", 25001);
            // Two consecutive lookups with identical arguments must yield different ports.
            String firstPublisherPort = portOf(client.getCEPPublisher(4321, "Test3ExecutionPlan", "127.0.0.1"));
            String secondPublisherPort = portOf(client.getCEPPublisher(4321, "Test3ExecutionPlan", "127.0.0.1"));
            Assert.assertNotEquals(firstPublisherPort, secondPublisherPort);

            // NOTE(review): receivers use tenant 1234 while the publishers above use 4321 --
            // confirm this difference is intentional.
            client.registerStormReceiver(1234, "Test3ExecutionPlan", "127.0.0.1", 26000);
            client.registerStormReceiver(1234, "Test3ExecutionPlan", "127.0.0.1", 26001);
            String firstReceiverPort = portOf(client.getStormReceiver(1234, "Test3ExecutionPlan", "127.0.0.1"));
            String secondReceiverPort = portOf(client.getStormReceiver(1234, "Test3ExecutionPlan", "127.0.0.1"));
            Assert.assertNotEquals(firstReceiverPort, secondReceiverPort);
        } finally {
            transport.close();
        }
    }

    @Test(groups = {"wso2.cep"}, description = "Single Receiver and Publisher for two execution plans on same host")
    public void singleReceiverAndPublisherForDifferentExecutionPlans() throws Exception {
        TSocket transport = openManagerTransport();
        try {
            StormManagerService.Client client = newClient(transport);

            client.registerCEPPublisher(1234, "Test4ExecutionPlan1", "127.0.0.1", 15000);
            client.registerCEPPublisher(1234, "Test4ExecutionPlan2", "127.0.0.1", 15001);
            assertEndpoint(client.getCEPPublisher(1234, "Test4ExecutionPlan1", "127.0.0.1"), "127.0.0.1", "15000");
            assertEndpoint(client.getCEPPublisher(1234, "Test4ExecutionPlan2", "127.0.0.1"), "127.0.0.1", "15001");

            client.registerStormReceiver(1234, "Test4ExecutionPlan1", "127.0.0.1", 16000);
            client.registerStormReceiver(1234, "Test4ExecutionPlan2", "127.0.0.1", 16001);
            assertEndpoint(client.getStormReceiver(1234, "Test4ExecutionPlan1", "127.0.0.1"), "127.0.0.1", "16000");
            assertEndpoint(client.getStormReceiver(1234, "Test4ExecutionPlan2", "127.0.0.1"), "127.0.0.1", "16001");
        } finally {
            transport.close();
        }
    }

    @Test(groups = {"wso2.cep"}, description = "Single Receiver and Publisher for a single execution plan for different tenants")
    public void singleReceiverAndPublisherForDifferentTenants() throws Exception {
        TSocket transport = openManagerTransport();
        try {
            StormManagerService.Client client = newClient(transport);

            client.registerCEPPublisher(1234, "Test5ExecutionPlan", "127.0.0.1", 15000);
            client.registerCEPPublisher(-1234, "Test5ExecutionPlan", "127.0.0.1", 15001);
            assertEndpoint(client.getCEPPublisher(1234, "Test5ExecutionPlan", "127.0.0.1"), "127.0.0.1", "15000");
            assertEndpoint(client.getCEPPublisher(-1234, "Test5ExecutionPlan", "127.0.0.1"), "127.0.0.1", "15001");

            client.registerStormReceiver(1234, "Test5ExecutionPlan", "127.0.0.1", 36000);
            client.registerStormReceiver(-1234, "Test5ExecutionPlan", "127.0.0.1", 36001);
            assertEndpoint(client.getStormReceiver(1234, "Test5ExecutionPlan", "127.0.0.1"), "127.0.0.1", "36000");
            assertEndpoint(client.getStormReceiver(-1234, "Test5ExecutionPlan", "127.0.0.1"), "127.0.0.1", "36001");
        } finally {
            transport.close();
        }
    }
}
