package org.wso2.am.integration.tests.streamingapis.serversentevents;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import java.io.File;
import java.io.FileInputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.ws.rs.client.ClientBuilder;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.json.JSONObject;
import org.springframework.util.backoff.ExponentialBackOff;
import org.springframework.util.backoff.FixedBackOff;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import org.wso2.am.integration.clients.admin.ApiResponse;
import org.wso2.am.integration.clients.admin.api.dto.AdvancedThrottlePolicyDTO;
import org.wso2.am.integration.clients.publisher.api.v1.dto.APIDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.ApplicationDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.ApplicationKeyDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.ApplicationKeyGenerateRequestDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.SubscriptionDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.TopicListDTO;
import org.wso2.am.integration.test.impl.DtoFactory;
import org.wso2.am.integration.test.utils.APIManagerIntegrationTestException;
import org.wso2.am.integration.test.utils.base.APIMIntegrationBaseTest;
import org.wso2.am.integration.test.utils.bean.APILifeCycleAction;
import org.wso2.am.integration.test.utils.bean.APIRequest;
import org.wso2.am.integration.test.utils.generic.APIMTestCaseUtils;
import org.wso2.am.integration.test.utils.token.TokenUtils;
import org.wso2.am.integration.tests.restapi.RESTAPITestConstants;
import org.wso2.am.integration.tests.streamingapis.StreamingApiTestUtils;
import org.wso2.am.integration.tests.streamingapis.serversentevents.client.SimpleSseReceiver;
import org.wso2.am.integration.tests.streamingapis.serversentevents.server.SseServlet;
import org.wso2.carbon.apimgt.api.model.APIIdentifier;
import org.wso2.carbon.automation.engine.annotations.ExecutionEnvironment;
import org.wso2.carbon.automation.engine.annotations.SetEnvironment;
import org.wso2.carbon.automation.engine.context.TestUserMode;
import org.wso2.carbon.automation.engine.frameworkutils.FrameworkPathUtil;
import org.wso2.carbon.automation.test.utils.common.TestConfigurationProvider;
import org.wso2.carbon.integration.common.utils.mgt.ServerConfigurationManager;

@SetEnvironment(executionEnvironments = {ExecutionEnvironment.STANDALONE})
/* Integration test that publishes an SSE API, subscribes an application to it and invokes it against an embedded Jetty backend. */
public class ServerSentEventsAPITestCase extends APIMIntegrationBaseTest {
    /** Logger used by the test methods and by the background server task. */
    private final Log log = LogFactory.getLog(ServerSentEventsAPITestCase.class);

    /** Single worker thread on which the backend SSE server is started and stopped. */
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    /** Directory (with trailing separator) holding the event publisher definitions shipped with the tests. */
    private String sseEventPublisherSource = String.join(File.separator,
            TestConfigurationProvider.getResourceLocation(),
            "artifacts", "AM", "configFiles", "streamingAPIs", "serverSentEventsTest") + File.separator;

    /** File name of the request-logging event publisher. */
    private String sseRequestEventPublisherSource = "SSE_Req_Logger.xml";

    /** File name of the throttle-out-logging event publisher. */
    private String sseThrottleOutEventPublisherSource = "SSE_Throttle_Out_Logger.xml";

    /** Directory (with trailing separator) under the Carbon home into which the publishers are copied. */
    private String sseEventPublisherTarget = String.join(File.separator,
            FrameworkPathUtil.getCarbonHome(),
            "repository", "deployment", "server", "eventpublishers") + File.separator;

    /** Name of the API created by this test. */
    private String apiName = "SSEAPI";

    /** Name of the application subscribed to the API. */
    private String applicationName = "SSEApplication";

    // State populated by setEnvironment().
    private ServerConfigurationManager serverConfigurationManager;
    private int sseServerPort;
    private String sseServerHost;
    private SseServlet sseServlet;
    private Server sseServer;

    // State populated by the test methods, in dependency order.
    private String provider;
    private APIRequest apiRequest;
    private String apiId;
    private String appId;
    private SimpleSseReceiver sseReceiver;
    private String apiEndpoint;
    private String consumerKey;
    private String consumerSecret;

    /**
     * Creates one test instance per user mode supplied by {@code userModeDataProvider}.
     *
     * @param testUserMode the user mode this instance runs under; stored in the inherited {@code userMode} field
     */
    @Factory(dataProvider = "userModeDataProvider")
    public ServerSentEventsAPITestCase(TestUserMode testUserMode) {
        userMode = testUserMode;
    }

    /**
     * Supplies the user modes the test class is instantiated with.
     *
     * @return one row per user mode: super tenant admin and tenant admin
     */
    @DataProvider
    public static Object[][] userModeDataProvider() {
        // The outer array must be Object[][]; a plain Object[] does not satisfy the declared return type.
        return new Object[][]{
                {TestUserMode.SUPER_TENANT_ADMIN},
                {TestUserMode.TENANT_ADMIN},
        };
    }

    /**
     * Initialises the base test for the current user mode, copies the two SSE event publisher
     * definitions into the server's event publisher directory, and prepares (without starting)
     * the backend SSE server on a free port in the range 8080-8090.
     *
     * @throws Exception if initialisation fails or no port in the range is available
     */
    @BeforeClass(alwaysRun = true)
    public void setEnvironment() throws Exception {
        super.init(userMode);
        serverConfigurationManager = new ServerConfigurationManager(gatewayContextWrk);

        // Deploy the request logger first, then the throttle-out logger.
        final String[] publisherFiles = {sseRequestEventPublisherSource, sseThrottleOutEventPublisherSource};
        for (String publisherFile : publisherFiles) {
            File from = new File(sseEventPublisherSource + publisherFile);
            File to = new File(sseEventPublisherTarget + publisherFile);
            serverConfigurationManager.applyConfigurationWithoutRestart(from, to, false);
        }

        sseServerHost = InetAddress.getLocalHost().getHostName();
        sseServerPort = StreamingApiTestUtils.getAvailablePort(8080, 8090, sseServerHost);
        if (sseServerPort == -1) {
            throw new APIManagerIntegrationTestException("No available port in the range 8080-8090 was found");
        }
        log.info("Selected port " + sseServerPort + " to start backend server");
        initializeSseServer(sseServerPort);
    }

    /**
     * Creates the SSE API pointing at the local backend, deploys a revision, publishes it and
     * verifies that it is listed in both the publisher and the store. Also records the gateway
     * invocation URL in {@code apiEndpoint} for the later tests.
     *
     * @throws Exception if any REST call or the deployment wait fails
     */
    @Test(description = "Publish SSE API")
    public void testPublishSseApi() throws Exception {
        final String context = "sse";
        final String version = "1.0.0";

        provider = user.getUserName();
        final URI backendUri = new URI("http://" + sseServerHost + ":" + sseServerPort);

        // The same backend URI is passed for both URI arguments of the request.
        apiRequest = new APIRequest(apiName, context, backendUri, backendUri);
        apiRequest.setVersion(version);
        apiRequest.setTiersCollection("AsyncUnlimited");
        apiRequest.setProvider(provider);
        apiRequest.setType("SSE");

        apiId = restAPIPublisher.addAPI(apiRequest).getData();
        createAPIRevisionAndDeployUsingRest(apiId, restAPIPublisher);
        restAPIPublisher.changeAPILifeCycleStatus(apiId, APILifeCycleAction.PUBLISH.getAction(), (String) null);
        waitForAPIDeploymentSync(user.getUserName(), apiName, version, "\"isApiExists\":true");

        final APIIdentifier identifier = new APIIdentifier(provider, apiName, version);

        final boolean superTenantMode = TestUserMode.SUPER_TENANT_ADMIN.equals(userMode)
                || TestUserMode.SUPER_TENANT_USER.equals(userMode);
        apiEndpoint = superTenantMode
                ? getSuperTenantAPIInvocationURLHttp(context, version)
                : getAPIInvocationURLHttp(context, version);

        Assert.assertTrue(
                APIMTestCaseUtils.isAPIAvailable(identifier, restAPIPublisher.getAllAPIs()),
                "Published API is visible in API Publisher.");
        // Only the super tenant admin lists store APIs without an explicit tenant domain.
        Assert.assertTrue(
                APIMTestCaseUtils.isAPIAvailableInStore(identifier,
                        TestUserMode.SUPER_TENANT_ADMIN == userMode
                                ? restAPIStore.getAllAPIs()
                                : restAPIStore.getAllAPIs(user.getUserDomain())),
                "Published API is visible in API Store.");
    }

    /**
     * Creates the OAuth application and subscribes it to the published API, expecting the
     * subscription to come back unblocked.
     *
     * @throws Exception if application creation or subscription fails
     */
    @Test(description = "Create Application and subscribe", dependsOnMethods = {"testPublishSseApi"})
    public void testSseApiApplicationSubscription() throws Exception {
        appId = restAPIStore
                .createApplication(applicationName, "", "Unlimited", ApplicationDTO.TokenTypeEnum.OAUTH)
                .getData();
        SubscriptionDTO subscription = restAPIStore.subscribeToAPI(apiId, appId, "AsyncUnlimited");
        Assert.assertEquals(subscription.getStatus(), SubscriptionDTO.StatusEnum.UNBLOCKED);
    }

    /**
     * Fetches the topic list of the API from the store and expects exactly one topic.
     *
     * @throws Exception if the topic lookup fails
     */
    @Test(description = "check for topics of a SSE api", dependsOnMethods = {"testSseApiApplicationSubscription"})
    public void testTopicRetrievalofSSEApi() throws Exception {
        Gson gson = new Gson();
        TopicListDTO topics = gson.fromJson(
                restAPIStore.getTopics(apiId, user.getUserDomain()).getData(), TopicListDTO.class);
        int topicCount = topics.getCount().intValue();
        Assert.assertEquals(topicCount, 1);
    }

    /**
     * Generates production keys for the application, invokes the SSE API through the gateway and
     * checks that the receiver saw as many data events as the backend servlet sent. The consumer
     * key and secret are kept for {@code testSseApiThrottling}.
     *
     * @throws Exception if key generation or the invocation fails
     */
    @Test(description = "Invoke SSE API", dependsOnMethods = {"testSseApiApplicationSubscription"})
    public void testInvokeSseApi() throws Exception {
        ArrayList<String> grantTypes = new ArrayList<>();
        grantTypes.add("password");
        grantTypes.add("refresh_token");
        grantTypes.add("client_credentials");

        ApplicationKeyDTO generateKeys = this.restAPIStore.generateKeys(this.appId, "3600", (String) null,
                ApplicationKeyGenerateRequestDTO.KeyTypeEnum.PRODUCTION, (ArrayList<String>) null, grantTypes);
        String accessToken = generateKeys.getToken().getAccessToken();
        this.consumerKey = generateKeys.getConsumerKey();
        this.consumerSecret = generateKeys.getConsumerSecret();

        // NOTE(review): the backend lifetime is taken from an unrelated Spring back-off constant,
        // which looks like a decompiler substitution for a plain millisecond literal - confirm.
        invokeSseApi(accessToken, ExponentialBackOff.DEFAULT_MAX_INTERVAL);

        int eventsSent = this.sseServlet.getEventsSent();
        int receivedDataEventsCount = this.sseReceiver.getReceivedDataEventsCount();
        Assert.assertNotEquals(Integer.valueOf(eventsSent), 0, "The backend did not send any events");
        Assert.assertEquals(receivedDataEventsCount, eventsSent,
                "Number of received data events differs from the number of events sent");

        // Reset the counters so that later invocations start from zero.
        this.sseServlet.setEventsSent(0);
        this.sseReceiver.setReceivedDataEventsCount(0);
    }

    /**
     * Adds the advanced throttling policy described in {@code policy.json}, assigns the
     * {@code SSETestThrottlingPolicy} policy to the API, obtains an access token through a
     * password grant followed by a refresh grant, and runs the throttling scenario with the
     * JTI of that token.
     *
     * <p>Relies on {@code consumerKey} and {@code consumerSecret} set by {@code testInvokeSseApi}.
     * NOTE(review): this method carries no {@code @Test} annotation, so TestNG will not run it
     * as written - confirm whether that is intentional.
     *
     * @throws Exception if reading the policy, any REST call or the throttling scenario fails
     */
    public void testSseApiThrottling() throws Exception {
        String policyFile = getAMResourceLocation() + File.separator + "configFiles" + File.separator
                + "streamingAPIs" + File.separator + "serverSentEventsTest" + File.separator + "policy.json";
        JsonNode policy;
        // Close the stream explicitly instead of relying on the parser to do it.
        try (FileInputStream policyStream = new FileInputStream(policyFile)) {
            policy = new ObjectMapper().readTree(policyStream);
        }
        String policyName = policy.get("policyName").textValue();
        String policyDescription = policy.get("policyDescription").textValue();
        JsonNode requestCount = policy.get("defaultLimit").get("requestCount");

        ApiResponse addPolicyResponse = this.restAPIAdmin.addAdvancedThrottlingPolicy(
                DtoFactory.createAdvancedThrottlePolicyDTO(policyName, "", policyDescription, false,
                        DtoFactory.createEventCountThrottleLimitDTO(DtoFactory.createEventCountLimitDTO(
                                requestCount.get("timeUnit").textValue(),
                                Integer.valueOf(String.valueOf(requestCount.get("unitTime"))),
                                Long.valueOf(String.valueOf(requestCount.get("requestCount"))))),
                        new ArrayList<>()));
        Assert.assertEquals(addPolicyResponse.getStatusCode(), 201);
        Assert.assertNotNull(((AdvancedThrottlePolicyDTO) addPolicyResponse.getData()).getPolicyId(),
                "The policy ID cannot be null or empty");

        APIDTO apidto = (APIDTO) new Gson().fromJson(this.restAPIPublisher.getAPI(this.apiId).getData(), APIDTO.class);
        apidto.setApiThrottlingPolicy("SSETestThrottlingPolicy");
        Assert.assertEquals(this.restAPIPublisher.updateAPI(apidto).getApiThrottlingPolicy(), "SSETestThrottlingPolicy");

        URL tokenEndpoint = new URL(getKeyManagerURLHttps() + "/oauth2/token");

        // Issue the password grant once and reuse its refresh token, so that the token that is
        // asserted on is the same one that is exchanged below.
        String passwordGrantPayload =
                APIMTestCaseUtils.getPayloadForPasswordGrant(this.user.getUserName(), this.user.getPassword());
        String refreshToken = new JSONObject(this.apiStore.generateUserAccessKey(
                this.consumerKey, this.consumerSecret, passwordGrantPayload, tokenEndpoint).getData())
                .getString("refresh_token");
        Assert.assertFalse(StringUtils.isEmpty(refreshToken),
                "Refresh token of access token generated by subscriber is empty");

        JSONObject refreshGrantResponse = new JSONObject(this.apiStore.generateUserAccessKey(
                this.consumerKey, this.consumerSecret,
                "grant_type=refresh_token&refresh_token=" + refreshToken, tokenEndpoint).getData());
        String accessToken = refreshGrantResponse.getString(RESTAPITestConstants.ACCESS_TOKEN_TEXT);
        // TestNG takes the checked object first and the message second.
        Assert.assertNotNull(accessToken, "Access Token not found " + refreshGrantResponse);

        testThrottling(TokenUtils.getJtiOfJwtToken(accessToken));
    }

    /**
     * Runs the throttling scenario: starts the backend for 180 seconds, waits until the current
     * second-of-minute is at most 40, performs one receiver session that must finish in under
     * 60 seconds, then performs a second session and expects the throttled-response consumer to
     * have been called with {@code true} within 3 seconds after it returns.
     *
     * @param str token passed to the SSE receiver
     * @throws Exception if sleeping is interrupted or the receiver fails
     */
    private void testThrottling(String str) throws Exception {
        startAndStopSseServer(180000L);

        // Do not start the first session late in the minute; poll the clock between sleeps.
        for (int second = LocalDateTime.now().getSecond(); second > 40; second = LocalDateTime.now().getSecond()) {
            Thread.sleep(FixedBackOff.DEFAULT_INTERVAL);
        }

        final long startedAt = System.currentTimeMillis();
        startSseReceiver(str, null);
        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        Assert.assertTrue(elapsedMillis < 60000);

        final AtomicBoolean throttled = new AtomicBoolean(false);
        startSseReceiver(str, throttled::set);
        Thread.sleep(3000L);
        Assert.assertTrue(throttled.get());
    }

    /**
     * Builds (but does not start) the Jetty backend: a server on the given port whose servlet
     * handler maps a fresh {@link SseServlet} to {@code /memory}. The servlet and the server are
     * stored in {@code sseServlet} and {@code sseServer}.
     *
     * @param i port the backend server listens on
     */
    private void initializeSseServer(int i) {
        final Server backend = new Server(i);
        final ServletHandler handler = new ServletHandler();
        backend.setHandler(handler);

        sseServlet = new SseServlet();
        final ServletHolder holder = new ServletHolder(sseServlet);
        handler.addServletWithMapping(holder, "/memory");

        sseServer = backend;
    }

    /**
     * Schedules the backend to run for the given time, pauses for
     * {@code FixedBackOff.DEFAULT_INTERVAL} milliseconds, and then performs one receiver session
     * against the API.
     *
     * @param str token passed to the SSE receiver
     * @param j   time in milliseconds the backend stays up before it is stopped
     * @throws Exception if the pause is interrupted or the receiver fails
     */
    private void invokeSseApi(String str, long j) throws Exception {
        startAndStopSseServer(j);
        // Pause before connecting; the backend is started on another thread.
        Thread.sleep(FixedBackOff.DEFAULT_INTERVAL);
        startSseReceiver(str, null);
    }

    /**
     * Submits a task to the executor that starts the backend SSE server, keeps it up for the
     * given time and then stops it. The server is stopped even when the wait is interrupted or
     * start-up fails, so that it is not left running.
     *
     * @param j time in milliseconds to keep the server running before stopping it
     */
    private void startAndStopSseServer(final long j) {
        this.executorService.execute(() -> {
            boolean interrupted = false;
            try {
                this.sseServer.start();
                this.log.info("SSE Server Started and will be stopped after: " + j + "ms.");
                Thread.sleep(j);
            } catch (InterruptedException e) {
                // Remember the interrupt and restore it only after the server has been stopped.
                interrupted = true;
                this.log.warn("Interrupted while the SSE server was running; stopping it early.", e);
            } catch (Exception e) {
                this.log.error("Failed to start/stop the SSE server.", e);
            } finally {
                try {
                    this.sseServer.stop();
                    this.log.info("SSE Server Stopped.");
                } catch (Exception e) {
                    this.log.error("Failed to start/stop the SSE server.", e);
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /**
     * Creates a new receiver targeting {@code apiEndpoint + "/memory"}, registers the given
     * throttled-response consumer on it, opens it and always closes it afterwards. The receiver
     * is stored in {@code sseReceiver} so callers can read its counters.
     *
     * @param str      token passed to the receiver
     * @param consumer callback registered for throttled responses; callers in this class pass
     *                 {@code null} when they do not need it
     */
    private void startSseReceiver(String str, Consumer<Boolean> consumer) {
        this.sseReceiver = new SimpleSseReceiver(ClientBuilder.newClient().target(this.apiEndpoint + "/memory"), str);
        this.sseReceiver.registerThrottledResponseConsumer(consumer);
        try {
            this.sseReceiver.open();
        } finally {
            // Close exactly once, whether or not open() failed.
            this.sseReceiver.close();
        }
    }

    /**
     * Restores the server configuration, shuts down the executor, stops the backend SSE server
     * and runs the base class clean-up. Each step runs even if an earlier one throws, and steps
     * whose resources were never created are skipped.
     *
     * @throws Exception if any clean-up step fails
     */
    @AfterTest(alwaysRun = true)
    public void destroy() throws Exception {
        try {
            // alwaysRun means this can execute after a failed setup, when the manager is still null.
            if (this.serverConfigurationManager != null) {
                this.serverConfigurationManager.restoreToLastConfiguration(false);
            }
        } finally {
            this.executorService.shutdownNow();
            try {
                // The background task may have been interrupted before it stopped the server.
                if (this.sseServer != null) {
                    this.sseServer.stop();
                }
            } finally {
                super.cleanUp();
            }
        }
    }
}
