/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.am.integration.tests.streamingapis.serversentevents;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.servlet.Servlet;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.json.JSONObject;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import org.wso2.am.integration.clients.admin.ApiResponse;
import org.wso2.am.integration.clients.admin.api.dto.AdvancedThrottlePolicyDTO;
import org.wso2.am.integration.clients.admin.api.dto.EventCountLimitDTO;
import org.wso2.am.integration.clients.admin.api.dto.ThrottleLimitDTO;
import org.wso2.am.integration.clients.publisher.api.v1.dto.APIDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.APIListDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.ApplicationDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.ApplicationKeyDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.ApplicationKeyGenerateRequestDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.SubscriptionDTO;
import org.wso2.am.integration.clients.store.api.v1.dto.TopicListDTO;
import org.wso2.am.integration.test.impl.DtoFactory;
import org.wso2.am.integration.test.utils.APIManagerIntegrationTestException;
import org.wso2.am.integration.test.utils.base.APIMIntegrationBaseTest;
import org.wso2.am.integration.test.utils.bean.APILifeCycleAction;
import org.wso2.am.integration.test.utils.bean.APIRequest;
import org.wso2.am.integration.test.utils.generic.APIMTestCaseUtils;
import org.wso2.am.integration.test.utils.token.TokenUtils;
import org.wso2.am.integration.tests.streamingapis.StreamingApiTestUtils;
import org.wso2.am.integration.tests.streamingapis.serversentevents.client.SimpleSseReceiver;
import org.wso2.am.integration.tests.streamingapis.serversentevents.server.SseServlet;
import org.wso2.carbon.apimgt.api.model.APIIdentifier;
import org.wso2.carbon.automation.engine.annotations.ExecutionEnvironment;
import org.wso2.carbon.automation.engine.annotations.SetEnvironment;
import org.wso2.carbon.automation.engine.context.TestUserMode;
import org.wso2.carbon.automation.engine.frameworkutils.FrameworkPathUtil;
import org.wso2.carbon.automation.test.utils.common.TestConfigurationProvider;
import org.wso2.carbon.automation.test.utils.http.client.HttpResponse;
import org.wso2.carbon.integration.common.utils.mgt.ServerConfigurationManager;

@SetEnvironment(executionEnvironments={ExecutionEnvironment.STANDALONE})
public class ServerSentEventsAPITestCase
extends APIMIntegrationBaseTest {
    private final Log log = LogFactory.getLog(ServerSentEventsAPITestCase.class);
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private String sseEventPublisherSource = TestConfigurationProvider.getResourceLocation() + File.separator + "artifacts" + File.separator + "AM" + File.separator + "configFiles" + File.separator + "streamingAPIs" + File.separator + "serverSentEventsTest" + File.separator;
    private String sseRequestEventPublisherSource = "SSE_Req_Logger.xml";
    private String sseThrottleOutEventPublisherSource = "SSE_Throttle_Out_Logger.xml";
    private String sseEventPublisherTarget = FrameworkPathUtil.getCarbonHome() + File.separator + "repository" + File.separator + "deployment" + File.separator + "server" + File.separator + "eventpublishers" + File.separator;
    private String apiName = "SSEAPI";
    private String applicationName = "SSEApplication";
    private ServerConfigurationManager serverConfigurationManager;
    private String provider;
    private APIRequest apiRequest;
    private int sseServerPort;
    private String sseServerHost;
    private String apiId;
    private String appId;
    private SseServlet sseServlet;
    private Server sseServer;
    private SimpleSseReceiver sseReceiver;
    private String apiEndpoint;
    private String consumerKey;
    private String consumerSecret;

    @Factory(dataProvider="userModeDataProvider")
    public ServerSentEventsAPITestCase(TestUserMode userMode) {
        this.userMode = userMode;
    }

    @DataProvider
    public static Object[][] userModeDataProvider() {
        return new Object[][]{{TestUserMode.SUPER_TENANT_ADMIN}, {TestUserMode.TENANT_ADMIN}};
    }

    @BeforeClass(alwaysRun=true)
    public void setEnvironment() throws Exception {
        super.init(this.userMode);
        this.serverConfigurationManager = new ServerConfigurationManager(this.gatewayContextWrk);
        this.serverConfigurationManager.applyConfigurationWithoutRestart(new File(this.sseEventPublisherSource + this.sseRequestEventPublisherSource), new File(this.sseEventPublisherTarget + this.sseRequestEventPublisherSource), false);
        this.serverConfigurationManager.applyConfigurationWithoutRestart(new File(this.sseEventPublisherSource + this.sseThrottleOutEventPublisherSource), new File(this.sseEventPublisherTarget + this.sseThrottleOutEventPublisherSource), false);
        this.sseServerHost = InetAddress.getLocalHost().getHostName();
        int lowerPortLimit = 8080;
        int upperPortLimit = 8090;
        this.sseServerPort = StreamingApiTestUtils.getAvailablePort(lowerPortLimit, upperPortLimit, this.sseServerHost);
        if (this.sseServerPort == -1) {
            throw new APIManagerIntegrationTestException("No available port in the range " + lowerPortLimit + "-" + upperPortLimit + " was found");
        }
        this.log.info((Object)("Selected port " + this.sseServerPort + " to start backend server"));
        this.initializeSseServer(this.sseServerPort);
    }

    @Test(description="Publish SSE API")
    public void testPublishSseApi() throws Exception {
        this.provider = this.user.getUserName();
        String apiContext = "sse";
        String apiVersion = "1.0.0";
        URI endpointUri = new URI("http://" + this.sseServerHost + ":" + this.sseServerPort);
        this.apiRequest = new APIRequest(this.apiName, apiContext, endpointUri, endpointUri);
        this.apiRequest.setVersion(apiVersion);
        this.apiRequest.setTiersCollection("AsyncUnlimited");
        this.apiRequest.setProvider(this.provider);
        this.apiRequest.setType("SSE");
        HttpResponse addAPIResponse = this.restAPIPublisher.addAPI(this.apiRequest);
        this.apiId = addAPIResponse.getData();
        this.createAPIRevisionAndDeployUsingRest(this.apiId, this.restAPIPublisher);
        this.restAPIPublisher.changeAPILifeCycleStatus(this.apiId, APILifeCycleAction.PUBLISH.getAction(), null);
        this.waitForAPIDeploymentSync(this.user.getUserName(), this.apiName, apiVersion, "\"isApiExists\":true");
        APIIdentifier apiIdentifier = new APIIdentifier(this.provider, this.apiName, apiVersion);
        this.apiEndpoint = TestUserMode.SUPER_TENANT_ADMIN.equals((Object)this.userMode) || TestUserMode.SUPER_TENANT_USER.equals((Object)this.userMode) ? this.getSuperTenantAPIInvocationURLHttp(apiContext, apiVersion) : this.getAPIInvocationURLHttp(apiContext, apiVersion);
        org.wso2.am.integration.clients.publisher.api.v1.dto.APIListDTO apiPublisherAllAPIs = this.restAPIPublisher.getAllAPIs();
        Assert.assertTrue((boolean)APIMTestCaseUtils.isAPIAvailable((APIIdentifier)apiIdentifier, (org.wso2.am.integration.clients.publisher.api.v1.dto.APIListDTO)apiPublisherAllAPIs), (String)"Published API is visible in API Publisher.");
        APIListDTO restAPIStoreAllAPIs = TestUserMode.SUPER_TENANT_ADMIN == this.userMode ? this.restAPIStore.getAllAPIs() : this.restAPIStore.getAllAPIs(this.user.getUserDomain());
        Assert.assertTrue((boolean)APIMTestCaseUtils.isAPIAvailableInStore((APIIdentifier)apiIdentifier, (APIListDTO)restAPIStoreAllAPIs), (String)"Published API is visible in API Store.");
    }

    @Test(description="Create Application and subscribe", dependsOnMethods={"testPublishSseApi"})
    public void testSseApiApplicationSubscription() throws Exception {
        HttpResponse applicationResponse = this.restAPIStore.createApplication(this.applicationName, "", "Unlimited", ApplicationDTO.TokenTypeEnum.OAUTH);
        this.appId = applicationResponse.getData();
        SubscriptionDTO subscriptionDTO = this.restAPIStore.subscribeToAPI(this.apiId, this.appId, "AsyncUnlimited");
        Assert.assertEquals((Object)subscriptionDTO.getStatus(), (Object)SubscriptionDTO.StatusEnum.UNBLOCKED);
    }

    @Test(description="check for topics of a SSE api", dependsOnMethods={"testSseApiApplicationSubscription"})
    public void testTopicRetrievalofSSEApi() throws Exception {
        HttpResponse topicResponse = this.restAPIStore.getTopics(this.apiId, this.user.getUserDomain());
        Gson g = new Gson();
        TopicListDTO topicListDTO = (TopicListDTO)g.fromJson(topicResponse.getData(), TopicListDTO.class);
        Assert.assertEquals((int)topicListDTO.getCount(), (int)1);
    }

    @Test(description="Invoke SSE API", dependsOnMethods={"testSseApiApplicationSubscription"})
    public void testInvokeSseApi() throws Exception {
        ArrayList<String> grantTypes = new ArrayList<String>();
        grantTypes.add("password");
        grantTypes.add("refresh_token");
        grantTypes.add("client_credentials");
        ApplicationKeyDTO applicationKeyDTO = this.restAPIStore.generateKeys(this.appId, "3600", null, ApplicationKeyGenerateRequestDTO.KeyTypeEnum.PRODUCTION, null, grantTypes);
        String accessToken = applicationKeyDTO.getToken().getAccessToken();
        this.consumerKey = applicationKeyDTO.getConsumerKey();
        this.consumerSecret = applicationKeyDTO.getConsumerSecret();
        this.invokeSseApi(accessToken, 30000L);
        int sent = this.sseServlet.getEventsSent();
        int received = this.sseReceiver.getReceivedDataEventsCount();
        Assert.assertNotEquals((Object)sent, (Object)0);
        Assert.assertEquals((int)sent, (int)received);
        this.sseServlet.setEventsSent(0);
        this.sseReceiver.setReceivedDataEventsCount(0);
    }

    public void testSseApiThrottling() throws Exception {
        FileInputStream inputStream = new FileInputStream(this.getAMResourceLocation() + File.separator + "configFiles" + File.separator + "streamingAPIs" + File.separator + "serverSentEventsTest" + File.separator + "policy.json");
        ObjectMapper mapper = new ObjectMapper();
        JsonNode jsonMap = mapper.readTree((InputStream)inputStream);
        String policyName = jsonMap.get("policyName").textValue();
        String policyDescription = jsonMap.get("policyDescription").textValue();
        JsonNode defaultLimitJson = jsonMap.get("defaultLimit");
        JsonNode requestCountJson = defaultLimitJson.get("requestCount");
        Long requestCountLimit = Long.valueOf(String.valueOf(requestCountJson.get("requestCount")));
        String timeUnit = requestCountJson.get("timeUnit").textValue();
        Integer unitTime = Integer.valueOf(String.valueOf(requestCountJson.get("unitTime")));
        EventCountLimitDTO eventCountLimitDTO = DtoFactory.createEventCountLimitDTO((String)timeUnit, (Integer)unitTime, (Long)requestCountLimit);
        ThrottleLimitDTO defaultLimit = DtoFactory.createEventCountThrottleLimitDTO((EventCountLimitDTO)eventCountLimitDTO);
        AdvancedThrottlePolicyDTO advancedPolicyDTO = DtoFactory.createAdvancedThrottlePolicyDTO((String)policyName, (String)"", (String)policyDescription, (boolean)false, (ThrottleLimitDTO)defaultLimit, new ArrayList());
        ApiResponse addedPolicy = this.restAPIAdmin.addAdvancedThrottlingPolicy(advancedPolicyDTO);
        Assert.assertEquals((int)addedPolicy.getStatusCode(), (int)201);
        AdvancedThrottlePolicyDTO addedAdvancedPolicyDTO = (AdvancedThrottlePolicyDTO)addedPolicy.getData();
        String apiPolicyId = addedAdvancedPolicyDTO.getPolicyId();
        Assert.assertNotNull((Object)apiPolicyId, (String)"The policy ID cannot be null or empty");
        HttpResponse response = this.restAPIPublisher.getAPI(this.apiId);
        Gson g = new Gson();
        APIDTO apidto = (APIDTO)g.fromJson(response.getData(), APIDTO.class);
        apidto.setApiThrottlingPolicy("SSETestThrottlingPolicy");
        APIDTO updatedAPI = this.restAPIPublisher.updateAPI(apidto);
        Assert.assertEquals((String)updatedAPI.getApiThrottlingPolicy(), (String)"SSETestThrottlingPolicy");
        URL tokenEndpointURL = new URL(this.getKeyManagerURLHttps() + "/oauth2/token");
        String subsAccessTokenPayload = APIMTestCaseUtils.getPayloadForPasswordGrant((String)this.user.getUserName(), (String)this.user.getPassword());
        JSONObject subsAccessTokenGenerationResponse = new JSONObject(this.apiStore.generateUserAccessKey(this.consumerKey, this.consumerSecret, subsAccessTokenPayload, tokenEndpointURL).getData());
        String subsRefreshToken = subsAccessTokenGenerationResponse.getString("refresh_token");
        Assert.assertFalse((boolean)StringUtils.isEmpty((String)subsRefreshToken), (String)"Refresh token of access token generated by subscriber is empty");
        String requestBody = APIMTestCaseUtils.getPayloadForPasswordGrant((String)this.user.getUserName(), (String)this.user.getPassword());
        JSONObject accessTokenGenerationResponse = new JSONObject(this.apiStore.generateUserAccessKey(this.consumerKey, this.consumerSecret, requestBody, tokenEndpointURL).getData());
        String refreshToken = accessTokenGenerationResponse.getString("refresh_token");
        String getAccessTokenFromRefreshTokenRequestBody = "grant_type=refresh_token&refresh_token=" + refreshToken;
        accessTokenGenerationResponse = new JSONObject(this.apiStore.generateUserAccessKey(this.consumerKey, this.consumerSecret, getAccessTokenFromRefreshTokenRequestBody, tokenEndpointURL).getData());
        String userAccessToken = accessTokenGenerationResponse.getString("access_token");
        Assert.assertNotNull((Object)("Access Token not found " + accessTokenGenerationResponse), (String)userAccessToken);
        String tokenJti = TokenUtils.getJtiOfJwtToken((String)userAccessToken);
        this.testThrottling(tokenJti);
    }

    private void testThrottling(String accessToken) throws Exception {
        this.startAndStopSseServer(180000L);
        while (LocalDateTime.now().getSecond() > 40) {
            Thread.sleep(5000L);
        }
        long startTime = System.currentTimeMillis();
        this.startSseReceiver(accessToken, null);
        long endTime = System.currentTimeMillis();
        Assert.assertTrue((endTime - startTime < 60000L ? 1 : 0) != 0);
        final AtomicBoolean isThrottled = new AtomicBoolean(false);
        this.startSseReceiver(accessToken, new Consumer<Boolean>(){

            @Override
            public void accept(Boolean aBoolean) {
                isThrottled.set(aBoolean);
            }
        });
        Thread.sleep(3000L);
        Assert.assertTrue((boolean)isThrottled.get());
    }

    private void initializeSseServer(int port) {
        Server server = new Server(port);
        ServletHandler servletHandler = new ServletHandler();
        server.setHandler((Handler)servletHandler);
        this.sseServlet = new SseServlet();
        ServletHolder servletHolder = new ServletHolder((Servlet)this.sseServlet);
        servletHandler.addServletWithMapping(servletHolder, "/memory");
        this.sseServer = server;
    }

    private void invokeSseApi(String bearerToken, long runForMillis) throws Exception {
        this.startAndStopSseServer(runForMillis);
        Thread.sleep(5000L);
        this.startSseReceiver(bearerToken, null);
    }

    private void startAndStopSseServer(final long stopAfterMillis) {
        this.executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    ServerSentEventsAPITestCase.this.sseServer.start();
                    ServerSentEventsAPITestCase.this.log.info((Object)("SSE Server Started and will be stopped after: " + stopAfterMillis + "ms."));
                    Thread.sleep(stopAfterMillis);
                    ServerSentEventsAPITestCase.this.sseServer.stop();
                    ServerSentEventsAPITestCase.this.log.info((Object)"SSE Server Stopped.");
                }
                catch (Exception e) {
                    ServerSentEventsAPITestCase.this.log.error((Object)"Failed to start/stop the SSE server.", (Throwable)e);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startSseReceiver(String bearerToken, Consumer<Boolean> throttledResponseConsumer) {
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(this.apiEndpoint + "/memory");
        this.sseReceiver = new SimpleSseReceiver(target, bearerToken);
        this.sseReceiver.registerThrottledResponseConsumer(throttledResponseConsumer);
        try {
            this.sseReceiver.open();
        }
        finally {
            this.sseReceiver.close();
        }
    }

    @AfterTest(alwaysRun=true)
    public void destroy() throws Exception {
        this.serverConfigurationManager.restoreToLastConfiguration(false);
        this.executorService.shutdownNow();
        super.cleanUp();
    }
}

