/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.source;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.metrics.SourceMetrics;
import io.siddhi.extension.io.http.sink.ClientConnector;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.source.SSEResponseConnectorListener;
import io.siddhi.extension.io.http.source.SSEResponseListener;
import io.siddhi.extension.io.http.source.SSESourceConnectorRegistry;
import io.siddhi.extension.io.http.util.HTTPSourceRegistry;
import io.siddhi.extension.io.http.util.HttpConstants;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.http.netty.contract.HttpResponseFuture;
import org.wso2.transport.http.netty.contract.config.KeepAliveConfig;
import org.wso2.transport.http.netty.contract.config.SenderConfiguration;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;
import org.wso2.transport.http.netty.contractimpl.sender.channel.pool.PoolConfiguration;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

@Extension(name="sse", namespace="source", description="HTTP SSE source send a request to a given url and listen to the response stream.", parameters={@Parameter(name="receiver.url", description="The sse endpoint url which should be listened.", type={DataType.STRING}), @Parameter(name="basic.auth.username", description="The username to be included in the authentication header when calling endpoints protected by basic authentication. `basic.auth.password` property should be also set when using this property.", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="basic.auth.password", description="The password to be included in the authentication header when calling endpoints protected by basic authentication. `basic.auth.username` property should be also set when using this property.", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="worker.count", description="The number of active worker threads to serve the incoming events. By default the value is set to `1` to ensure events are processed in the same order they arrived. By increasing this value, higher performance can be achieved in the expense of loosing event ordering.", type={DataType.INT}, optional=true, defaultValue="1"), @Parameter(name="headers", description="HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\nWhen the `Content-Type` header is not provided the system decides the Content-Type based on the provided sink mapper as following: \n - `@map(type='xml')`: `application/xml`\n - `@map(type='json')`: `application/json`\n - `@map(type='text')`: `plain/text`\n - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n - For all other cases system defaults to `plain/text`\nAlso the `Content-Length` header need not to be provided, as the system automatically defines it by calculating the size of the payload.", type={DataType.STRING}, optional=true, defaultValue="Content-Type and Content-Length headers"), @Parameter(name="https.truststore.file", description="The file path of the client truststore when sending messages through `https` protocol.", type={DataType.STRING}, optional=true, defaultValue="`${carbon.home}/resources/security/client-truststore.jks`"), @Parameter(name="https.truststore.password", description="The password for the client-truststore.", type={DataType.STRING}, optional=true, defaultValue="wso2carbon"), @Parameter(name="client.bootstrap.configurations", description="Client bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Client connect timeout in millis: `'client.bootstrap.connect.timeout:15000'`\n - Client socket timeout in seconds: `'client.bootstrap.socket.timeout:15'`\n - Client socket reuse: `'client.bootstrap.socket.reuse:true'`\n - Enable TCP no delay: `'client.bootstrap.nodelay:true'`\n - Enable client keep alive: `'client.bootstrap.keepalive:true'`\n - Send buffer size: `'client.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'client.bootstrap.recievebuffersize:1048576'`", type={DataType.STRING}, optional=true, defaultValue="-")}, systemParameter={@SystemParameter(name="defaultScheme", description="The default protocol.", defaultValue="http", possibleParameters={"http", "https"}), @SystemParameter(name="defaultHttpPort", description="The default HTTP port when default scheme is `http`.", defaultValue="8280", possibleParameters={"Any valid port"}), @SystemParameter(name="defaultHost", description="The default host of the transport.", defaultValue="0.0.0.0", possibleParameters={"Any valid host"})}, examples={@Example(syntax="@Source(type='sse', receiver.url='http://localhost:8080/sse', @map(type='json')) define stream IncomingStream (param1 string);", description="This subscribes to the events which gets published by the SSE server at receiver.url")})
public class SSESource
extends Source {
    private static final Logger log = LogManager.getLogger(SSESource.class);
    private int workerThread;
    private String siddhiAppName;
    private String streamID;
    private String eventSourceUrl;
    private String mapType;
    private String clientStoreFile;
    private String clientStorePass;
    private String clientBootstrapConfiguration;
    private String userName;
    private String userPassword;
    private String authType;
    private String authorizationHeader;
    private String[] requestedTransportPropertyNames;
    private Option httpHeaderOption;
    private ClientConnector clientConnector;
    private DefaultHttpWsConnectorFactory httpConnectorFactory;
    private ConfigReader configReader;
    private PoolConfiguration connectionPoolConfiguration;
    private SSESourceConnectorRegistry httpConnectorRegistry;
    private SSEResponseConnectorListener httpSSEResponseConnectorListener;
    private ServiceDeploymentInfo serviceDeploymentInfo;
    private SourceEventListener sourceEventListener;
    private SourceMetrics metrics;

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return this.serviceDeploymentInfo;
    }

    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        boolean isSecured;
        int port;
        this.siddhiAppName = siddhiAppContext.getName();
        this.streamID = sourceEventListener.getStreamDefinition().getId();
        this.sourceEventListener = sourceEventListener;
        this.configReader = configReader;
        this.mapType = "json";
        this.workerThread = Integer.parseInt(optionHolder.validateAndGetStaticValue("worker.count", "1"));
        this.clientStoreFile = optionHolder.validateAndGetStaticValue("https.truststore.file", HttpSinkUtil.trustStorePath(configReader));
        this.clientStorePass = optionHolder.validateAndGetStaticValue("https.truststore.password", HttpSinkUtil.trustStorePassword(configReader));
        this.clientBootstrapConfiguration = optionHolder.validateAndGetStaticValue("client.bootstrap.configurations", "");
        this.userName = optionHolder.validateAndGetStaticValue("basic.auth.username", "");
        this.userPassword = optionHolder.validateAndGetStaticValue("basic.auth.password", "");
        this.httpHeaderOption = optionHolder.getOrCreateOption("headers", HttpConstants.DEFAULT_HEADER);
        this.authType = this.validateAndGetAuthType();
        String scheme = configReader.readConfig("defaultScheme", "http");
        if ("https".equals(scheme)) {
            port = Integer.parseInt(configReader.readConfig("defaultHttpsPort", "8243"));
            isSecured = true;
        } else {
            port = Integer.parseInt(configReader.readConfig("defaultHttpPort", "8280"));
            isSecured = false;
        }
        this.serviceDeploymentInfo = new ServiceDeploymentInfo(port, isSecured);
        this.eventSourceUrl = optionHolder.validateAndGetOption("receiver.url").getValue();
        this.httpConnectorRegistry = SSESourceConnectorRegistry.getInstance();
        this.requestedTransportPropertyNames = (String[])requestedTransportPropertyNames.clone();
        this.httpConnectorFactory = HttpSinkUtil.createConnectorFactory(configReader);
        this.connectionPoolConfiguration = HttpSinkUtil.createPoolConfigurations(optionHolder);
        this.clientConnector = this.createClientConnector();
        this.initMetrics();
        return null;
    }

    public Class[] getOutputEventClasses() {
        return new Class[0];
    }

    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        this.httpSSEResponseConnectorListener = new SSEResponseConnectorListener(this.workerThread, this.sourceEventListener, this.streamID, this.requestedTransportPropertyNames, this.siddhiAppName, this.metrics);
        this.httpConnectorRegistry.registerSourceListener(this.httpSSEResponseConnectorListener, this.streamID);
        HTTPSourceRegistry.registerSSESource(this.streamID, this);
        String httpMethod = "GET";
        String headers = this.httpHeaderOption.getValue();
        List<Header> headersList = HttpSinkUtil.getHeaders(headers);
        String contentType = HttpSinkUtil.getContentType(this.mapType, headersList);
        HttpMethod httpReqMethod = new HttpMethod(httpMethod);
        HttpCarbonMessage cMessage = new HttpCarbonMessage(new DefaultHttpRequest(HttpVersion.HTTP_1_1, httpReqMethod, ""));
        cMessage = this.generateCarbonMessage(headersList, contentType, httpMethod, cMessage, this.clientConnector.getHttpURLProperties());
        cMessage.completeMessage();
        HttpResponseFuture httpResponseFuture = this.clientConnector.send(cMessage);
        CountDownLatch latch = null;
        if ("oauth".equals(this.authType)) {
            latch = new CountDownLatch(1);
        }
        SSEResponseListener httpListener = new SSEResponseListener(this, this.streamID, latch, this.metrics);
        httpResponseFuture.setHttpConnectorListener(httpListener);
        if (latch != null) {
            try {
                boolean latchCount = latch.await(30L, TimeUnit.SECONDS);
                if (!latchCount) {
                    log.debug("Timeout due to getting response from " + this.clientConnector.getPublisherURL() + ". Message dropped.");
                    throw new ConnectionUnavailableException("Time out due to getting response from " + this.clientConnector.getPublisherURL() + ". Message dropped.");
                }
            }
            catch (InterruptedException e) {
                log.debug("Failed to get a response from " + this.clientConnector.getPublisherURL() + "," + e + ". Message dropped.");
                throw new ConnectionUnavailableException("Failed to get a response from " + this.clientConnector.getPublisherURL() + ", " + e + ". Message dropped.");
            }
        }
    }

    public void disconnect() {
        if (this.clientConnector != null) {
            String publisherURL = this.clientConnector.getPublisherURL();
            this.clientConnector = null;
            log.debug("Server connector for url " + publisherURL + " disconnected.");
        }
        if (this.httpConnectorFactory != null) {
            this.httpConnectorFactory.shutdownNow();
            this.httpConnectorFactory = null;
        }
        this.httpConnectorRegistry.unregisterSourceListener(this.streamID, this.siddhiAppName);
        HTTPSourceRegistry.removeSSESource(this.streamID);
    }

    public void destroy() {
        if (this.clientConnector != null) {
            String publisherURL = this.clientConnector.getPublisherURL();
            this.clientConnector = null;
            log.debug("Server connector for url " + publisherURL + " disconnected.");
        }
    }

    public void pause() {
    }

    public void resume() {
    }

    private String validateAndGetAuthType() {
        if ("".equals(this.userName) ^ "".equals(this.userPassword)) {
            throw new SiddhiAppCreationException("Please provide user name and password in http sink with the stream " + this.streamID + " in Siddhi app " + this.siddhiAppName);
        }
        if (!"".equals(this.userName)) {
            byte[] val = (this.userName + ":" + this.userPassword).getBytes(Charset.defaultCharset());
            this.authorizationHeader = "Basic " + Base64.encode(Unpooled.copiedBuffer(val));
        }
        if (!"".equals(this.userName) && !"".equals(this.userPassword)) {
            return "basic.auth";
        }
        return "no.auth";
    }

    public SSEResponseConnectorListener getConnectorListener() {
        return this.httpSSEResponseConnectorListener;
    }

    public ClientConnector createClientConnector() {
        Map<String, String> httpURLProperties = HttpSinkUtil.getURLProperties(this.eventSourceUrl);
        SenderConfiguration senderConfig = HttpSinkUtil.getSenderConfigurations(httpURLProperties, this.clientStoreFile, this.clientStorePass, this.configReader);
        senderConfig.setKeepAliveConfig(KeepAliveConfig.ALWAYS);
        if ("".equals(this.eventSourceUrl)) {
            throw new SiddhiAppCreationException("Event Source URL found empty but it is Mandatory field in http source " + this.streamID);
        }
        senderConfig.setPoolConfiguration(this.connectionPoolConfiguration);
        Map<String, Object> bootStrapProperties = HttpSinkUtil.populateTransportConfiguration(this.clientBootstrapConfiguration);
        return new ClientConnector(this.eventSourceUrl, httpURLProperties, this.httpConnectorFactory.createHttpClientConnector(bootStrapProperties, senderConfig));
    }

    private HttpCarbonMessage generateCarbonMessage(List<Header> headers, String contentType, String httpMethod, HttpCarbonMessage cMessage, Map<String, String> httpURLProperties) {
        cMessage.setProperty("PROTOCOL", httpURLProperties.get("PROTOCOL"));
        cMessage.setProperty("TO", httpURLProperties.get("TO"));
        cMessage.setProperty("host", httpURLProperties.get("host"));
        cMessage.setProperty("port", Integer.valueOf(httpURLProperties.get("port")));
        cMessage.setHttpMethod(httpMethod);
        cMessage.setRequestUrl(httpURLProperties.get("REQUEST_URL"));
        HttpHeaders httpHeaders = cMessage.getHeaders();
        if (!this.userName.equals("") && !this.userPassword.equals("")) {
            httpHeaders.set("Authorization", (Object)this.authorizationHeader);
        } else if (!this.userName.equals("") || !this.userPassword.equals("")) {
            log.error("One of the basic authentication username or password missing. Hence basic authentication not supported.");
        }
        httpHeaders.set("host", cMessage.getProperty("host"));
        if (headers != null) {
            for (Header header : headers) {
                httpHeaders.set(header.getName(), (Object)header.getValue());
            }
        }
        if (contentType.contains(this.mapType)) {
            httpHeaders.set("Content-Type", (Object)contentType);
        }
        cMessage.setHttpMethod(httpMethod);
        return cMessage;
    }

    public boolean matches(String thatSinkId) {
        return Objects.equals(this.streamID, thatSinkId);
    }

    private void initMetrics() {
        if (MetricsDataHolder.getInstance().getMetricService() != null && MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
            try {
                if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning("prometheus")) {
                    this.metrics = new SourceMetrics(this.siddhiAppName, this.sourceEventListener.getStreamDefinition().getId(), this.eventSourceUrl);
                }
            }
            catch (IllegalArgumentException e) {
                log.debug("Prometheus reporter is not running. Hence sse source metrics will not be initialized for " + this.siddhiAppName);
            }
        }
    }
}

