package org.wso2.analytics.apim.integration.tests.apim.incrementalprocessing;

import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.analytics.apim.integration.tests.apim.analytics.APIMAnalyticsBaseTestCase;

/* loaded from: input_file:org/wso2/analytics/apim/integration/tests/apim/incrementalprocessing/IncrementalProcessingTestCase.class */
/**
 * Integration test for the {@code APIM_INCREMENTAL_PROCESSING_SCRIPT} Spark script.
 *
 * <p>The tests run in a fixed order enforced through {@code dependsOnMethods}:
 * <ol>
 *   <li>check that the Spark script is deployed;</li>
 *   <li>publish the request/response/execution-time CSV fixtures and wait until each
 *       per-minute table holds at least {@link #EXPECTED_PER_MINUTE_RECORDS} records;</li>
 *   <li>execute the Spark script and check that every per-hour and per-day table
 *       contains records.</li>
 * </ol>
 *
 * <p>All tables touched by the test are purged before the first test and after the last one.
 * Publishing, querying and purging are delegated to helpers inherited from
 * {@link APIMAnalyticsBaseTestCase}.
 */
public class IncrementalProcessingTestCase extends APIMAnalyticsBaseTestCase {
    private static final String REQUESTS_PER_MINUTE_STREAM_NAME = "org.wso2.apimgt.statistics.perMinuteRequest";
    private static final String RESPONSES_PER_MINUTE_STREAM_NAME = "org.wso2.apimgt.statistics.perMinuteResponse";
    private static final String EXECUTION_TIMES_PER_MINUTE_STREAM_NAME = "org.wso2.apimgt.statistics.perMinuteExecutionTimes";
    private static final String REQUESTS_PER_MINUTE_STREAM_VERSION = "1.0.0";
    private static final String RESPONSES_PER_MINUTE_STREAM_VERSION = "1.0.0";
    private static final String EXECUTION_TIMES_PER_MINUTE_STREAM_VERSION = "1.0.0";
    private static final String TEST_RESOURCE_PATH = "incrementalProcessing";
    private static final String TEST_REQUESTS_FILE = "requests.csv";
    private static final String TEST_RESPONSES_FILE = "responses.csv";
    private static final String TEST_EXECUTION_TIMES_FILE = "executionTimes.csv";
    private static final String SPARK_SCRIPT = "APIM_INCREMENTAL_PROCESSING_SCRIPT";
    private static final String ORG_WSO2_APIMGT_STATISTICS_PERMINUTEREQUEST = "ORG_WSO2_APIMGT_STATISTICS_PERMINUTEREQUEST";
    private static final String ORG_WSO2_APIMGT_STATISTICS_PERHOURREQUEST = "ORG_WSO2_APIMGT_STATISTICS_PERHOURREQUEST";
    private static final String ORG_WSO2_APIMGT_STATISTICS_PERDAYREQUEST = "ORG_WSO2_APIMGT_STATISTICS_PERDAYREQUEST";
    private static final String ORG_WSO2_APIMGT_STATISTICS_PERMINUTERESPONSE = "ORG_WSO2_APIMGT_STATISTICS_PERMINUTERESPONSE";
    private static final String ORG_WSO2_APIMGT_STATISTICS_PERHOURRESPONSE = "ORG_WSO2_APIMGT_STATISTICS_PERHOURRESPONSE";
    private static final String ORG_WSO2_APIMGT_STATISTICS_PERDAYRESPONSE = "ORG_WSO2_APIMGT_STATISTICS_PERDAYRESPONSE";
    private static final String ORG_WSO2_APIMGT_STATISTICS_PERMINUTEEXECUTIONTIMES = "ORG_WSO2_APIMGT_STATISTICS_PERMINUTEEXECUTIONTIMES";
    private static final String ORG_WSO2_APIMGT_STATISTICS_PERHOUREXECUTIONTIMES = "ORG_WSO2_APIMGT_STATISTICS_PERHOUREXECUTIONTIMES";
    private static final String ORG_WSO2_APIMGT_STATISTICS_PERDAYEXECUTIONTIMES = "ORG_WSO2_APIMGT_STATISTICS_PERDAYEXECUTIONTIMES";

    /** Every table this test reads or writes, in the order in which it is purged. */
    private static final String[] ALL_TABLES = {
        ORG_WSO2_APIMGT_STATISTICS_PERMINUTEREQUEST,
        ORG_WSO2_APIMGT_STATISTICS_PERHOURREQUEST,
        ORG_WSO2_APIMGT_STATISTICS_PERDAYREQUEST,
        ORG_WSO2_APIMGT_STATISTICS_PERMINUTERESPONSE,
        ORG_WSO2_APIMGT_STATISTICS_PERHOURRESPONSE,
        ORG_WSO2_APIMGT_STATISTICS_PERDAYRESPONSE,
        ORG_WSO2_APIMGT_STATISTICS_PERMINUTEEXECUTIONTIMES,
        ORG_WSO2_APIMGT_STATISTICS_PERHOUREXECUTIONTIMES,
        ORG_WSO2_APIMGT_STATISTICS_PERDAYEXECUTIONTIMES,
    };

    /**
     * Tenant id passed to every table/record helper.
     * NOTE(review): -1234 is presumably the Carbon super-tenant id - confirm against the base class.
     */
    private static final int TENANT_ID = -1234;

    /** Maximum number of polling attempts, both for the local polling loop and for {@code isRecordExists}. */
    private static final int MAX_TRIES = 20;

    /** Minimum number of records each per-minute table must hold after the fixtures are published. */
    private static final long EXPECTED_PER_MINUTE_RECORDS = 4L;

    /** Pause after the initial purge in {@link #setup()}. */
    private static final long SETUP_SETTLE_MILLIS = 1000L;

    /** Pause after publishing each CSV fixture. */
    private static final long PUBLISH_SETTLE_MILLIS = 5000L;

    /** Pause between two consecutive record-count polls. */
    private static final long POLL_INTERVAL_MILLIS = 10000L;

    /**
     * Initialises the base test case and starts from empty tables.
     *
     * @throws Exception if initialisation, purging or the settle sleep fails
     */
    @BeforeClass(alwaysRun = true)
    public void setup() throws Exception {
        super.init();
        purgeData();
        Thread.sleep(SETUP_SETTLE_MILLIS);
    }

    /**
     * Removes the data produced by the tests.
     *
     * @throws Exception if purging fails
     */
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        purgeData();
    }

    /**
     * Verifies that the incremental-processing Spark script is deployed.
     *
     * @throws Exception if the deployment lookup fails
     */
    @Test(groups = {"wso2.analytics.apim"}, description = "Tests if the Spark script is deployed")
    public void testIncrementalProcessingSparkScriptDeployment() throws Exception {
        Assert.assertTrue(isSparkScriptExists(SPARK_SCRIPT), SPARK_SCRIPT + " spark script is not deployed.");
    }

    /**
     * Publishes the three CSV fixtures and verifies that each per-minute table ends up with at
     * least {@link #EXPECTED_PER_MINUTE_RECORDS} records.
     *
     * @throws Exception if publishing, querying or sleeping fails
     */
    @Test(groups = {"wso2.analytics.apim"}, description = "Test if the Simulation data has been published", dependsOnMethods = {"testIncrementalProcessingSparkScriptDeployment"})
    public void testInitialSimulationDataSent() throws Exception {
        // The trailing 1L is passed through unchanged; its meaning is defined by the base-class helper.
        pubishEventsFromCSV(TEST_RESOURCE_PATH, TEST_REQUESTS_FILE,
                getStreamId(REQUESTS_PER_MINUTE_STREAM_NAME, REQUESTS_PER_MINUTE_STREAM_VERSION), 1L);
        Thread.sleep(PUBLISH_SETTLE_MILLIS);
        pubishEventsFromCSV(TEST_RESOURCE_PATH, TEST_RESPONSES_FILE,
                getStreamId(RESPONSES_PER_MINUTE_STREAM_NAME, RESPONSES_PER_MINUTE_STREAM_VERSION), 1L);
        Thread.sleep(PUBLISH_SETTLE_MILLIS);
        pubishEventsFromCSV(TEST_RESOURCE_PATH, TEST_EXECUTION_TIMES_FILE,
                getStreamId(EXECUTION_TIMES_PER_MINUTE_STREAM_NAME, EXECUTION_TIMES_PER_MINUTE_STREAM_VERSION), 1L);
        Thread.sleep(PUBLISH_SETTLE_MILLIS);

        assertPerMinuteTablePopulated(ORG_WSO2_APIMGT_STATISTICS_PERMINUTEREQUEST, "requests");
        assertPerMinuteTablePopulated(ORG_WSO2_APIMGT_STATISTICS_PERMINUTERESPONSE, "responses");
        assertPerMinuteTablePopulated(ORG_WSO2_APIMGT_STATISTICS_PERMINUTEEXECUTIONTIMES, "execution times");
    }

    /**
     * Executes the Spark script and verifies that every per-hour and per-day table has records.
     *
     * @throws Exception if script execution or a record lookup fails
     */
    @Test(groups = {"wso2.analytics.apim"}, description = "Test APIM_INCREMENTAL_PROCESSING_SCRIPT Spark Script execution", dependsOnMethods = {"testInitialSimulationDataSent"})
    public void testIncrementalProcessingSparkScriptExecution() throws Exception {
        executeSparkScript(SPARK_SCRIPT);
        assertSummaryTableHasRecords(ORG_WSO2_APIMGT_STATISTICS_PERHOURREQUEST);
        assertSummaryTableHasRecords(ORG_WSO2_APIMGT_STATISTICS_PERDAYREQUEST);
        assertSummaryTableHasRecords(ORG_WSO2_APIMGT_STATISTICS_PERHOURRESPONSE);
        assertSummaryTableHasRecords(ORG_WSO2_APIMGT_STATISTICS_PERDAYRESPONSE);
        assertSummaryTableHasRecords(ORG_WSO2_APIMGT_STATISTICS_PERHOUREXECUTIONTIMES);
        assertSummaryTableHasRecords(ORG_WSO2_APIMGT_STATISTICS_PERDAYEXECUTIONTIMES);
    }

    /**
     * Deletes the data of every table in {@link #ALL_TABLES} that currently exists.
     *
     * @throws Exception if an existence check or a delete fails
     */
    public void purgeData() throws Exception {
        for (String tableName : ALL_TABLES) {
            if (isTableExist(TENANT_ID, tableName)) {
                deleteData(TENANT_ID, tableName);
            }
        }
    }

    /**
     * Asserts that a per-minute table reaches {@link #EXPECTED_PER_MINUTE_RECORDS} records
     * within the polling budget.
     *
     * @param tableName   table to poll
     * @param streamLabel human-readable stream name used in the failure message
     * @throws Exception if querying or sleeping fails
     */
    private void assertPerMinuteTablePopulated(String tableName, String streamLabel) throws Exception {
        long found = awaitMinimumRecordCount(tableName, EXPECTED_PER_MINUTE_RECORDS);
        Assert.assertTrue(found >= EXPECTED_PER_MINUTE_RECORDS,
                "Simulation events did not get published to " + streamLabel
                        + " per minute stream, expected entry count:" + EXPECTED_PER_MINUTE_RECORDS
                        + " but found: " + found + "!");
    }

    /**
     * Asserts that a table written by the Spark script contains records, letting the
     * base-class lookup retry up to {@link #MAX_TRIES} times.
     *
     * @param tableName table expected to contain records
     * @throws Exception if the record lookup fails
     */
    private void assertSummaryTableHasRecords(String tableName) throws Exception {
        Assert.assertTrue(isRecordExists(TENANT_ID, tableName, MAX_TRIES),
                "Spark script did not execute as expected, No entries found for table " + tableName + "!");
    }

    /**
     * Polls the record count of a table until it reaches {@code minimumCount} or
     * {@link #MAX_TRIES} attempts have been made.
     *
     * @param tableName    table to poll
     * @param minimumCount record count at which polling stops early
     * @return the record count observed on the last attempt
     * @throws Exception if querying or sleeping fails
     */
    private long awaitMinimumRecordCount(String tableName, long minimumCount) throws Exception {
        long observed = 0;
        for (int attempt = 1; attempt <= MAX_TRIES; attempt++) {
            observed = getRecordCount(TENANT_ID, tableName);
            if (observed >= minimumCount) {
                break;
            }
            // Sleeping after the final attempt would only delay the failure report.
            if (attempt < MAX_TRIES) {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        }
        return observed;
    }
}
