package io.siddhi.extension.io.http.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.SiddhiAppRuntimeBuilder;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.metrics.SourceMetrics;
import io.siddhi.extension.io.http.source.util.HttpSourceUtil;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.HttpIoUtil;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.TableDefinition;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.http.netty.contract.Constants;
import org.wso2.transport.http.netty.contract.config.ListenerConfiguration;

@Extension(name = "websubhub", namespace = "source", description = " WebSub Hub source receive subscription requests via Http and according to the request, the subscription details will be saved to the given table and against the callback and topic name. The subscription request  MUST have a Content-Type header of application/x-www-form-urlencoded and following MUST provide as parameter body hub.callback \t - REQUIRED. The subscriber's callback URL where content distribution notifications should be delivered. The callback URL SHOULD be an unguessable URL that is unique per subscription.\nhub.mode \t - REQUIRED. The literal string \"subscribe\" or \"unsubscribe\", depending on the goal of the request.\nhub.topic \t - REQUIRED. The topic URL that the subscriber wishes to subscribe to or unsubscribe from.\n hub.lease_seconds\t - OPTIONAL. Number of seconds for which the subscriber would like to have the subscription active, given as a positive decimal integer. \nhub.secret\t - OPTIONAL. A subscriber-provided cryptographically random unique secret string that will be used to compute an HMAC digest for authorized content distribution. If not supplied, the HMAC digest will not be present for content distribution requests. ", parameters = {@Parameter(name = HttpConstants.HUB_ID, description = "Unique id for the WebSub Hub", type = {DataType.STRING}), @Parameter(name = HttpConstants.WEB_SUB_SUBSCRIPTION_DATA_TABLE_KEY, description = "Table name to store the subscription details related to the hub ", type = {DataType.STRING}), @Parameter(name = HttpConstants.RECEIVER_URL, description = "The URL on which events should be received. 
To enable SSL use `https` protocol in the url.", type = {DataType.STRING}, optional = true, defaultValue = "`http://0.0.0.0:9763/<appNAme>/<streamName>`"), @Parameter(name = HttpConstants.TOPIC_LIST, description = "topics allowed in the websub hub", type = {DataType.STRING}, defaultValue = "empty"), @Parameter(name = HttpConstants.IS_AUTH, description = "This only works in VM, Docker and Kubernetes.\nWhere when enabled it authenticates each request using the `Authorization:'Basic encodeBase64(username:Password)'` header.", type = {DataType.STRING}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.WORKER_COUNT, description = "The number of active worker threads to serve the incoming events. By default the value is set to `1` to ensure events are processed in the same order they arrived. By increasing this value, higher performance can be achieved in the expense of loosing event ordering.", type = {DataType.INT}, optional = true, defaultValue = "1"), @Parameter(name = HttpConstants.SOCKET_IDEAL_TIMEOUT, description = "Idle timeout for HTTP connection in millis.", type = {DataType.INT}, optional = true, defaultValue = "120000"), @Parameter(name = HttpConstants.SSL_VERIFY_CLIENT, description = "The type of client certificate verification. 
Supported values are `require`, `optional`.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.SSL_PROTOCOL, description = "SSL/TLS protocol.", type = {DataType.STRING}, optional = true, defaultValue = Constants.TLS_PROTOCOL), @Parameter(name = HttpConstants.TLS_STORE_TYPE, description = "TLS store type.", type = {DataType.STRING}, optional = true, defaultValue = Constants.JKS), @Parameter(name = HttpConstants.SSS_CONFIGS, description = "SSL/TSL configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported parameters:\n - SSL/TLS protocols: `'sslEnabledProtocols:TLSv1.1,TLSv1.2'`\n - List of ciphers: `'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'`\n - Enable session creation: `'client.enable.session.creation:true'`\n - Supported server names: `'server.suported.server.names:server'`\n - Add HTTP SNIMatcher: `'server.supported.snimatchers:SNIMatcher'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.REQUEST_SIZE_VALIDATION_CONFIGS, description = "Configurations to validate the HTTP request size.\nExpected format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Enable request size validation: `'request.size.validation:true'`\n If request size is validated\n - Maximum request size: `'request.size.validation.maximum.value:2048'`\n - Response status code when request size validation fails: `'request.size.validation.reject.status.code:401'`\n - Response message when request size validation fails: `'request.size.validation.reject.message:Message is bigger than the valid size'`\n - Response Content-Type when request size validation fails: `'request.size.validation.reject.message.content.type:plain/text'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = "header.validation.configurations", 
description = "Configurations to validate HTTP headers.\nExpected format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Enable header size validation: `'header.size.validation:true'`\n If header size is validated\n - Maximum length of initial line: `'header.validation.maximum.request.line:4096'`\n - Maximum length of all headers: `'header.validation.maximum.size:8192'`\n - Maximum length of the content or each chunk: `'header.validation.maximum.chunk.size:8192'`\n - Response status code when header validation fails: `'header.validation.reject.status.code:401'`\n - Response message when header validation fails: `'header.validation.reject.message:Message header is bigger than the valid size'`\n - Response Content-Type when header validation fails: `'header.validation.reject.message.content.type:plain/text'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.SERVER_BOOTSTRAP_CONFIGS, description = "Server bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Server connect timeout in millis: `'server.bootstrap.connect.timeout:15000'`\n - Server socket timeout in seconds: `'server.bootstrap.socket.timeout:15'`\n - Enable TCP no delay: `'server.bootstrap.nodelay:true'`\n - Enable server keep alive: `'server.bootstrap.keepalive:true'`\n - Send buffer size: `'server.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'server.bootstrap.recievebuffersize:1048576'`\n - Number of connections queued: `'server.bootstrap.socket.backlog:100'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.TRACE_LOG_ENABLED, description = "Enable trace log for traffic monitoring.", defaultValue = "false", optional = true, type = {DataType.BOOL})}, examples = {@Example(syntax = "@app.name('StockProcessor')\n\n@store(type='rdbms' , 
jdbc.url='jdbc:mysql://localhost:3306/production?useSSL=false', username='root', password='root', jdbc.driver.name='com.mysql.jdbc.Driver') \n@source(type='websubhub' , hub.id='anu_123',table.name='SessionTable', receiver.url='http://localhost:8006/productionStream',basic.auth.enabled='false', @map(type='keyvalue',implicit.cast.enable='true')) \ndefine stream webSubStream(callback string, lease_seconds long, secret string, topic string, mode string);\n", description = "Above WebSubHub listening on http://localhost:8006/productionStream for thesubscription requests.")}, systemParameter = {@SystemParameter(name = HttpConstants.SERVER_BOOTSTRAP_BOSS_GROUP_SIZE, description = "Number of boss threads to accept incoming connections.", defaultValue = "Number of available processors", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.SERVER_BOOTSTRAP_WORKER_GROUP_SIZE, description = "Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.", defaultValue = "(Number of available processors) * 2", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.SERVER_BOOTSTRAP_CLIENT_GROUP_SIZE, description = "Number of client threads to perform non-blocking read and write to one or more channels.", defaultValue = "(Number of available processors) * 2", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.DEFAULT_HOST, description = "The default host of the transport.", defaultValue = "0.0.0.0", possibleParameters = {"Any valid host"}), @SystemParameter(name = HttpConstants.DEFAULT_SOURCE_SCHEME, description = "The default protocol.", defaultValue = "http", possibleParameters = {"http", "https"}), @SystemParameter(name = HttpConstants.HTTP_PORT, description = "The default HTTP port when default scheme is `http`.", defaultValue = HttpConstants.HTTP_PORT_VALUE, possibleParameters = {"Any valid port"}), 
@SystemParameter(name = HttpConstants.HTTPS_PORT, description = "The default HTTPS port when default scheme is `https`.", defaultValue = HttpConstants.HTTPS_PORT_VALUE, possibleParameters = {"Any valid port"}), @SystemParameter(name = HttpConstants.KEYSTORE_FILE, description = "The default keystore file path.", defaultValue = "`${carbon.home}/resources/security/wso2carbon.jks`", possibleParameters = {"Path to `.jks` file"}), @SystemParameter(name = "keyStorePassword", description = "The default keystore password.", defaultValue = "wso2carbon", possibleParameters = {"Keystore password as string"})})
/* loaded from: input_file:io/siddhi/extension/io/http/source/HttpWebSubSource.class */
public class HttpWebSubSource extends Source {
    /** Class-level logger. */
    private static final Logger log = Logger.getLogger(HttpWebSubSource.class);
    /** Name of the owning Siddhi app; assigned in {@code init} from the app context. */
    public String siddhiAppName;
    /** Source metrics; only assigned by {@code initMetrics}, so it may stay {@code null}. */
    protected SourceMetrics metrics;
    /** Connector registry obtained through {@code HttpConnectorRegistry.getInstance()}. */
    private HttpConnectorRegistry httpConnectorRegistry;
    /** Listener of the stream this source is attached to. */
    private SourceEventListener sourceEventListener;
    /** Options supplied on the source annotation. */
    private OptionHolder optionHolder;
    /** Reader for system-level configuration. */
    private ConfigReader configReader;
    /** Context of the owning Siddhi app. */
    private SiddhiAppContext siddhiAppContext;
    /** Receiver URL: either the configured value or the default assembled in {@code init}. */
    private String listenerUrl;
    /** Value of the basic-auth option, parsed as a boolean. */
    private boolean isAuth;
    /** Listener configuration built in {@code init} and used in {@code connect}. */
    private ListenerConfiguration listenerConfiguration;
    /** Value of the worker-count option. */
    private int workerThread;
    /** Copy of the requested transport property names passed to {@code init}. */
    private String[] requestedTransportPropertyNames;
    /** {@code true} when the listener configuration's scheme is {@code https}. */
    private boolean isSecured;
    /** Deployment info (port and secured flag) returned by {@code exposeServiceDeploymentInfo}. */
    private ServiceDeploymentInfo serviceDeploymentInfo;
    /** Table looked up in {@code init} under the subscription table name. */
    private Table webSubMetaTable;
    /** Option created for the headers key in {@code init}. */
    private Option httpHeaderOption;
    /** Value of the hub id option. */
    private String hubId;
    /** Raw value of the topic-list option, before splitting. */
    private String topicList;
    /** Topics obtained by splitting {@link #topicList}; replaced in {@code init}. */
    private List<String> topics = new ArrayList<>();

    /**
     * Returns the deployment info created in {@code init}.
     *
     * @return the stored {@link ServiceDeploymentInfo}; {@code null} until {@code init} has run
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return serviceDeploymentInfo;
    }

    /**
     * Reads the source options and system configuration, defines the subscription table and builds
     * the listener configuration that {@code connect} uses later.
     *
     * @param sourceEventListener listener of the stream this source is attached to
     * @param optionHolder        options supplied on the source annotation
     * @param strArr              requested transport property names; a clone is stored
     * @param configReader        reader for system-level configuration
     * @param siddhiAppContext    context of the owning Siddhi app
     * @return always {@code null}
     */
    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppName = siddhiAppContext.getName();
        initConnectorRegistry(optionHolder, configReader);
        this.sourceEventListener = sourceEventListener;
        this.optionHolder = optionHolder;
        this.configReader = configReader;
        this.siddhiAppContext = siddhiAppContext;

        this.hubId = optionHolder.validateAndGetStaticValue(HttpConstants.HUB_ID);
        final String defaultScheme = configReader.readConfig(HttpConstants.DEFAULT_SOURCE_SCHEME, "http");
        this.topicList = optionHolder.validateAndGetStaticValue(HttpConstants.TOPIC_LIST);
        this.topics = validateTopics(this.topicList);
        this.httpHeaderOption = optionHolder.getOrCreateOption(HttpConstants.HEADERS, HttpConstants.DEFAULT_HEADER);

        // Define the subscription table and keep a handle on it.
        final String tableName = optionHolder.validateAndGetStaticValue(
                HttpConstants.WEB_SUB_SUBSCRIPTION_DATA_TABLE_KEY,
                HttpConstants.WEB_SUB_SUBSCRIPTION_DATA_TABLE_DEFAULT_NAME);
        final SiddhiAppRuntimeBuilder runtimeBuilder = new SiddhiAppRuntimeBuilder(this.siddhiAppContext);
        final TableDefinition tableDefinition = generateSubscriptionTableDefinition(tableName, sourceEventListener);
        runtimeBuilder.defineTable(tableDefinition);
        runtimeBuilder.getTableDefinitionMap().put(tableName, tableDefinition);
        this.webSubMetaTable = (Table) runtimeBuilder.getTableMap().get(tableName);

        // Default receiver URL: <scheme>://<host>:<port>/<appName>/<streamId>.
        // Integer.parseInt makes a non-numeric configured port fail here.
        final boolean secureByDefault = "https".equals(defaultScheme);
        final String portKey = secureByDefault ? HttpConstants.HTTPS_PORT : HttpConstants.HTTP_PORT;
        final String fallbackPort = secureByDefault ? HttpConstants.HTTPS_PORT_VALUE : HttpConstants.HTTP_PORT_VALUE;
        final String defaultUrl = (secureByDefault ? "https://" : "http://")
                + configReader.readConfig(HttpConstants.DEFAULT_HOST, "0.0.0.0")
                + ":" + Integer.parseInt(configReader.readConfig(portKey, fallbackPort))
                + "/" + siddhiAppContext.getName()
                + "/" + sourceEventListener.getStreamDefinition().getId();

        this.listenerUrl = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_URL, defaultUrl);
        this.isAuth = Boolean.parseBoolean(
                optionHolder.validateAndGetStaticValue(HttpConstants.IS_AUTH, "false").toLowerCase(Locale.ENGLISH));
        this.workerThread = Integer.parseInt(optionHolder.validateAndGetStaticValue(HttpConstants.WORKER_COUNT, "1"));
        this.requestedTransportPropertyNames = (String[]) strArr.clone();

        // Optional settings: "-1" or an empty string means "not supplied".
        final int socketIdleTimeout = Integer.parseInt(
                optionHolder.validateAndGetStaticValue(HttpConstants.SOCKET_IDEAL_TIMEOUT, "-1"));
        final String verifyClient = optionHolder.validateAndGetStaticValue(HttpConstants.SSL_VERIFY_CLIENT, "");
        final String sslProtocol = optionHolder.validateAndGetStaticValue(HttpConstants.SSL_PROTOCOL, "");
        final String tlsStoreType = optionHolder.validateAndGetStaticValue(HttpConstants.TLS_STORE_TYPE, "");
        final String requestSizeValidationConfigs =
                optionHolder.validateAndGetStaticValue(HttpConstants.REQUEST_SIZE_VALIDATION_CONFIGS, "");
        String sslConfigs = optionHolder.validateAndGetStaticValue(HttpConstants.SSS_CONFIGS, "");
        if (sslConfigs.isEmpty()) {
            // Fall back to the "parameters" option when no SSL configuration is given.
            sslConfigs = optionHolder.validateAndGetStaticValue("parameters", "");
        }
        final String traceLogEnabled = optionHolder.validateAndGetStaticValue(
                HttpConstants.TRACE_LOG_ENABLED, configReader.readConfig("httpTraceLogEnabled", ""));

        this.listenerConfiguration = HttpSourceUtil.getListenerConfiguration(this.listenerUrl, configReader);
        if (socketIdleTimeout != -1) {
            this.listenerConfiguration.setSocketIdleTimeout(socketIdleTimeout);
        }
        if (!"".equals(verifyClient)) {
            this.listenerConfiguration.setVerifyClient(verifyClient);
        }
        if (!"".equals(sslProtocol)) {
            this.listenerConfiguration.setSSLProtocol(sslProtocol);
        }
        if (!"".equals(tlsStoreType)) {
            this.listenerConfiguration.setTLSStoreType(tlsStoreType);
        }
        if (!"".equals(traceLogEnabled)) {
            this.listenerConfiguration.setHttpTraceLogEnabled(Boolean.parseBoolean(traceLogEnabled));
        }
        if (!"".equals(requestSizeValidationConfigs)) {
            this.listenerConfiguration.setMsgSizeValidationConfig(
                    HttpConnectorRegistry.getInstance().populateRequestSizeValidationConfiguration());
        }

        this.isSecured = this.listenerConfiguration.getScheme().equalsIgnoreCase("https");
        final int listenerPort = this.listenerConfiguration.getPort();
        this.listenerConfiguration.setParameters(HttpIoUtil.populateParameters(sslConfigs));
        this.serviceDeploymentInfo = new ServiceDeploymentInfo(listenerPort, this.isSecured);
        initMetrics(siddhiAppContext.getName());
        return null;
    }

    /**
     * Lists the event classes this source produces.
     *
     * @return a new single-element array containing {@code Map.class}
     */
    public Class[] getOutputEventClasses() {
        final Class[] outputTypes = {Map.class};
        return outputTypes;
    }

    /**
     * Creates the HTTP server connector for the prepared listener configuration and registers this
     * source's listener with the connector registry.
     *
     * @param connectionCallback connection callback; not used by this method
     * @param state              current state; not used by this method
     * @throws ConnectionUnavailableException declared by the signature
     */
    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        final HttpConnectorRegistry registry = this.httpConnectorRegistry;
        registry.createHttpServerConnector(this.listenerConfiguration, this.metrics);
        registry.registerSourceListener(
                this.sourceEventListener,
                this.listenerUrl,
                this.workerThread,
                Boolean.valueOf(this.isAuth),
                this.requestedTransportPropertyNames,
                this.siddhiAppName,
                this.metrics,
                this.webSubMetaTable,
                this.hubId,
                this.siddhiAppContext,
                this.topics);
    }

    /**
     * Obtains the connector registry and hands it the bootstrap and transport configuration.
     *
     * @param optionHolder source options; the request-size-validation and server-bootstrap
     *                     options are read, each defaulting to an empty string
     * @param configReader system configuration passed to the registry's bootstrap initialisation
     */
    protected void initConnectorRegistry(OptionHolder optionHolder, ConfigReader configReader) {
        final String requestSizeValidationConfigs =
                optionHolder.validateAndGetStaticValue(HttpConstants.REQUEST_SIZE_VALIDATION_CONFIGS, "");
        final String serverBootstrapConfigs =
                optionHolder.validateAndGetStaticValue(HttpConstants.SERVER_BOOTSTRAP_CONFIGS, "");
        final HttpConnectorRegistry registry = HttpConnectorRegistry.getInstance();
        this.httpConnectorRegistry = registry;
        registry.initBootstrapConfigIfFirst(configReader);
        registry.setTransportConfig(serverBootstrapConfigs, requestSizeValidationConfigs);
    }

    /**
     * Unregisters this source's listener and then the server connector for the listener URL.
     */
    public void disconnect() {
        final HttpConnectorRegistry registry = this.httpConnectorRegistry;
        registry.unregisterSourceListener(this.listenerUrl, this.siddhiAppName, this.metrics);
        registry.unregisterServerConnector(this.listenerUrl);
    }

    /**
     * Asks the connector registry to clear its bootstrap configuration if this is the last user.
     */
    public void destroy() {
        httpConnectorRegistry.clearBootstrapConfigIfLast();
    }

    /**
     * Pauses the registered listener for this source, if one exists and is currently running.
     */
    public void pause() {
        final HttpSourceListener listener = this.httpConnectorRegistry.getSourceListenersMap()
                .get(HttpSourceUtil.getSourceListenerKey(this.listenerUrl, this.metrics));
        if (listener != null && listener.isRunning()) {
            listener.pause();
        }
    }

    /**
     * Resumes the registered listener for this source, if one exists and is currently paused.
     */
    public void resume() {
        final HttpSourceListener listener = this.httpConnectorRegistry.getSourceListenersMap()
                .get(HttpSourceUtil.getSourceListenerKey(this.listenerUrl, this.metrics));
        if (listener != null && listener.isPaused()) {
            listener.resume();
        }
    }

    /**
     * Creates the source metrics when a metric service is present, metrics are enabled and the
     * Prometheus reporter is running; otherwise {@code metrics} is left untouched.
     *
     * @param str name of the Siddhi app, passed to the metrics and used in the debug message
     */
    protected void initMetrics(String str) {
        final boolean metricsUsable = MetricsDataHolder.getInstance().getMetricService() != null
                && MetricsDataHolder.getInstance().getMetricManagementService().isEnabled();
        if (!metricsUsable) {
            return;
        }
        try {
            final boolean reporterRunning = MetricsDataHolder.getInstance().getMetricManagementService()
                    .isReporterRunning(HttpConstants.PROMETHEUS_REPORTER_NAME);
            if (reporterRunning) {
                final String streamId = this.sourceEventListener.getStreamDefinition().getId();
                this.metrics = new SourceMetrics(str, streamId, this.listenerUrl);
            }
        } catch (IllegalArgumentException e) {
            // The exception is reported at debug level only; metrics simply stay uninitialised.
            log.debug("Prometheus reporter is not running. Hence http source metrics will not be initialized for " + str);
        }
    }

    /**
     * Builds the definition of the subscription table from the stream definition.
     *
     * <p>The table gets every stream attribute, then the hub id column, then the request timestamp
     * column. Its annotations are the stream's {@code Store} annotations (name compared ignoring
     * case), followed by an {@code Index} on the hub id column and a {@code PrimaryKey} on
     * {@code callback} and {@code topic}.
     *
     * @param str                 id of the table to define
     * @param sourceEventListener listener whose stream definition supplies attributes and annotations
     * @return the assembled table definition
     */
    private TableDefinition generateSubscriptionTableDefinition(String str, SourceEventListener sourceEventListener) {
        final List<Annotation> tableAnnotations = new ArrayList<>();
        for (Annotation streamAnnotation : sourceEventListener.getStreamDefinition().getAnnotations()) {
            if (streamAnnotation.getName().equalsIgnoreCase("Store")) {
                tableAnnotations.add(streamAnnotation);
            }
        }

        final Annotation index = new Annotation("Index");
        index.element(HttpConstants.HUB_ID_COLUMN_NAME);
        tableAnnotations.add(index);

        final Annotation primaryKey = new Annotation("PrimaryKey");
        primaryKey.element("callback");
        primaryKey.element("topic");
        tableAnnotations.add(primaryKey);

        final TableDefinition tableDefinition = TableDefinition.id(str);
        for (Attribute column : sourceEventListener.getStreamDefinition().getAttributeList()) {
            tableDefinition.attribute(column.getName(), column.getType());
        }
        tableDefinition.attribute(HttpConstants.HUB_ID_COLUMN_NAME, Attribute.Type.STRING);
        for (Annotation tableAnnotation : tableAnnotations) {
            tableDefinition.annotation(tableAnnotation);
        }
        tableDefinition.attribute(HttpConstants.REQUEST_TIMESTAMP, Attribute.Type.LONG);
        return tableDefinition;
    }

    /**
     * Splits the topic option on {@code Constants.COMMA} into a mutable list.
     *
     * <p>No trimming or filtering is applied; entries are kept exactly as produced by the split.
     *
     * @param str raw topic list
     * @return a new {@link ArrayList} holding the split entries
     */
    private List<String> validateTopics(String str) {
        final String[] entries = str.split(Constants.COMMA);
        final List<String> topicNames = new ArrayList<>(Arrays.asList(entries));
        return topicNames;
    }
}
