/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.sink;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.Event;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.query.OnDemandQueryRuntime;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.SiddhiAppRuntimeBuilder;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OnDemandQueryParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.metrics.EndpointStatus;
import io.siddhi.extension.io.http.metrics.SinkMetrics;
import io.siddhi.extension.io.http.sink.ClientConnector;
import io.siddhi.extension.io.http.sink.WebSubSubscriptionDTO;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.HttpIoUtil;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.execution.query.OnDemandQuery;
import io.siddhi.query.api.execution.query.input.store.InputStore;
import io.siddhi.query.api.execution.query.selection.OutputAttribute;
import io.siddhi.query.api.execution.query.selection.Selector;
import io.siddhi.query.api.expression.Expression;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.api.expression.condition.Compare;
import io.siddhi.query.api.expression.constant.StringConstant;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.contract.HttpResponseFuture;
import org.wso2.transport.http.netty.contract.config.ChunkConfig;
import org.wso2.transport.http.netty.contract.config.ProxyServerConfiguration;
import org.wso2.transport.http.netty.contract.config.SenderConfiguration;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;
import org.wso2.transport.http.netty.contractimpl.sender.channel.pool.PoolConfiguration;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;
import org.wso2.transport.http.netty.message.HttpMessageDataStreamer;

@Extension(name="websubhub", namespace="sink", description="WebSubHubEventPublisher publishes messages via HTTP/HTTP according to the provided URL when subscribe to the WebSub hub. The table.name, hub.id are mandatory when defining the websubhub source ", parameters={@Parameter(name="hub.id", description="Id of the hub that the messages needed to process ", type={DataType.STRING}), @Parameter(name="table.name", description="Name of the table which subscription data holds related to the hub  ", type={DataType.STRING}), @Parameter(name="https.truststore.file", description="The file path of the client truststore when sending messages through `https` protocol.", type={DataType.STRING}, optional=true, defaultValue="`${carbon.home}/resources/security/client-truststore.jks`"), @Parameter(name="https.truststore.password", description="The password for the client-truststore.", type={DataType.STRING}, optional=true, defaultValue="wso2carbon"), @Parameter(name="consumer.key", description="Consumer key used for calling endpoints protected by OAuth 2.0", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="consumer.secret", description="Consumer secret used for calling endpoints protected by OAuth 2.0", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="token.url", description="Token URL to generate a new access tokens when calling endpoints protected by OAuth 2.0", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="refresh.token", description="Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="headers", description="HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\nWhen `Content-Type` header is not provided the system derives the Content-Type based on the provided sink mapper as following: \n - `@map(type='xml')`: `application/xml`\n - `@map(type='json')`: 
`application/json`\n - `@map(type='text')`: `plain/text`\n - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n - For all other cases system defaults to `plain/text`\nAlso the `Content-Length` header need not to be provided, as the system automatically defines it by calculating the size of the payload.", type={DataType.STRING}, optional=true, defaultValue="Content-Type and Content-Length headers"), @Parameter(name="method", description="The HTTP method used for calling the endpoint.", type={DataType.STRING}, optional=true, defaultValue="POST"), @Parameter(name="socket.idle.timeout", description="Socket timeout in millis.", type={DataType.INT}, optional=true, defaultValue="6000"), @Parameter(name="chunk.disabled", description="Disable chunked transfer encoding.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="ssl.protocol", description="SSL/TLS protocol.", type={DataType.STRING}, optional=true, defaultValue="TLS"), @Parameter(name="ssl.verification.disabled", description="Disable SSL verification.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="tls.store.type", description="TLS store type.", type={DataType.STRING}, optional=true, defaultValue="JKS"), @Parameter(name="ssl.configurations", description="SSL/TSL configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported parameters:\n - SSL/TLS protocols: `'sslEnabledProtocols:TLSv1.1,TLSv1.2'`\n - List of ciphers: `'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'`\n - Enable session creation: `'client.enable.session.creation:true'`\n - Supported server names: `'server.suported.server.names:server'`\n - Add HTTP SNIMatcher: `'server.supported.snimatchers:SNIMatcher'`", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="proxy.host", description="Proxy server host", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="proxy.port", description="Proxy server port", type={DataType.STRING}, 
optional=true, defaultValue="-"), @Parameter(name="proxy.username", description="Proxy server username", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="proxy.password", description="Proxy server password", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="client.bootstrap.configurations", description="Client bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Client connect timeout in millis: `'client.bootstrap.connect.timeout:15000'`\n - Client socket timeout in seconds: `'client.bootstrap.socket.timeout:15'`\n - Client socket reuse: `'client.bootstrap.socket.reuse:true'`\n - Enable TCP no delay: `'client.bootstrap.nodelay:true'`\n - Enable client keep alive: `'client.bootstrap.keepalive:true'`\n - Send buffer size: `'client.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'client.bootstrap.recievebuffersize:1048576'`", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="max.pool.active.connections", description="Maximum possible number of active connection per client pool.", type={DataType.INT}, optional=true, defaultValue="-1"), @Parameter(name="min.pool.idle.connections", description="Minimum number of idle connections that can exist per client pool.", type={DataType.INT}, optional=true, defaultValue="0"), @Parameter(name="max.pool.idle.connections", description="Maximum number of idle connections that can exist per client pool.", type={DataType.INT}, optional=true, defaultValue="100"), @Parameter(name="executor.service.threads", description="Thread count for the executor service.", type={DataType.INT}, optional=true, defaultValue="20"), @Parameter(name="min.evictable.idle.time", description="Minimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction.", type={DataType.STRING}, optional=true, defaultValue="300000"), @Parameter(name="time.between.eviction.runs", 
description="Time between two eviction operations (in millis) on the client pool.", type={DataType.STRING}, optional=true, defaultValue="30000"), @Parameter(name="max.wait.time", description="The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.", type={DataType.STRING}, optional=true, defaultValue="60000"), @Parameter(name="test.on.borrow", description="Enable connections to be validated before being borrowed from the client pool.", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="test.while.idle", description="Enable connections to be validated during the eviction operation (if any).", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="exhausted.action", description="Action that should be taken when the maximum number of active connections are being used. This action should be indicated as an int and possible action values are following.\n0 - Fail the request.\n1 - Block the request, until a connection returns to the pool.\n2 - Grow the connection pool size.", type={DataType.INT}, optional=true, defaultValue="1 (Block when exhausted)"), @Parameter(name="hostname.verification.enabled", description="Enable hostname verification.", type={DataType.BOOL}, optional=true, defaultValue="true")}, examples={@Example(syntax="@store(type='rdbms' , jdbc.url='jdbc:mysql://localhost:3306/production?useSSL=false', username='root', password='root', jdbc.driver.name='com.mysql.jdbc.Driver') \n@sink(type='websubhubeventpublisher', hub.id=\"anu_123\" , table.name='SessionTable',publisher.url=\"mysql://localhost:3306/production?useSSL=false\",\n@map(type='keyvalue',implicit.cast.enable='true'))\ndefine stream LowProductionAlertStream (topic string, payload string);", description="Subscribed users will received the messages generated through the hub and will publish to the callback url when subscribe. 
")}, systemParameter={@SystemParameter(name="clientBootstrapClientGroupSize", description="Number of client threads to perform non-blocking read and write to one or more channels.", defaultValue="(Number of available processors) * 2", possibleParameters={"Any positive integer"}), @SystemParameter(name="clientBootstrapBossGroupSize", description="Number of boss threads to accept incoming connections.", defaultValue="Number of available processors", possibleParameters={"Any positive integer"}), @SystemParameter(name="clientBootstrapWorkerGroupSize", description="Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.", defaultValue="(Number of available processors) * 2", possibleParameters={"Any positive integer"}), @SystemParameter(name="trustStoreLocation", description="The default truststore file path.", defaultValue="`${carbon.home}/resources/security/client-truststore.jks`", possibleParameters={"Path to client truststore `.jks` file"}), @SystemParameter(name="trustStorePassword", description="The default truststore password.", defaultValue="wso2carbon", possibleParameters={"Truststore password as string"})})
public class WebSubHubSink
extends Sink {
    private static final Logger log = LogManager.getLogger(WebSubHubSink.class);
    // Columns projected by the on-demand FIND query built in setOnDemandQueryRuntimeForFindSubscription().
    private final String[] outputColumns = new String[]{"callback", "topic", "secret", "lease_seconds", "timestamp"};
    // "<siddhi app name>:<stream definition>", assigned in init() and embedded in error messages.
    protected String streamID;
    protected String consumerKey;
    protected String consumerSecret;
    // Only ever read and cleared (disconnect()/destroy()) in the code visible here.
    protected ClientConnector staticClientConnector;
    protected SiddhiAppContext siddhiAppContext;
    protected String tokenURL;
    // Stays null unless initMetrics() finds a running "prometheus" reporter.
    protected SinkMetrics metrics;
    // Set to the wall-clock time at the start of each publish() call.
    protected long startTime;
    protected long endTime;
    // Mapper type; resolved from getMapper().getType() on the first delivery in publish().
    String mapType;
    Option httpHeaderOption;
    Option httpMethodOption;
    // Attributes of the subscription table definition created in getSubscriptionTable();
    // "lease_seconds" and "timestamp" become LONG, every other column STRING.
    String[] mandatoryColumns = new String[]{"callback", "lease_seconds", "secret", "topic", "mode", "id", "timestamp"};
    private String clientStoreFile;
    private String clientStorePass;
    // -1 means "not configured": createClientConnector() then leaves the sender's idle timeout untouched.
    private int socketIdleTimeout;
    private String sslProtocol;
    private String tlsStoreType;
    private String chunkDisabled;
    private String parametersList;
    private String clientBootstrapConfiguration;
    private ConfigReader configReader;
    private String hostnameVerificationEnabled;
    private String sslVerificationDisabled;
    // Fixed thread pool created in init(); no use of it is visible in this part of the file.
    private Executor executor = null;
    private DefaultHttpWsConnectorFactory httpConnectorFactory;
    private ProxyServerConfiguration proxyServerConfiguration;
    private PoolConfiguration connectionPoolConfiguration;
    private String hubId;
    private Table subscriptionTable;
    // Topic -> subscriptions; installed through setWebSubSubscriptionMap() and read by publish().
    private Map<String, List<WebSubSubscriptionDTO>> webSubSubscriptionMap;
    // Runs the periodic tasks scheduled in connect(); shut down in disconnect().
    private ScheduledExecutorService scheduledExecutorService;
    private StreamDefinition outputStreamDefinition;
    private OnDemandQueryRuntime onDemandQueryRuntime;
    // Value of the "hub.update.time" option (seconds); 0 makes connect() schedule the refresh every second.
    private long webSubSubscriptionMapUpdateTimeInterval;
    // Filled by publish() and drained by SubscriptionTableCleanupTask, which runs on the scheduler.
    // NOTE(review): this is a plain ArrayList touched from more than one thread — confirm whether
    // a thread-safe list is needed.
    private List<WebSubSubscriptionDTO> expiredSubscriptions = new ArrayList<WebSubSubscriptionDTO>();

    /**
     * Lists the payload classes this sink accepts.
     *
     * @return a one-element array containing {@code Map.class}
     */
    public Class[] getSupportedInputEventClasses() {
        Class[] acceptedPayloadTypes = {Map.class};
        return acceptedPayloadTypes;
    }

    /**
     * This sink exposes no service deployment information.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Names the sink options that are declared as dynamic.
     *
     * @return the option keys {@code headers}, {@code method}, {@code publisher.url} and
     *         {@code refresh.token}, in that order
     */
    public String[] getSupportedDynamicOptions() {
        String[] dynamicOptionKeys = {"headers", "method", "publisher.url", "refresh.token"};
        return dynamicOptionKeys;
    }

    /**
     * Reads the sink options, builds the subscription table and its on-demand FIND query, and
     * creates the executors, pool/proxy configuration and HTTP connector factory used later by
     * {@link #connect()} and {@link #publish}.
     *
     * <p>Statement order matters: the subscription table needs {@code outputStreamDefinition} and
     * {@code siddhiAppContext}; the on-demand query needs {@code hubId} and the table; the fixed
     * thread pool needs the pool configuration.
     *
     * @param outputStreamDefinition definition of the stream this sink is attached to
     * @param optionHolder           holder of the options given on the sink annotation
     * @param sinkConfigReader       reader for system-level configuration
     * @param siddhiAppContext       context of the owning Siddhi application
     * @return always {@code null}; no state factory is supplied
     * @throws NumberFormatException if {@code hub.update.time} or {@code socket.idle.timeout} is not numeric
     */
    protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext) {
        this.outputStreamDefinition = outputStreamDefinition;
        this.configReader = sinkConfigReader;
        this.siddhiAppContext = siddhiAppContext;
        this.streamID = siddhiAppContext.getName() + ":" + outputStreamDefinition.toString();
        this.httpHeaderOption = optionHolder.getOrCreateOption("headers", HttpConstants.DEFAULT_HEADER);
        this.httpMethodOption = optionHolder.getOrCreateOption("method", "POST");
        this.consumerKey = optionHolder.validateAndGetStaticValue("consumer.key", "");
        this.consumerSecret = optionHolder.validateAndGetStaticValue("consumer.secret", "");
        this.tokenURL = optionHolder.validateAndGetStaticValue("token.url", "");
        this.clientStoreFile = optionHolder.validateAndGetStaticValue("https.truststore.file", HttpSinkUtil.trustStorePath(this.configReader));
        // 0 (the default) makes connect() schedule the subscription-map refresh every second.
        this.webSubSubscriptionMapUpdateTimeInterval = Long.parseLong(optionHolder.validateAndGetStaticValue("hub.update.time", "0"));
        this.clientStorePass = optionHolder.validateAndGetStaticValue("https.truststore.password", HttpSinkUtil.trustStorePassword(this.configReader));
        // -1 means "leave the sender's own timeout untouched" (see createClientConnector).
        // NOTE(review): the @Extension annotation documents a default of 6000 for this option.
        this.socketIdleTimeout = Integer.parseInt(optionHolder.validateAndGetStaticValue("socket.idle.timeout", "-1"));
        this.sslProtocol = optionHolder.validateAndGetStaticValue("ssl.protocol", "");
        this.tlsStoreType = optionHolder.validateAndGetStaticValue("tls.store.type", "");
        this.chunkDisabled = optionHolder.validateAndGetStaticValue("chunk.disabled", "");
        // hub.id and table.name are read without a fallback value; the annotation describes them as mandatory.
        this.hubId = optionHolder.validateAndGetStaticValue("hub.id");
        String tableName = optionHolder.validateAndGetStaticValue("table.name");
        this.subscriptionTable = this.getSubscriptionTable(tableName);
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(10);
        this.setOnDemandQueryRuntimeForFindSubscription();
        this.connectionPoolConfiguration = HttpSinkUtil.createPoolConfigurations(optionHolder);
        this.executor = Executors.newFixedThreadPool(this.connectionPoolConfiguration.getExecutorServiceThreads());
        this.parametersList = optionHolder.validateAndGetStaticValue("parameters", "");
        this.clientBootstrapConfiguration = optionHolder.validateAndGetStaticValue("client.bootstrap.configurations", "");
        this.hostnameVerificationEnabled = optionHolder.validateAndGetStaticValue("hostname.verification.enabled", "true");
        this.sslVerificationDisabled = optionHolder.validateAndGetStaticValue("ssl.verification.disabled", "false");
        this.proxyServerConfiguration = HttpSinkUtil.createProxyServerConfiguration(optionHolder, this.streamID, siddhiAppContext.getName());
        this.httpConnectorFactory = HttpSinkUtil.createConnectorFactory(this.configReader);
        this.initMetrics(outputStreamDefinition.getId());
        return null;
    }

    /**
     * Delivers one event to every live subscriber of the event's topic.
     *
     * <p>The {@code "topic"} entry is removed from the payload map and used to look up the
     * subscriber list. Subscriptions whose lease ({@code timestamp + lease_seconds * 1000}) has
     * passed are not called; they are queued in {@code expiredSubscriptions} for the table
     * cleanup task and dropped from the in-memory list. When no topic is present, no subscription
     * map has been installed yet, or the topic has no subscribers, the event is dropped silently.
     *
     * @param payload        the mapped event; must be a {@link HashMap}
     * @param dynamicOptions dynamic option values of the event, used to resolve headers and method
     * @param state          unused
     * @throws ConnectionUnavailableException propagated from {@link #sendRequest}
     */
    public void publish(Object payload, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        this.startTime = System.currentTimeMillis();
        HashMap payloadMap = (HashMap)payload;
        Object topic = payloadMap.remove("topic");
        String headers = this.httpHeaderOption.getValue(dynamicOptions);
        // Parse the header option once instead of once for the null test and once for the value.
        List<Header> parsedHeaders = HttpSinkUtil.getHeaders(headers);
        List<Header> baseHeaders = parsedHeaders != null ? parsedHeaders : new ArrayList<Header>();
        // Read the field once so a concurrent setWebSubSubscriptionMap() cannot switch maps mid-call.
        Map<String, List<WebSubSubscriptionDTO>> subscriptionsByTopic = this.webSubSubscriptionMap;
        if (topic == null || subscriptionsByTopic == null) {
            return;
        }
        String topicName = topic.toString();
        List<WebSubSubscriptionDTO> subscriptionListToPublish = subscriptionsByTopic.get(topicName);
        if (subscriptionListToPublish == null) {
            return;
        }
        // Tracked locally so this method never iterates the shared expiredSubscriptions list,
        // which the cleanup task mutates from the scheduler thread.
        List<WebSubSubscriptionDTO> expiredInThisCall = new ArrayList<WebSubSubscriptionDTO>();
        // Same payload for every subscriber, so the digest is computed at most once.
        String signatureHeaderValue = null;
        for (WebSubSubscriptionDTO webSubDTO : subscriptionListToPublish) {
            long leaseEndsAt = webSubDTO.getTimestamp() + webSubDTO.getLeaseSeconds() * 1000L;
            if (System.currentTimeMillis() >= leaseEndsAt) {
                log.debug("Added to expired subscription list " + webSubDTO.getCallback() + " : " + webSubDTO.getTopic());
                this.expiredSubscriptions.add(webSubDTO);
                expiredInThisCall.add(webSubDTO);
                continue;
            }
            // Each subscriber gets its own copy; otherwise a signature header added for one
            // subscriber would also be sent to every later subscriber that registered no secret.
            List<Header> subscriberHeaders = new ArrayList<Header>(baseHeaders);
            String secret = webSubDTO.getSecret();
            if (secret != null && !secret.isEmpty()) {
                if (signatureHeaderValue == null) {
                    // NOTE(review): this is a plain SHA-256 digest of the payload; the subscriber's
                    // secret is not used as a key. WebSub specifies an HMAC keyed with the secret —
                    // confirm with subscribers before changing the wire format.
                    signatureHeaderValue = "sha256=" + this.getHexValue(payloadMap.get("payload").toString());
                }
                subscriberHeaders.add(new Header("X-Hub-Signature", signatureHeaderValue));
            }
            String publisherURL = webSubDTO.getCallback();
            ClientConnector clientConnector = this.createClientConnector(publisherURL);
            if (this.mapType == null) {
                this.mapType = this.getMapper().getType();
            }
            this.sendRequest(payloadMap, dynamicOptions, subscriberHeaders, clientConnector, publisherURL);
        }
        for (WebSubSubscriptionDTO expiredSubscription : expiredInThisCall) {
            subscriptionListToPublish.remove(expiredSubscription);
        }
        subscriptionsByTopic.replace(topicName, subscriptionListToPublish);
    }

    /**
     * Builds the HTTP request for one subscriber and hands it to the client connector.
     *
     * <p>The call is asynchronous: an {@code HTTPWebSubResponseListener} is registered on the
     * returned future and this method returns immediately.
     *
     * @param payload         the event payload passed to {@link #getMessageBody}
     * @param dynamicOptions  dynamic option values used to resolve the HTTP method
     * @param headersList     headers to put on the request
     * @param clientConnector connector for the subscriber's callback URL
     * @param publisherURL    callback URL, used as the metrics label
     * @return always {@code 200}; the real response status is seen only by the listener
     * @throws ConnectionUnavailableException declared for callers; not thrown by the code in this method
     */
    protected int sendRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList, ClientConnector clientConnector, String publisherURL) throws ConnectionUnavailableException {
        // Resolve the option once; an empty value falls back to POST.
        String configuredMethod = this.httpMethodOption.getValue(dynamicOptions);
        String httpMethod = "".equals(configuredMethod) ? "POST" : configuredMethod;
        String contentType = HttpSinkUtil.getContentType(this.mapType, headersList);
        String messageBody = this.getMessageBody(payload);
        HttpMethod httpReqMethod = new HttpMethod(httpMethod);
        HttpCarbonMessage cMessage = new HttpCarbonMessage(new DefaultHttpRequest(HttpVersion.HTTP_1_1, httpReqMethod, ""));
        cMessage = this.generateCarbonMessage(headersList, contentType, httpMethod, cMessage, clientConnector.getHttpURLProperties());
        if (this.metrics != null) {
            this.metrics.getTotalWritesMetric().inc();
            this.metrics.getTotalHttpWritesMetric(publisherURL).inc();
            this.metrics.getRequestSizeMetric(publisherURL).inc(HttpSinkUtil.getByteSize(messageBody));
        }
        if (!"GET".equals(httpMethod)) {
            // Encode as UTF-8 rather than the platform default: the X-Hub-Signature digest is
            // computed by DigestUtils.sha256Hex(String), which hashes the UTF-8 bytes, so any
            // other body encoding would make the signature disagree with the bytes on the wire.
            byte[] bodyBytes = messageBody.getBytes(StandardCharsets.UTF_8);
            cMessage.addHttpContent(new DefaultLastHttpContent(Unpooled.wrappedBuffer(bodyBytes)));
        }
        cMessage.completeMessage();
        HttpResponseFuture responseFuture = clientConnector.send(cMessage);
        HTTPWebSubResponseListener responseListener = new HTTPWebSubResponseListener(payload, dynamicOptions, this, clientConnector.getPublisherURL());
        responseFuture.setHttpConnectorListener(responseListener);
        return 200;
    }

    /**
     * Drains the given stream and decodes its bytes as UTF-8.
     *
     * <p>The supplied stream is not closed here.
     *
     * @param in stream to read until end-of-stream
     * @return the decoded content, or an empty string when reading fails (the failure is logged)
     */
    private String getStringFromInputStream(InputStream in) {
        BufferedInputStream buffered = new BufferedInputStream(in);
        try (ByteArrayOutputStream collected = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[4096];
            int count = buffered.read(chunk, 0, chunk.length);
            while (count != -1) {
                collected.write(chunk, 0, count);
                count = buffered.read(chunk, 0, chunk.length);
            }
            return collected.toString(StandardCharsets.UTF_8.toString());
        } catch (IOException ioe) {
            log.error("Couldn't read the complete input stream due to: " + ioe.getMessage(), (Throwable)ioe);
            return "";
        }
    }

    /**
     * Connects the subscription table and starts the periodic background tasks.
     *
     * <p>Two tasks are scheduled: a {@code SubscriptionMapUpdate} (every
     * {@code hub.update.time} seconds, or every second when that option is 0) and a
     * {@code SubscriptionTableCleanupTask} every 10 seconds.
     *
     * <p>{@link #disconnect()} shuts the scheduler down and discards the connector factory, and
     * both are otherwise created only in {@code init}. They are therefore recreated here when
     * missing, so that a connect-after-disconnect does not fail with a
     * {@code RejectedExecutionException} or a {@code NullPointerException}.
     *
     * @throws ConnectionUnavailableException declared for callers; not thrown directly by the code in this method
     */
    public void connect() throws ConnectionUnavailableException {
        if (this.scheduledExecutorService == null || this.scheduledExecutorService.isShutdown()) {
            this.scheduledExecutorService = new ScheduledThreadPoolExecutor(10);
        }
        if (this.httpConnectorFactory == null) {
            this.httpConnectorFactory = HttpSinkUtil.createConnectorFactory(this.configReader);
        }
        this.subscriptionTable.connectWithRetry();
        if (this.webSubSubscriptionMapUpdateTimeInterval != 0L) {
            this.scheduledExecutorService.scheduleAtFixedRate(new SubscriptionMapUpdate(false, this.onDemandQueryRuntime, this.hubId), 0L, this.webSubSubscriptionMapUpdateTimeInterval, TimeUnit.SECONDS);
        } else {
            this.scheduledExecutorService.scheduleAtFixedRate(new SubscriptionMapUpdate(true, this.onDemandQueryRuntime, this.hubId), 0L, 1L, TimeUnit.SECONDS);
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new SubscriptionTableCleanupTask(this.subscriptionTable, this.siddhiAppContext), 0L, 10L, TimeUnit.SECONDS);
    }

    /**
     * Releases the resources held while connected: clears the static client connector, shuts
     * down the HTTP connector factory and stops the scheduled background tasks.
     *
     * <p>Every resource is null-checked so the method is safe to call when initialisation did
     * not get far enough to create it.
     */
    public void disconnect() {
        if (this.staticClientConnector != null) {
            String publisherURL = this.staticClientConnector.getPublisherURL();
            this.staticClientConnector = null;
            log.debug("Server connector for url " + publisherURL + " disconnected.");
        }
        if (this.httpConnectorFactory != null) {
            this.httpConnectorFactory.shutdownNow();
            this.httpConnectorFactory = null;
        }
        // Guarded like the two resources above; the scheduler is only assigned part-way through init().
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Drops the reference to the static client connector, if one is set, and logs its URL at
     * debug level.
     */
    public void destroy() {
        ClientConnector connector = this.staticClientConnector;
        if (connector == null) {
            return;
        }
        String releasedUrl = connector.getPublisherURL();
        this.staticClientConnector = null;
        log.debug("Server connector for url " + releasedUrl + " disconnected.");
    }

    /**
     * Creates the {@link SinkMetrics} instance when a metric service exists, metrics are enabled
     * and the "prometheus" reporter is running; otherwise {@code metrics} is left as it was.
     *
     * @param streamName stream id used to label the metrics
     */
    protected void initMetrics(String streamName) {
        if (MetricsDataHolder.getInstance().getMetricService() == null) {
            return;
        }
        if (!MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
            return;
        }
        try {
            boolean prometheusRunning = MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning("prometheus");
            if (prometheusRunning) {
                this.metrics = new SinkMetrics(this.siddhiAppContext.getName(), streamName);
            }
        } catch (IllegalArgumentException e) {
            // The reporter lookup rejected the name; treated as "reporter not running".
            log.debug("Prometheus reporter is not running. Hence http sink metrics will not be initialized for " + this.siddhiAppContext.getName());
        }
    }

    /**
     * Populates the given carbon message with routing properties, method, request URL and headers.
     *
     * @param headers           headers to set on the message; may be {@code null}
     * @param contentType       content type, set as {@code Content-Type} only when it contains the mapper type
     * @param httpMethod        HTTP method name
     * @param cMessage          message to populate
     * @param httpURLProperties URL parts keyed by {@code PROTOCOL}, {@code TO}, {@code host},
     *                          {@code port} and {@code REQUEST_URL}
     * @return the same {@code cMessage} instance
     */
    HttpCarbonMessage generateCarbonMessage(List<Header> headers, String contentType, String httpMethod, HttpCarbonMessage cMessage, Map<String, String> httpURLProperties) {
        // These three properties are copied verbatim under the same key.
        for (String propertyKey : new String[]{"PROTOCOL", "TO", "host"}) {
            cMessage.setProperty(propertyKey, httpURLProperties.get(propertyKey));
        }
        Integer port = Integer.valueOf(httpURLProperties.get("port"));
        cMessage.setProperty("port", port);
        cMessage.setHttpMethod(httpMethod);
        String requestUrl = httpURLProperties.get("REQUEST_URL");
        cMessage.setRequestUrl(requestUrl);

        HttpHeaders outgoingHeaders = cMessage.getHeaders();
        outgoingHeaders.set("host", cMessage.getProperty("host"));
        if (headers != null) {
            headers.forEach(header -> outgoingHeaders.set(header.getName(), (Object)header.getValue()));
        }
        boolean contentTypeMatchesMapper = contentType.contains(this.mapType);
        if (contentTypeMatchesMapper) {
            outgoingHeaders.set("Content-Type", (Object)contentType);
        }
        // NOTE(review): repeats the setHttpMethod call made above; looks redundant — confirm before removing.
        cMessage.setHttpMethod(httpMethod);
        return cMessage;
    }

    /**
     * Extracts the request body from a mapped event.
     *
     * @param payload a {@link HashMap} when the mapper type is {@code keyvalue}, otherwise a {@link String}
     * @return the string form of the map's {@code "payload"} entry for {@code keyvalue},
     *         otherwise the payload itself
     */
    String getMessageBody(Object payload) {
        boolean isKeyValueMapping = "keyvalue".equals(this.mapType);
        if (!isKeyValueMapping) {
            return (String)payload;
        }
        HashMap keyValues = (HashMap)payload;
        Object body = keyValues.get("payload");
        return body.toString();
    }

    /**
     * Creates a client connector for the given callback URL, configured from the sink options
     * read in {@code init}.
     *
     * <p>The URL and truststore preconditions are checked before the URL is parsed or any sender
     * configuration is built, so an invalid input produces the descriptive error below rather
     * than whatever the parsing helpers would raise.
     *
     * @param publisherURL callback URL of the subscriber
     * @return a connector bound to {@code publisherURL}
     * @throws SiddhiAppCreationException  if {@code publisherURL} is {@code null} or empty
     * @throws ExceptionInInitializerError if the scheme is {@code https} and the truststore path
     *                                     or password is missing
     */
    public ClientConnector createClientConnector(String publisherURL) {
        if (publisherURL == null || "".equals(publisherURL)) {
            throw new SiddhiAppCreationException("Receiver URL found empty but it is Mandatory field in http sink in " + this.streamID);
        }
        String scheme = HttpSinkUtil.getScheme(publisherURL);
        if ("https".equals(scheme) && (this.clientStoreFile == null || this.clientStorePass == null)) {
            throw new ExceptionInInitializerError("Client trustStore file path or password are empty while default scheme is 'https'. Please provide client trustStore file path and password in " + this.streamID);
        }
        Map<String, String> httpURLProperties = HttpSinkUtil.getURLProperties(publisherURL);
        SenderConfiguration senderConfig = HttpSinkUtil.getSenderConfigurations(httpURLProperties, this.clientStoreFile, this.clientStorePass, this.configReader);
        if (this.proxyServerConfiguration != null) {
            senderConfig.setProxyServerConfiguration(this.proxyServerConfiguration);
        }
        senderConfig.setPoolConfiguration(this.connectionPoolConfiguration);
        // -1 is the "not configured" marker set in init().
        if (this.socketIdleTimeout != -1) {
            senderConfig.setSocketIdleTimeout(this.socketIdleTimeout);
        }
        if (!"".equals(this.sslProtocol)) {
            senderConfig.setSSLProtocol(this.sslProtocol);
        }
        if (!"".equals(this.tlsStoreType)) {
            senderConfig.setTLSStoreType(this.tlsStoreType);
        }
        // Chunking is only overridden when the option was given explicitly.
        if (this.chunkDisabled != null && !"".equals(this.chunkDisabled)) {
            senderConfig.setChunkingConfig(Boolean.parseBoolean(this.chunkDisabled) ? ChunkConfig.NEVER : ChunkConfig.ALWAYS);
        }
        if (!"".equals(this.parametersList)) {
            senderConfig.setParameters(HttpIoUtil.populateParameters(this.parametersList));
        }
        if (!"true".equalsIgnoreCase(this.hostnameVerificationEnabled)) {
            senderConfig.setHostNameVerificationEnabled(false);
        }
        if ("true".equalsIgnoreCase(this.sslVerificationDisabled)) {
            senderConfig.disableSsl();
        }
        Map<String, Object> bootStrapProperties = HttpSinkUtil.populateTransportConfiguration(this.clientBootstrapConfiguration);
        return new ClientConnector(publisherURL, httpURLProperties, this.httpConnectorFactory.createHttpClientConnector(bootStrapProperties, senderConfig));
    }

    /**
     * Defines the subscription table in a fresh {@link SiddhiAppRuntimeBuilder} and returns it.
     *
     * <p>The definition gets one attribute per entry of {@code mandatoryColumns} and every
     * {@code Store} annotation (matched case-insensitively) found on the output stream definition.
     *
     * @param tableName id of the table to define
     * @return the table registered under {@code tableName} in the builder's table map
     */
    private Table getSubscriptionTable(String tableName) {
        TableDefinition definition = TableDefinition.id((String)tableName);
        for (String column : this.mandatoryColumns) {
            boolean isNumericColumn = column.equals("lease_seconds") || column.equals("timestamp");
            definition.attribute(column, isNumericColumn ? Attribute.Type.LONG : Attribute.Type.STRING);
        }
        // Carry the stream's store configuration over to the table.
        this.outputStreamDefinition.getAnnotations().stream()
                .filter(annotation -> annotation.getName().equalsIgnoreCase("Store"))
                .forEach(annotation -> definition.annotation(annotation));
        SiddhiAppRuntimeBuilder runtimeBuilder = new SiddhiAppRuntimeBuilder(this.siddhiAppContext);
        runtimeBuilder.defineTable(definition);
        runtimeBuilder.getTableDefinitionMap().put(tableName, definition);
        return (Table)runtimeBuilder.getTableMap().get(tableName);
    }

    /**
     * Builds and stores the on-demand FIND query runtime that selects {@code outputColumns}
     * from the subscription table for rows whose {@code id} equals this sink's hub id.
     */
    private void setOnDemandQueryRuntimeForFindSubscription() {
        String tableId = this.subscriptionTable.getTableDefinition().getId();

        // Projection: one output attribute per configured column.
        ArrayList<OutputAttribute> projection = new ArrayList<OutputAttribute>();
        for (String column : this.outputColumns) {
            Variable columnReference = new Variable(column);
            projection.add(new OutputAttribute(columnReference));
        }
        Selector selector = new Selector().addSelectionList(projection);

        // Filter: <hubId> == id
        Expression hubIdFilter = (Expression)new Compare((Expression)new StringConstant(this.hubId), Compare.Operator.EQUAL, (Expression)new Variable("id"));
        InputStore inputStore = InputStore.store((String)tableId).on(hubIdFilter);

        OnDemandQuery findQuery = new OnDemandQuery().from(inputStore).select(selector);
        findQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);

        // The parser is given the subscription table plus empty window and aggregation maps.
        HashMap<String, Table> tableMap = new HashMap<String, Table>();
        tableMap.put(tableId, this.subscriptionTable);
        HashMap windowMap = new HashMap();
        HashMap aggregationRuntimeMap = new HashMap();
        this.onDemandQueryRuntime = OnDemandQueryParser.parse((OnDemandQuery)findQuery, null, (SiddhiAppContext)this.siddhiAppContext, tableMap, windowMap, aggregationRuntimeMap);
    }

    /**
     * Replaces the topic-to-subscriptions map consulted by {@code publish}.
     *
     * @param webSubSubscriptionMap subscriptions keyed by topic; stored by reference, not copied
     */
    public void setWebSubSubscriptionMap(Map<String, List<WebSubSubscriptionDTO>> webSubSubscriptionMap) {
        this.webSubSubscriptionMap = webSubSubscriptionMap;
    }

    /**
     * Removes one occurrence of the given subscription from the list of expired subscriptions
     * awaiting deletion; does nothing if it is not in the list.
     *
     * @param webSubSubscriptionDTO subscription to remove
     */
    public void removeFromExpiredSubscriptions(WebSubSubscriptionDTO webSubSubscriptionDTO) {
        this.expiredSubscriptions.remove(webSubSubscriptionDTO);
    }

    /**
     * Returns the SHA-256 digest of the payload as a hex string, as computed by
     * {@code DigestUtils.sha256Hex}.
     *
     * <p>NOTE(review): the digest is not keyed with any secret; {@code publish} sends it as the
     * {@code X-Hub-Signature} value — confirm this matches what subscribers verify.
     *
     * @param payload text to hash
     * @return hex-encoded SHA-256 digest
     */
    String getHexValue(String payload) {
        return DigestUtils.sha256Hex(payload);
    }

    private class SubscriptionTableCleanupTask
    implements Runnable {
        CompiledCondition compiledCondition;

        SubscriptionTableCleanupTask(Table subscriptionTable, SiddhiAppContext siddhiAppContext) {
            HashMap<String, Table> tableMap = new HashMap<String, Table>();
            tableMap.put(subscriptionTable.getTableDefinition().getId(), subscriptionTable);
            SiddhiQueryContext siddhiQueryContext = new SiddhiQueryContext(siddhiAppContext, siddhiAppContext.getName());
            this.compiledCondition = HttpIoUtil.createTableDeleteResource(tableMap, subscriptionTable.getTableDefinition().getId(), siddhiQueryContext);
        }

        @Override
        public void run() {
            ArrayList<WebSubSubscriptionDTO> deletedSubscriptions = new ArrayList<WebSubSubscriptionDTO>();
            if (WebSubHubSink.this.expiredSubscriptions.size() > 0) {
                for (WebSubSubscriptionDTO dto2 : WebSubHubSink.this.expiredSubscriptions) {
                    log.debug("Deleting record " + dto2.getCallback() + " : " + dto2.getTopic());
                    try {
                        WebSubHubSink.this.subscriptionTable.deleteEvents(this.generateComplexEvent(dto2), this.compiledCondition, 1);
                        deletedSubscriptions.add(dto2);
                    }
                    catch (Throwable e) {
                        log.error("Error occurred while deleting expired subscription record from database table. callbackUrl :" + dto2.getCallback() + ", Topic " + dto2.getTopic(), e);
                        WebSubHubSink.this.expiredSubscriptions.remove(dto2);
                    }
                }
                log.info(deletedSubscriptions.size() + "Expired subscriptions deleted successfully.");
                deletedSubscriptions.forEach(dto -> WebSubHubSink.this.expiredSubscriptions.remove(dto));
            }
        }

        private ComplexEventChunk generateComplexEvent(WebSubSubscriptionDTO dto) {
            Object[] event = new Object[]{dto.getCallback(), dto.getTopic()};
            StreamEvent complexEvent = new StreamEvent(0, 0, 2);
            StateEvent stateEvent = new StateEvent(1, 2);
            complexEvent.setOutputData(event);
            stateEvent.addEvent(0, complexEvent);
            stateEvent.setType(ComplexEvent.Type.CURRENT);
            ComplexEventChunk complexEventChunk = new ComplexEventChunk();
            complexEventChunk.add((ComplexEvent)stateEvent);
            return complexEventChunk;
        }
    }

    /**
     * Runnable that rebuilds the sink's topic-to-subscriptions map from the result of an
     * on-demand query.
     * <p>
     * With {@code isStateCheck} set, the map is rebuilt on the first run and afterwards only when
     * {@code HttpIoUtil.isWebSubSinkUpdated(hubId)} returns true; otherwise it is rebuilt on every run.
     */
    private class SubscriptionMapUpdate
    implements Runnable {
        boolean isStateCheck;
        OnDemandQueryRuntime onDemandQueryRuntime;
        String hubId;
        boolean initialExecution = true;

        /**
         * @param isStateCheck         whether to consult the update flag before refreshing
         * @param onDemandQueryRuntime the query runtime executed to fetch subscription rows
         * @param hubId                the hub id passed to the update check and to each created DTO
         */
        SubscriptionMapUpdate(boolean isStateCheck, OnDemandQueryRuntime onDemandQueryRuntime, String hubId) {
            this.isStateCheck = isStateCheck;
            this.onDemandQueryRuntime = onDemandQueryRuntime;
            this.hubId = hubId;
        }

        @Override
        public void run() {
            if (!this.isStateCheck) {
                // Unconditional mode: refresh on every invocation.
                this.updateSubscriptionMap();
                return;
            }
            boolean status = HttpIoUtil.isWebSubSinkUpdated(this.hubId);
            if (!status && !this.initialExecution) {
                // Nothing changed since the last refresh.
                return;
            }
            log.debug("Running SubscriptionMapUpdate task mode Status Check: " + this.isStateCheck + " status : " + status);
            this.updateSubscriptionMap();
            this.initialExecution = false;
        }

        /**
         * Executes the on-demand query, groups the returned rows by topic (event data index 1)
         * and hands the resulting map to the enclosing sink. Any exception is logged and the
         * sink's current map is left untouched.
         */
        public void updateSubscriptionMap() {
            HashMap<String, List<WebSubSubscriptionDTO>> subscriptionsByTopic = new HashMap<String, List<WebSubSubscriptionDTO>>();
            try {
                Event[] rows = this.onDemandQueryRuntime.execute();
                if (rows != null) {
                    for (Event row : rows) {
                        String topic = row.getData(1).toString();
                        WebSubSubscriptionDTO subscription = new WebSubSubscriptionDTO(this.hubId, row.getData(0), topic, row.getData(2), row.getData(3), row.getData(4));
                        subscriptionsByTopic.computeIfAbsent(topic, key -> new ArrayList<WebSubSubscriptionDTO>()).add(subscription);
                    }
                }
                WebSubHubSink.this.setWebSubSubscriptionMap(subscriptionsByTopic);
            }
            catch (Exception e) {
                log.error("Error occurred while updating webSubSubscriptionMap ", (Throwable)e);
            }
        }
    }

    /**
     * Connector listener for a single publish request to {@code publisherURL}.
     * <p>
     * On a response it records endpoint metrics (when metrics are enabled) and hands the response
     * body to the sink's executor; on failure it records the error and reports it to the sink
     * as a {@link ConnectionUnavailableException}.
     */
    private class HTTPWebSubResponseListener
    implements HttpConnectorListener {
        private final String publisherURL;
        Object payload;
        DynamicOptions dynamicOptions;
        WebSubHubSink httpSink;

        /**
         * @param payload        the payload that was published; passed back to the sink on error
         * @param dynamicOptions the dynamic options of the published event; passed back on error
         * @param httpSink       the sink that receives the error callback
         * @param publisherURL   the endpoint URL, used as the metrics key and in error messages
         */
        HTTPWebSubResponseListener(Object payload, DynamicOptions dynamicOptions, WebSubHubSink httpSink, String publisherURL) {
            this.payload = payload;
            this.dynamicOptions = dynamicOptions;
            this.httpSink = httpSink;
            this.publisherURL = publisherURL;
        }

        @Override
        public void onMessage(final HttpCarbonMessage httpCarbonMessage) {
            WebSubHubSink.this.endTime = System.currentTimeMillis();
            this.recordResponseMetrics(httpCarbonMessage);
            if (WebSubHubSink.this.executor == null) {
                return;
            }
            // Read the response body off the listener thread; the returned string is discarded.
            WebSubHubSink.this.executor.execute(() -> {
                WebSubHubSink.this.getStringFromInputStream(new HttpMessageDataStreamer(httpCarbonMessage).getInputStream());
            });
        }

        /**
         * Updates status, latency and last-event-time metrics for the endpoint and counts any
         * non-2xx response as an HTTP error. Does nothing when metrics are disabled.
         */
        private void recordResponseMetrics(HttpCarbonMessage httpCarbonMessage) {
            if (WebSubHubSink.this.metrics == null) {
                return;
            }
            WebSubHubSink.this.metrics.setEndpointStatusMetric(this.publisherURL, EndpointStatus.ONLINE);
            WebSubHubSink.this.metrics.setLatencyMetric(this.publisherURL, WebSubHubSink.this.endTime - WebSubHubSink.this.startTime);
            WebSubHubSink.this.metrics.setLastEventTime(this.publisherURL, WebSubHubSink.this.endTime);
            if (httpCarbonMessage.getHttpStatusCode() / 100 != 2) {
                WebSubHubSink.this.metrics.getTotalHttpErrorsMetric(this.publisherURL).inc();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            if (WebSubHubSink.this.metrics != null) {
                WebSubHubSink.this.metrics.getTotalHttpErrorsMetric(this.publisherURL).inc();
                WebSubHubSink.this.metrics.setEndpointStatusMetric(this.publisherURL, EndpointStatus.OFFLINE);
            }
            String message = "HTTP sink on stream " + this.httpSink.streamID + " of Siddhi App '" + this.httpSink.siddhiAppContext.getName() + "' failed to publish events to endpoint '" + this.publisherURL + "'. " + throwable.getMessage();
            // Keep the original throwable as the cause so the sink's error handling can inspect it.
            Exception failure = new ConnectionUnavailableException(message, throwable);
            this.httpSink.onError(this.payload, this.dynamicOptions, failure);
        }
    }
}

