/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.metrics.SourceMetrics;
import io.siddhi.extension.io.http.source.HttpConnectorRegistry;
import io.siddhi.extension.io.http.source.HttpSourceListener;
import io.siddhi.extension.io.http.source.util.HttpSourceUtil;
import io.siddhi.extension.io.http.util.HttpIoUtil;
import java.util.Locale;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.http.netty.contract.config.ListenerConfiguration;

@Extension(name="http", namespace="source", description="HTTP source receives POST requests via HTTP and HTTPS protocols in format such as `text`, `XML` and `JSON`. It also supports basic authentication to ensure events are received from authorized users/systems.\nThe request headers and properties can be accessed via transport properties in the format `trp:<header>`.", parameters={@Parameter(name="receiver.url", description="The URL on which events should be received. To enable SSL use `https` protocol in the url.", type={DataType.STRING}, optional=true, defaultValue="`http://0.0.0.0:9763/<appNAme>/<streamName>`"), @Parameter(name="basic.auth.enabled", description="This only works in VM, Docker and Kubernetes.\nWhere when enabled it authenticates each request using the `Authorization:'Basic encodeBase64(username:Password)'` header.", type={DataType.STRING}, optional=true, defaultValue="false"), @Parameter(name="worker.count", description="The number of active worker threads to serve the incoming events. By default the value is set to `1` to ensure events are processed in the same order they arrived. By increasing this value, higher performance can be achieved in the expense of loosing event ordering.", type={DataType.INT}, optional=true, defaultValue="1"), @Parameter(name="socket.idle.timeout", description="Idle timeout for HTTP connection in millis.", type={DataType.INT}, optional=true, defaultValue="120000"), @Parameter(name="ssl.verify.client", description="The type of client certificate verification. Supported values are `require`, `optional`.", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="ssl.protocol", description="SSL/TLS protocol.", type={DataType.STRING}, optional=true, defaultValue="TLS"), @Parameter(name="tls.store.type", description="TLS store type.", type={DataType.STRING}, optional=true, defaultValue="JKS"), @Parameter(name="ssl.configurations", description="SSL/TSL configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported parameters:\n - SSL/TLS protocols: `'sslEnabledProtocols:TLSv1.1,TLSv1.2'`\n - List of ciphers: `'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'`\n - Enable session creation: `'client.enable.session.creation:true'`\n - Supported server names: `'server.suported.server.names:server'`\n - Add HTTP SNIMatcher: `'server.supported.snimatchers:SNIMatcher'`", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="request.size.validation.configurations", description="Configurations to validate the HTTP request size.\nExpected format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Enable request size validation: `'request.size.validation:true'`\n If request size is validated\n - Maximum request size: `'request.size.validation.maximum.value:2048'`\n - Response status code when request size validation fails: `'request.size.validation.reject.status.code:401'`\n - Response message when request size validation fails: `'request.size.validation.reject.message:Message is bigger than the valid size'`\n - Response Content-Type when request size validation fails: `'request.size.validation.reject.message.content.type:plain/text'`", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="header.validation.configurations", description="Configurations to validate HTTP headers.\nExpected format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Enable header size validation: `'header.size.validation:true'`\n If header size is validated\n - Maximum length of initial line: `'header.validation.maximum.request.line:4096'`\n - Maximum length of all headers: `'header.validation.maximum.size:8192'`\n - Maximum length of the content or each chunk: `'header.validation.maximum.chunk.size:8192'`\n - Response status code when header validation fails: `'header.validation.reject.status.code:401'`\n - Response message when header validation fails: `'header.validation.reject.message:Message header is bigger than the valid size'`\n - Response Content-Type when header validation fails: `'header.validation.reject.message.content.type:plain/text'`", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="server.bootstrap.configurations", description="Server bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Server connect timeout in millis: `'server.bootstrap.connect.timeout:15000'`\n - Server socket timeout in seconds: `'server.bootstrap.socket.timeout:15'`\n - Enable TCP no delay: `'server.bootstrap.nodelay:true'`\n - Enable server keep alive: `'server.bootstrap.keepalive:true'`\n - Send buffer size: `'server.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'server.bootstrap.recievebuffersize:1048576'`\n - Number of connections queued: `'server.bootstrap.socket.backlog:100'`", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="trace.log.enabled", description="Enable trace log for traffic monitoring.", defaultValue="false", optional=true, type={DataType.BOOL})}, examples={@Example(syntax="@app.name('StockProcessor')\n\n@source(type='http', @map(type = 'json'))\ndefine stream StockStream (symbol string, price float, volume long);\n", description="Above HTTP source listeners on url `http://0.0.0.0:9763/StockProcessor/StockStream` for JSON messages on the format:\n```{\n  \"event\": {\n    \"symbol\": \"FB\",\n    \"price\": 24.5,\n    \"volume\": 5000\n  }\n}```It maps the incoming messages and sends them to `StockStream` for processing."), @Example(syntax="@source(type='http', receiver.url='http://localhost:5005/stocks',\n        @map(type = 'xml'))\ndefine stream StockStream (symbol string, price float, volume long);\n", description="Above HTTP source listeners on url `http://localhost:5005/stocks` for JSON messages on the format:\n```<events>\n    <event>\n        <symbol>Fb</symbol>\n        <price>55.6</price>\n        <volume>100</volume>\n    </event>\n</events>```\nIt maps the incoming messages and sends them to `StockStream` for processing.")}, systemParameter={@SystemParameter(name="serverBootstrapBossGroupSize", description="Number of boss threads to accept incoming connections.", defaultValue="Number of available processors", possibleParameters={"Any positive integer"}), @SystemParameter(name="serverBootstrapWorkerGroupSize", description="Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.", defaultValue="(Number of available processors) * 2", possibleParameters={"Any positive integer"}), @SystemParameter(name="serverBootstrapClientGroupSize", description="Number of client threads to perform non-blocking read and write to one or more channels.", defaultValue="(Number of available processors) * 2", possibleParameters={"Any positive integer"}), @SystemParameter(name="defaultHost", description="The default host of the transport.", defaultValue="0.0.0.0", possibleParameters={"Any valid host"}), @SystemParameter(name="defaultScheme", description="The default protocol.", defaultValue="http", possibleParameters={"http", "https"}), @SystemParameter(name="defaultHttpPort", description="The default HTTP port when default scheme is `http`.", defaultValue="8280", possibleParameters={"Any valid port"}), @SystemParameter(name="defaultHttpsPort", description="The default HTTPS port when default scheme is `https`.", defaultValue="8243", possibleParameters={"Any valid port"}), @SystemParameter(name="keyStoreLocation", description="The default keystore file path.", defaultValue="`${carbon.home}/resources/security/wso2carbon.jks`", possibleParameters={"Path to `.jks` file"}), @SystemParameter(name="keyStorePassword", description="The default keystore password.", defaultValue="wso2carbon", possibleParameters={"Keystore password as string"})})
public class HttpSource
extends Source {
    private static final Logger log = LogManager.getLogger(HttpSource.class);
    protected String listenerUrl;
    protected Boolean isAuth;
    protected int workerThread;
    protected SourceEventListener sourceEventListener;
    protected String[] requestedTransportPropertyNames;
    protected ListenerConfiguration listenerConfiguration;
    private HttpConnectorRegistry httpConnectorRegistry;
    private String siddhiAppName;
    private ServiceDeploymentInfo serviceDeploymentInfo;
    private boolean isSecured;
    protected SourceMetrics metrics;

    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.initSource(sourceEventListener, optionHolder, requestedTransportPropertyNames, configReader, siddhiAppContext);
        this.initConnectorRegistry(optionHolder, configReader);
        this.initMetrics(siddhiAppContext.getName());
        return null;
    }

    protected void initConnectorRegistry(OptionHolder optionHolder, ConfigReader configReader) {
        String requestSizeValidationConfigList = optionHolder.validateAndGetStaticValue("request.size.validation.configurations", "");
        String serverBootstrapPropertiesList = optionHolder.validateAndGetStaticValue("server.bootstrap.configurations", "");
        this.httpConnectorRegistry = HttpConnectorRegistry.getInstance();
        this.httpConnectorRegistry.initBootstrapConfigIfFirst(configReader);
        this.httpConnectorRegistry.setTransportConfig(serverBootstrapPropertiesList, requestSizeValidationConfigList);
    }

    protected void initSource(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        String defaultURL;
        int port;
        this.siddhiAppName = siddhiAppContext.getName();
        String scheme = configReader.readConfig("defaultScheme", "http");
        if ("https".equals(scheme)) {
            port = Integer.parseInt(configReader.readConfig("defaultHttpsPort", "8243"));
            defaultURL = "https://" + configReader.readConfig("defaultHost", "0.0.0.0") + ":" + port + "/" + siddhiAppContext.getName() + "/" + sourceEventListener.getStreamDefinition().getId();
        } else {
            port = Integer.parseInt(configReader.readConfig("defaultHttpPort", "8280"));
            defaultURL = "http://" + configReader.readConfig("defaultHost", "0.0.0.0") + ":" + port + "/" + siddhiAppContext.getName() + "/" + sourceEventListener.getStreamDefinition().getId();
        }
        this.listenerUrl = optionHolder.validateAndGetStaticValue("receiver.url", defaultURL);
        this.isAuth = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue("basic.auth.enabled", "false").toLowerCase(Locale.ENGLISH));
        this.workerThread = Integer.parseInt(optionHolder.validateAndGetStaticValue("worker.count", "1"));
        this.sourceEventListener = sourceEventListener;
        this.requestedTransportPropertyNames = (String[])requestedTransportPropertyNames.clone();
        int socketIdleTimeout = Integer.parseInt(optionHolder.validateAndGetStaticValue("socket.idle.timeout", "-1"));
        String verifyClient = optionHolder.validateAndGetStaticValue("ssl.verify.client", "");
        String sslProtocol = optionHolder.validateAndGetStaticValue("ssl.protocol", "");
        String tlsStoreType = optionHolder.validateAndGetStaticValue("tls.store.type", "");
        String requestSizeValidationConfigList = optionHolder.validateAndGetStaticValue("request.size.validation.configurations", "");
        String sslConfigs = optionHolder.validateAndGetStaticValue("ssl.configurations", "");
        if (sslConfigs.equalsIgnoreCase("")) {
            sslConfigs = optionHolder.validateAndGetStaticValue("parameters", "");
        }
        String traceLog = optionHolder.validateAndGetStaticValue("trace.log.enabled", configReader.readConfig("httpTraceLogEnabled", ""));
        this.listenerConfiguration = HttpSourceUtil.getListenerConfiguration(this.listenerUrl, configReader);
        if (socketIdleTimeout != -1) {
            this.listenerConfiguration.setSocketIdleTimeout(socketIdleTimeout);
        }
        if (!"".equals(verifyClient)) {
            this.listenerConfiguration.setVerifyClient(verifyClient);
        }
        if (!"".equals(sslProtocol)) {
            this.listenerConfiguration.setSSLProtocol(sslProtocol);
        }
        if (!"".equals(tlsStoreType)) {
            this.listenerConfiguration.setTLSStoreType(tlsStoreType);
        }
        if (!"".equals(traceLog)) {
            this.listenerConfiguration.setHttpTraceLogEnabled(Boolean.parseBoolean(traceLog));
        }
        if (!"".equals(requestSizeValidationConfigList)) {
            this.listenerConfiguration.setMsgSizeValidationConfig(HttpConnectorRegistry.getInstance().populateRequestSizeValidationConfiguration());
        }
        this.isSecured = this.listenerConfiguration.getScheme().equalsIgnoreCase("https");
        port = this.listenerConfiguration.getPort();
        this.listenerConfiguration.setParameters(HttpIoUtil.populateParameters(sslConfigs));
        this.serviceDeploymentInfo = new ServiceDeploymentInfo(port, this.isSecured);
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return this.serviceDeploymentInfo;
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class};
    }

    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        this.httpConnectorRegistry.createHttpServerConnector(this.listenerConfiguration, this.metrics);
        this.httpConnectorRegistry.registerSourceListener(this.sourceEventListener, this.listenerUrl, this.workerThread, this.isAuth, this.requestedTransportPropertyNames, this.siddhiAppName, this.metrics);
    }

    public void disconnect() {
        this.httpConnectorRegistry.unregisterSourceListener(this.listenerUrl, this.siddhiAppName, this.metrics);
        this.httpConnectorRegistry.unregisterServerConnector(this.listenerUrl);
    }

    public void destroy() {
        this.httpConnectorRegistry.clearBootstrapConfigIfLast();
    }

    public void pause() {
        HttpSourceListener httpSourceListener = this.httpConnectorRegistry.getSourceListenersMap().get(HttpSourceUtil.getSourceListenerKey(this.listenerUrl, this.metrics));
        if (httpSourceListener != null && httpSourceListener.isRunning()) {
            httpSourceListener.pause();
        }
    }

    public void resume() {
        HttpSourceListener httpSourceListener = this.httpConnectorRegistry.getSourceListenersMap().get(HttpSourceUtil.getSourceListenerKey(this.listenerUrl, this.metrics));
        if (httpSourceListener != null && httpSourceListener.isPaused()) {
            httpSourceListener.resume();
        }
    }

    protected void initMetrics(String appName) {
        if (MetricsDataHolder.getInstance().getMetricService() != null && MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
            try {
                if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning("prometheus")) {
                    this.metrics = new SourceMetrics(appName, this.sourceEventListener.getStreamDefinition().getId(), this.listenerUrl);
                }
            }
            catch (IllegalArgumentException e) {
                log.debug("Prometheus reporter is not running. Hence http source metrics will not be initialized for " + appName);
            }
        }
    }
}

