package io.siddhi.extension.io.http.sink;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.Event;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.query.OnDemandQueryRuntime;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.SiddhiAppRuntimeBuilder;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OnDemandQueryParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.metrics.EndpointStatus;
import io.siddhi.extension.io.http.metrics.SinkMetrics;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.HttpIoUtil;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.execution.query.OnDemandQuery;
import io.siddhi.query.api.execution.query.input.store.InputStore;
import io.siddhi.query.api.execution.query.selection.OutputAttribute;
import io.siddhi.query.api.execution.query.selection.Selector;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.api.expression.condition.Compare;
import io.siddhi.query.api.expression.constant.StringConstant;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.http.netty.contract.Constants;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.contract.config.ChunkConfig;
import org.wso2.transport.http.netty.contract.config.ProxyServerConfiguration;
import org.wso2.transport.http.netty.contract.config.SenderConfiguration;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;
import org.wso2.transport.http.netty.contractimpl.sender.channel.pool.PoolConfiguration;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;
import org.wso2.transport.http.netty.message.HttpMessageDataStreamer;

@Extension(name = "websubhub", namespace = "sink", description = "WebSubHubEventPublisher publishes messages via HTTP/HTTP according to the provided URL when subscribe to the WebSub hub. The table.name, hub.id and ", parameters = {@Parameter(name = HttpConstants.HUB_ID, description = "Id of the hub that the messages needed to process ", type = {DataType.STRING}), @Parameter(name = HttpConstants.WEB_SUB_SUBSCRIPTION_DATA_TABLE_KEY, description = "Name of the table which subscription data holds related to the hub  ", type = {DataType.STRING}), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PATH_PARAM, description = "The file path of the client truststore when sending messages through `https` protocol.", type = {DataType.STRING}, optional = true, defaultValue = "`${carbon.home}/resources/security/client-truststore.jks`"), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PASSWORD_PARAM, description = "The password for the client-truststore.", type = {DataType.STRING}, optional = true, defaultValue = "wso2carbon"), @Parameter(name = HttpConstants.CONSUMER_KEY, description = "Consumer key used for calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CONSUMER_SECRET, description = "Consumer secret used for calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.TOKEN_URL, description = "Token URL to generate a new access tokens when calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.RECEIVER_REFRESH_TOKEN, description = "Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.HEADERS, description = "HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\nWhen `Content-Type` header is not provided the system derives the Content-Type based on the provided sink mapper as following: \n - `@map(type='xml')`: `application/xml`\n - `@map(type='json')`: `application/json`\n - `@map(type='text')`: `plain/text`\n - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n - For all other cases system defaults to `plain/text`\nAlso the `Content-Length` header need not to be provided, as the system automatically defines it by calculating the size of the payload.", type = {DataType.STRING}, optional = true, defaultValue = "Content-Type and Content-Length headers"), @Parameter(name = HttpConstants.METHOD, description = "The HTTP method used for calling the endpoint.", type = {DataType.STRING}, optional = true, defaultValue = "POST"), @Parameter(name = HttpConstants.SOCKET_IDEAL_TIMEOUT, description = "Socket timeout in millis.", type = {DataType.INT}, optional = true, defaultValue = "6000"), @Parameter(name = HttpConstants.CLIENT_CHUNK_DISABLED, description = "Disable chunked transfer encoding.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.SSL_PROTOCOL, description = "SSL/TLS protocol.", type = {DataType.STRING}, optional = true, defaultValue = Constants.TLS_PROTOCOL), @Parameter(name = HttpConstants.SSL_VERIFICATION_DISABLED, description = "Disable SSL verification.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.TLS_STORE_TYPE, description = "TLS store type.", type = {DataType.STRING}, optional = true, defaultValue = Constants.JKS), @Parameter(name = HttpConstants.SSS_CONFIGS, description = "SSL/TSL configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported parameters:\n - SSL/TLS protocols: `'sslEnabledProtocols:TLSv1.1,TLSv1.2'`\n - List of ciphers: `'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'`\n - Enable session creation: `'client.enable.session.creation:true'`\n - Supported server names: `'server.suported.server.names:server'`\n - Add HTTP SNIMatcher: `'server.supported.snimatchers:SNIMatcher'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_HOST, description = "Proxy server host", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_PORT, description = "Proxy server port", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_USERNAME, description = "Proxy server username", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_PASSWORD, description = "Proxy server password", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_CONFIGURATION, description = "Client bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Client connect timeout in millis: `'client.bootstrap.connect.timeout:15000'`\n - Client socket timeout in seconds: `'client.bootstrap.socket.timeout:15'`\n - Client socket reuse: `'client.bootstrap.socket.reuse:true'`\n - Enable TCP no delay: `'client.bootstrap.nodelay:true'`\n - Enable client keep alive: `'client.bootstrap.keepalive:true'`\n - Send buffer size: `'client.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'client.bootstrap.recievebuffersize:1048576'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.MAX_ACTIVE_CONNECTIONS_PER_POOL, description = "Maximum possible number of active connection per client pool.", type = {DataType.INT}, optional = true, defaultValue = "-1"), @Parameter(name = HttpConstants.MIN_IDLE_CONNECTIONS_PER_POOL, description = "Minimum number of idle connections that can exist per client pool.", type = {DataType.INT}, optional = true, defaultValue = HttpConstants.DEFAULT_MIN_IDLE_CONNECTIONS_PER_POOL), @Parameter(name = HttpConstants.MAX_IDLE_CONNECTIONS_PER_POOL, description = "Maximum number of idle connections that can exist per client pool.", type = {DataType.INT}, optional = true, defaultValue = HttpConstants.DEFAULT_MAX_IDLE_CONNECTIONS_PER_POOL), @Parameter(name = HttpConstants.MIN_EVICTABLE_IDLE_TIME, description = "Minimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_MIN_EVICTABLE_IDLE_TIME), @Parameter(name = HttpConstants.TIME_BETWEEN_EVICTION_RUNS, description = "Time between two eviction operations (in millis) on the client pool.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_TIME_BETWEEN_EVICTION_RUNS), @Parameter(name = HttpConstants.MAX_WAIT_TIME, description = "The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_MAX_WAIT_TIME), @Parameter(name = HttpConstants.TEST_ON_BORROW, description = "Enable connections to be validated before being borrowed from the client pool.", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = HttpConstants.TEST_WHILE_IDLE, description = "Enable connections to be validated during the eviction operation (if any).", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = HttpConstants.EXHAUSTED_ACTION, description = "Action that should be taken when the maximum number of active connections are being used. This action should be indicated as an int and possible action values are following.\n0 - Fail the request.\n1 - Block the request, until a connection returns to the pool.\n2 - Grow the connection pool size.", type = {DataType.INT}, optional = true, defaultValue = "1 (Block when exhausted)"), @Parameter(name = HttpConstants.HOSTNAME_VERIFICATION_ENABLED, description = "Enable hostname verification.", type = {DataType.BOOL}, optional = true, defaultValue = "true")}, examples = {@Example(syntax = "@store(type='rdbms' , jdbc.url='jdbc:mysql://localhost:3306/production?useSSL=false', username='root', password='root', jdbc.driver.name='com.mysql.jdbc.Driver') \n@sink(type='websubhubeventpublisher', hub.id=\"anu_123\" , table.name='SessionTable',publisher.url=\"mysql://localhost:3306/production?useSSL=false\",\n@map(type='keyvalue',implicit.cast.enable='true'))\ndefine stream LowProductionAlertStream (topic string, payload string);", description = "Subscribed users will received the messages generated through the hub and will publish to the callback url when subscribe. ")}, systemParameter = {@SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_CLIENT_GROUP_SIZE, description = "Number of client threads to perform non-blocking read and write to one or more channels.", defaultValue = "(Number of available processors) * 2", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_BOSS_GROUP_SIZE, description = "Number of boss threads to accept incoming connections.", defaultValue = "Number of available processors", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_WORKER_GROUP_SIZE, description = "Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.", defaultValue = "(Number of available processors) * 2", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.CLIENT_TRUSTSTORE_PATH, description = "The default truststore file path.", defaultValue = "`${carbon.home}/resources/security/client-truststore.jks`", possibleParameters = {"Path to client truststore `.jks` file"}), @SystemParameter(name = "trustStorePassword", description = "The default truststore password.", defaultValue = "wso2carbon", possibleParameters = {"Truststore password as string"})})
/* loaded from: input_file:io/siddhi/extension/io/http/sink/WebSubHubSink.class */
public class WebSubHubSink extends Sink {
    private static final Logger log = Logger.getLogger(WebSubHubSink.class);
    protected String streamID;
    protected String consumerKey;
    protected String consumerSecret;
    protected ClientConnector staticClientConnector;
    protected SiddhiAppContext siddhiAppContext;
    protected String tokenURL;
    protected SinkMetrics metrics;
    protected long startTime;
    protected long endTime;
    String mapType;
    Option httpHeaderOption;
    Option httpMethodOption;
    private String clientStoreFile;
    private String clientStorePass;
    private int socketIdleTimeout;
    private String sslProtocol;
    private String tlsStoreType;
    private String chunkDisabled;
    private String parametersList;
    private String clientBootstrapConfiguration;
    private ConfigReader configReader;
    private String hostnameVerificationEnabled;
    private String sslVerificationDisabled;
    private DefaultHttpWsConnectorFactory httpConnectorFactory;
    private ProxyServerConfiguration proxyServerConfiguration;
    private PoolConfiguration connectionPoolConfiguration;
    private String hubId;
    private Table subscriptionTable;
    private Map<String, List<WebSubSubscriptionDTO>> webSubSubscriptionMap;
    private ScheduledExecutorService scheduledExecutorService;
    private StreamDefinition outputStreamDefinition;
    private OnDemandQueryRuntime onDemandQueryRuntime;
    private long webSubSubscriptionMapUpdateTimeInterval;
    private final String[] outputColumns = {"callback", "topic", "secret", "lease_seconds", HttpConstants.REQUEST_TIMESTAMP};
    String[] mandatoryColumns = {"callback", "lease_seconds", "secret", "topic", "mode", HttpConstants.HUB_ID_COLUMN_NAME, HttpConstants.REQUEST_TIMESTAMP};
    private Executor executor = null;
    private List<WebSubSubscriptionDTO> expiredSubscriptions = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/siddhi/extension/io/http/sink/WebSubHubSink$HTTPWebSubResponseListener.class */
    public class HTTPWebSubResponseListener implements HttpConnectorListener {
        private final String publisherURL;
        Object payload;
        DynamicOptions dynamicOptions;
        WebSubHubSink httpSink;

        HTTPWebSubResponseListener(Object obj, DynamicOptions dynamicOptions, WebSubHubSink webSubHubSink, String str) {
            this.payload = obj;
            this.dynamicOptions = dynamicOptions;
            this.httpSink = webSubHubSink;
            this.publisherURL = str;
        }

        @Override // org.wso2.transport.http.netty.contract.HttpConnectorListener
        public void onMessage(final HttpCarbonMessage httpCarbonMessage) {
            WebSubHubSink.this.endTime = System.currentTimeMillis();
            if (WebSubHubSink.this.metrics != null) {
                WebSubHubSink.this.metrics.setEndpointStatusMetric(this.publisherURL, EndpointStatus.ONLINE);
                WebSubHubSink.this.metrics.setLatencyMetric(this.publisherURL, WebSubHubSink.this.endTime - WebSubHubSink.this.startTime);
                WebSubHubSink.this.metrics.setLastEventTime(this.publisherURL, WebSubHubSink.this.endTime);
                if (httpCarbonMessage.getHttpStatusCode().intValue() / 100 != 2) {
                    WebSubHubSink.this.metrics.getTotalHttpErrorsMetric(this.publisherURL).inc();
                }
            }
            if (WebSubHubSink.this.executor != null) {
                WebSubHubSink.this.executor.execute(new Runnable() { // from class: io.siddhi.extension.io.http.sink.WebSubHubSink.HTTPWebSubResponseListener.1
                    @Override // java.lang.Runnable
                    public void run() {
                        WebSubHubSink.this.getStringFromInputStream(new HttpMessageDataStreamer(httpCarbonMessage).getInputStream());
                    }
                });
            }
        }

        @Override // org.wso2.transport.http.netty.contract.HttpConnectorListener
        public void onError(Throwable th) {
            if (WebSubHubSink.this.metrics != null) {
                WebSubHubSink.this.metrics.getTotalHttpErrorsMetric(this.publisherURL).inc();
                WebSubHubSink.this.metrics.setEndpointStatusMetric(this.publisherURL, EndpointStatus.OFFLINE);
            }
            this.httpSink.onError(this.payload, this.dynamicOptions, new ConnectionUnavailableException("HTTP sink on stream " + this.httpSink.streamID + " of Siddhi App '" + this.httpSink.siddhiAppContext.getName() + "' failed to publish events to endpoint '" + this.publisherURL + "'. " + th.getMessage(), th));
        }
    }

    /* loaded from: input_file:io/siddhi/extension/io/http/sink/WebSubHubSink$SubscriptionMapUpdate.class */
    private class SubscriptionMapUpdate implements Runnable {
        boolean isStateCheck;
        OnDemandQueryRuntime onDemandQueryRuntime;
        String hubId;
        boolean initialExecution = true;

        SubscriptionMapUpdate(boolean z, OnDemandQueryRuntime onDemandQueryRuntime, String str) {
            this.isStateCheck = z;
            this.onDemandQueryRuntime = onDemandQueryRuntime;
            this.hubId = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (!this.isStateCheck) {
                updateSubscriptionMap();
                return;
            }
            boolean isWebSubSinkUpdated = HttpIoUtil.isWebSubSinkUpdated(this.hubId);
            if (isWebSubSinkUpdated || this.initialExecution) {
                WebSubHubSink.log.debug("Running SubscriptionMapUpdate task mode Status Check: " + this.isStateCheck + " status : " + isWebSubSinkUpdated);
                updateSubscriptionMap();
                this.initialExecution = false;
            }
        }

        public void updateSubscriptionMap() {
            HashMap hashMap = new HashMap();
            try {
                Event[] execute = this.onDemandQueryRuntime.execute();
                if (execute != null && execute.length > 0) {
                    for (Event event : execute) {
                        String obj = event.getData(1).toString();
                        List list = (List) hashMap.getOrDefault(obj, new ArrayList());
                        list.add(new WebSubSubscriptionDTO(this.hubId, event.getData(0), obj, event.getData(2), event.getData(3), event.getData(4)));
                        hashMap.put(obj, list);
                    }
                }
                WebSubHubSink.this.setWebSubSubscriptionMap(hashMap);
            } catch (Exception e) {
                WebSubHubSink.log.error("Error occurred while updating webSubSubscriptionMap ", e);
            }
        }
    }

    /* loaded from: input_file:io/siddhi/extension/io/http/sink/WebSubHubSink$SubscriptionTableCleanupTask.class */
    private class SubscriptionTableCleanupTask implements Runnable {
        CompiledCondition compiledCondition;

        SubscriptionTableCleanupTask(Table table, SiddhiAppContext siddhiAppContext) {
            HashMap hashMap = new HashMap();
            hashMap.put(table.getTableDefinition().getId(), table);
            this.compiledCondition = HttpIoUtil.createTableDeleteResource(hashMap, table.getTableDefinition().getId(), new SiddhiQueryContext(siddhiAppContext, siddhiAppContext.getName()));
        }

        @Override // java.lang.Runnable
        public void run() {
            ArrayList arrayList = new ArrayList();
            if (WebSubHubSink.this.expiredSubscriptions.size() > 0) {
                for (WebSubSubscriptionDTO webSubSubscriptionDTO : WebSubHubSink.this.expiredSubscriptions) {
                    WebSubHubSink.log.debug("Deleting record " + webSubSubscriptionDTO.getCallback() + " : " + webSubSubscriptionDTO.getTopic());
                    try {
                        WebSubHubSink.this.subscriptionTable.deleteEvents(generateComplexEvent(webSubSubscriptionDTO), this.compiledCondition, 1);
                        arrayList.add(webSubSubscriptionDTO);
                    } catch (Throwable th) {
                        WebSubHubSink.log.error("Error occurred while deleting expired subscription record from database table. callbackUrl :" + webSubSubscriptionDTO.getCallback() + ", Topic " + webSubSubscriptionDTO.getTopic(), th);
                        WebSubHubSink.this.expiredSubscriptions.remove(webSubSubscriptionDTO);
                    }
                }
                WebSubHubSink.log.info(arrayList.size() + "Expired subscriptions deleted successfully.");
                arrayList.forEach(webSubSubscriptionDTO2 -> {
                    WebSubHubSink.this.expiredSubscriptions.remove(webSubSubscriptionDTO2);
                });
            }
        }

        private ComplexEventChunk generateComplexEvent(WebSubSubscriptionDTO webSubSubscriptionDTO) {
            Object[] objArr = {webSubSubscriptionDTO.getCallback(), webSubSubscriptionDTO.getTopic()};
            StreamEvent streamEvent = new StreamEvent(0, 0, 2);
            StateEvent stateEvent = new StateEvent(1, 2);
            streamEvent.setOutputData(objArr);
            stateEvent.addEvent(0, streamEvent);
            stateEvent.setType(ComplexEvent.Type.CURRENT);
            ComplexEventChunk complexEventChunk = new ComplexEventChunk();
            complexEventChunk.add(stateEvent);
            return complexEventChunk;
        }
    }

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{Map.class};
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{HttpConstants.HEADERS, HttpConstants.METHOD, HttpConstants.PUBLISHER_URL, HttpConstants.RECEIVER_REFRESH_TOKEN};
    }

    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.outputStreamDefinition = streamDefinition;
        this.configReader = configReader;
        this.siddhiAppContext = siddhiAppContext;
        this.streamID = siddhiAppContext.getName() + ":" + streamDefinition.toString();
        this.httpHeaderOption = optionHolder.getOrCreateOption(HttpConstants.HEADERS, HttpConstants.DEFAULT_HEADER);
        this.httpMethodOption = optionHolder.getOrCreateOption(HttpConstants.METHOD, "POST");
        this.consumerKey = optionHolder.validateAndGetStaticValue(HttpConstants.CONSUMER_KEY, "");
        this.consumerSecret = optionHolder.validateAndGetStaticValue(HttpConstants.CONSUMER_SECRET, "");
        this.tokenURL = optionHolder.validateAndGetStaticValue(HttpConstants.TOKEN_URL, "");
        this.clientStoreFile = optionHolder.validateAndGetStaticValue(HttpConstants.CLIENT_TRUSTSTORE_PATH_PARAM, HttpSinkUtil.trustStorePath(this.configReader));
        this.webSubSubscriptionMapUpdateTimeInterval = Long.parseLong(optionHolder.validateAndGetStaticValue(HttpConstants.WEB_SUB_SUBSCRIPTION_MAP_UPDATE_TIMESTAMP, HttpConstants.DEFAULT_MIN_IDLE_CONNECTIONS_PER_POOL));
        this.clientStorePass = optionHolder.validateAndGetStaticValue(HttpConstants.CLIENT_TRUSTSTORE_PASSWORD_PARAM, HttpSinkUtil.trustStorePassword(this.configReader));
        this.socketIdleTimeout = Integer.parseInt(optionHolder.validateAndGetStaticValue(HttpConstants.SOCKET_IDEAL_TIMEOUT, "-1"));
        this.sslProtocol = optionHolder.validateAndGetStaticValue(HttpConstants.SSL_PROTOCOL, "");
        this.tlsStoreType = optionHolder.validateAndGetStaticValue(HttpConstants.TLS_STORE_TYPE, "");
        this.chunkDisabled = optionHolder.validateAndGetStaticValue(HttpConstants.CLIENT_CHUNK_DISABLED, "");
        this.hubId = optionHolder.validateAndGetStaticValue(HttpConstants.HUB_ID);
        this.subscriptionTable = getSubscriptionTable(optionHolder.validateAndGetStaticValue(HttpConstants.WEB_SUB_SUBSCRIPTION_DATA_TABLE_KEY));
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(10);
        setOnDemandQueryRuntimeForFindSubscription();
        this.connectionPoolConfiguration = HttpSinkUtil.createPoolConfigurations(optionHolder);
        this.parametersList = optionHolder.validateAndGetStaticValue("parameters", "");
        this.clientBootstrapConfiguration = optionHolder.validateAndGetStaticValue(HttpConstants.CLIENT_BOOTSTRAP_CONFIGURATION, "");
        this.hostnameVerificationEnabled = optionHolder.validateAndGetStaticValue(HttpConstants.HOSTNAME_VERIFICATION_ENABLED, "true");
        this.sslVerificationDisabled = optionHolder.validateAndGetStaticValue(HttpConstants.SSL_VERIFICATION_DISABLED, "false");
        this.proxyServerConfiguration = HttpSinkUtil.createProxyServerConfiguration(optionHolder, this.streamID, siddhiAppContext.getName());
        this.httpConnectorFactory = HttpSinkUtil.createConnectorFactory(this.configReader);
        initMetrics(streamDefinition.getId());
        return null;
    }

    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        List<WebSubSubscriptionDTO> list;
        this.startTime = System.currentTimeMillis();
        HashMap hashMap = (HashMap) obj;
        Object remove = hashMap.remove("topic");
        String value = this.httpHeaderOption.getValue(dynamicOptions);
        List<Header> headers = HttpSinkUtil.getHeaders(value) != null ? HttpSinkUtil.getHeaders(value) : new ArrayList<>();
        if (remove == null || (list = this.webSubSubscriptionMap.get(remove.toString())) == null) {
            return;
        }
        for (WebSubSubscriptionDTO webSubSubscriptionDTO : list) {
            if (System.currentTimeMillis() < webSubSubscriptionDTO.getTimestamp() + (webSubSubscriptionDTO.getLeaseSeconds() * 1000)) {
                if (!webSubSubscriptionDTO.getSecret().equalsIgnoreCase("")) {
                    headers.add(new Header(HttpConstants.X_HUB_SIGNATURE, HttpConstants.SHA256_HASHING + getHexValue(hashMap.get("payload").toString())));
                }
                String callback = webSubSubscriptionDTO.getCallback();
                ClientConnector createClientConnector = createClientConnector(callback);
                if (this.mapType == null) {
                    this.mapType = getMapper().getType();
                }
                sendRequest(obj, dynamicOptions, headers, createClientConnector, callback);
            } else {
                log.debug("Added to expired subscription list " + webSubSubscriptionDTO.getCallback() + " : " + webSubSubscriptionDTO.getTopic());
                this.expiredSubscriptions.add(webSubSubscriptionDTO);
            }
        }
        Iterator<WebSubSubscriptionDTO> it = this.expiredSubscriptions.iterator();
        while (it.hasNext()) {
            list.remove(it.next());
        }
        this.webSubSubscriptionMap.replace(remove.toString(), list);
    }

    protected int sendRequest(Object obj, DynamicOptions dynamicOptions, List<Header> list, ClientConnector clientConnector, String str) throws ConnectionUnavailableException {
        String value = "".equals(this.httpMethodOption.getValue(dynamicOptions)) ? "POST" : this.httpMethodOption.getValue(dynamicOptions);
        String contentType = HttpSinkUtil.getContentType(this.mapType, list);
        String messageBody = getMessageBody(obj);
        HttpCarbonMessage generateCarbonMessage = generateCarbonMessage(list, contentType, value, new HttpCarbonMessage(new DefaultHttpRequest(HttpVersion.HTTP_1_1, new HttpMethod(value), "")), clientConnector.getHttpURLProperties());
        if (this.metrics != null) {
            this.metrics.getTotalWritesMetric().inc();
            this.metrics.getTotalHttpWritesMetric(str).inc();
            this.metrics.getRequestSizeMetric(str).inc(HttpSinkUtil.getByteSize(messageBody));
        }
        if (!"GET".equals(value)) {
            generateCarbonMessage.addHttpContent(new DefaultLastHttpContent(Unpooled.wrappedBuffer(messageBody.getBytes(Charset.defaultCharset()))));
        }
        generateCarbonMessage.completeMessage();
        clientConnector.send(generateCarbonMessage).setHttpConnectorListener(new HTTPWebSubResponseListener(obj, dynamicOptions, this, clientConnector.getPublisherURL()));
        return 200;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getStringFromInputStream(InputStream inputStream) {
        BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Throwable th = null;
            while (true) {
                try {
                    try {
                        int read = bufferedInputStream.read();
                        if (read == -1) {
                            break;
                        }
                        byteArrayOutputStream.write(read);
                    } finally {
                    }
                } finally {
                }
            }
            String byteArrayOutputStream2 = byteArrayOutputStream.toString(StandardCharsets.UTF_8.toString());
            if (byteArrayOutputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    byteArrayOutputStream.close();
                }
            }
            return byteArrayOutputStream2;
        } catch (IOException e) {
            log.error("Couldn't read the complete input stream due to: " + e.getMessage(), e);
            return "";
        }
    }

    public void connect() throws ConnectionUnavailableException {
        this.subscriptionTable.connectWithRetry();
        if (this.webSubSubscriptionMapUpdateTimeInterval != 0) {
            this.scheduledExecutorService.scheduleAtFixedRate(new SubscriptionMapUpdate(false, this.onDemandQueryRuntime, this.hubId), 0L, this.webSubSubscriptionMapUpdateTimeInterval, TimeUnit.SECONDS);
        } else {
            this.scheduledExecutorService.scheduleAtFixedRate(new SubscriptionMapUpdate(true, this.onDemandQueryRuntime, this.hubId), 0L, 1L, TimeUnit.SECONDS);
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new SubscriptionTableCleanupTask(this.subscriptionTable, this.siddhiAppContext), 0L, 10L, TimeUnit.SECONDS);
    }

    public void disconnect() {
        if (this.staticClientConnector != null) {
            String publisherURL = this.staticClientConnector.getPublisherURL();
            this.staticClientConnector = null;
            log.debug("Server connector for url " + publisherURL + " disconnected.");
        }
        if (this.httpConnectorFactory != null) {
            this.httpConnectorFactory.shutdownNow();
            this.httpConnectorFactory = null;
        }
        this.scheduledExecutorService.shutdownNow();
    }

    public void destroy() {
        if (this.staticClientConnector != null) {
            String publisherURL = this.staticClientConnector.getPublisherURL();
            this.staticClientConnector = null;
            log.debug("Server connector for url " + publisherURL + " disconnected.");
        }
    }

    protected void initMetrics(String str) {
        if (MetricsDataHolder.getInstance().getMetricService() == null || !MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
            return;
        }
        try {
            if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(HttpConstants.PROMETHEUS_REPORTER_NAME)) {
                this.metrics = new SinkMetrics(this.siddhiAppContext.getName(), str);
            }
        } catch (IllegalArgumentException e) {
            log.debug("Prometheus reporter is not running. Hence http sink metrics will not be initialized for " + this.siddhiAppContext.getName());
        }
    }

    HttpCarbonMessage generateCarbonMessage(List<Header> list, String str, String str2, HttpCarbonMessage httpCarbonMessage, Map<String, String> map) {
        httpCarbonMessage.setProperty("PROTOCOL", map.get("PROTOCOL"));
        httpCarbonMessage.setProperty("TO", map.get("TO"));
        httpCarbonMessage.setProperty(Constants.HTTP_HOST, map.get(Constants.HTTP_HOST));
        httpCarbonMessage.setProperty("port", Integer.valueOf(map.get("port")));
        httpCarbonMessage.setHttpMethod(str2);
        httpCarbonMessage.setRequestUrl(map.get("REQUEST_URL"));
        HttpHeaders headers = httpCarbonMessage.getHeaders();
        headers.set(Constants.HTTP_HOST, httpCarbonMessage.getProperty(Constants.HTTP_HOST));
        if (list != null) {
            for (Header header : list) {
                headers.set(header.getName(), (Object) header.getValue());
            }
        }
        if (str.contains(this.mapType)) {
            headers.set("Content-Type", (Object) str);
        }
        httpCarbonMessage.setHttpMethod(str2);
        return httpCarbonMessage;
    }

    String getMessageBody(Object obj) {
        return HttpConstants.MAP_KEYVALUE.equals(this.mapType) ? (String) ((HashMap) obj).entrySet().stream().map(entry -> {
            return encodeMessage(entry.getKey()) + "=" + encodeMessage(entry.getValue());
        }).reduce("", (str, str2) -> {
            return str + "&" + str2;
        }) : (String) obj;
    }

    public ClientConnector createClientConnector(String str) {
        String scheme = HttpSinkUtil.getScheme(str);
        Map<String, String> uRLProperties = HttpSinkUtil.getURLProperties(str);
        SenderConfiguration senderConfigurations = HttpSinkUtil.getSenderConfigurations(uRLProperties, this.clientStoreFile, this.clientStorePass, this.configReader);
        if ("".equals(str)) {
            throw new SiddhiAppCreationException("Receiver URL found empty but it is Mandatory field in http sink in " + this.streamID);
        }
        if ("https".equals(scheme) && (this.clientStoreFile == null || this.clientStorePass == null)) {
            throw new ExceptionInInitializerError("Client trustStore file path or password are empty while default scheme is 'https'. Please provide client trustStore file path and password in " + this.streamID);
        }
        if (this.proxyServerConfiguration != null) {
            senderConfigurations.setProxyServerConfiguration(this.proxyServerConfiguration);
        }
        senderConfigurations.setPoolConfiguration(this.connectionPoolConfiguration);
        if (this.socketIdleTimeout != -1) {
            senderConfigurations.setSocketIdleTimeout(this.socketIdleTimeout);
        }
        if (!"".equals(this.sslProtocol)) {
            senderConfigurations.setSSLProtocol(this.sslProtocol);
        }
        if (!"".equals(this.tlsStoreType)) {
            senderConfigurations.setTLSStoreType(this.tlsStoreType);
        }
        if (!"".equals(this.chunkDisabled) && this.chunkDisabled != null) {
            if (Boolean.parseBoolean(this.chunkDisabled)) {
                senderConfigurations.setChunkingConfig(ChunkConfig.NEVER);
            } else {
                senderConfigurations.setChunkingConfig(ChunkConfig.ALWAYS);
            }
        }
        if (!"".equals(this.parametersList)) {
            senderConfigurations.setParameters(HttpIoUtil.populateParameters(this.parametersList));
        }
        if (!"true".equalsIgnoreCase(this.hostnameVerificationEnabled)) {
            senderConfigurations.setHostNameVerificationEnabled(false);
        }
        if ("true".equalsIgnoreCase(this.sslVerificationDisabled)) {
            senderConfigurations.disableSsl();
        }
        this.executor = Executors.newFixedThreadPool(senderConfigurations.getPoolConfiguration().getExecutorServiceThreads());
        return new ClientConnector(str, uRLProperties, this.httpConnectorFactory.createHttpClientConnector(HttpSinkUtil.populateTransportConfiguration(this.clientBootstrapConfiguration), senderConfigurations));
    }

    private String encodeMessage(Object obj) {
        try {
            return URLEncoder.encode((String) obj, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SiddhiAppRuntimeException("Execution of Siddhi app " + this.siddhiAppContext.getName() + " failed due to " + e.getMessage(), e);
        }
    }

    private Table getSubscriptionTable(String str) {
        TableDefinition id = TableDefinition.id(str);
        List annotations = this.outputStreamDefinition.getAnnotations();
        for (String str2 : this.mandatoryColumns) {
            if (str2.equals("lease_seconds") || str2.equals(HttpConstants.REQUEST_TIMESTAMP)) {
                id.attribute(str2, Attribute.Type.LONG);
            } else {
                id.attribute(str2, Attribute.Type.STRING);
            }
        }
        annotations.forEach(annotation -> {
            if (annotation.getName().equalsIgnoreCase("Store")) {
                id.annotation(annotation);
            }
        });
        SiddhiAppRuntimeBuilder siddhiAppRuntimeBuilder = new SiddhiAppRuntimeBuilder(this.siddhiAppContext);
        siddhiAppRuntimeBuilder.defineTable(id);
        siddhiAppRuntimeBuilder.getTableDefinitionMap().put(str, id);
        return (Table) siddhiAppRuntimeBuilder.getTableMap().get(str);
    }

    private void setOnDemandQueryRuntimeForFindSubscription() {
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put(this.subscriptionTable.getTableDefinition().getId(), this.subscriptionTable);
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        for (String str : this.outputColumns) {
            arrayList.add(new OutputAttribute(new Variable(str)));
        }
        OnDemandQuery select = new OnDemandQuery().from(InputStore.store(this.subscriptionTable.getTableDefinition().getId()).on(new Compare(new StringConstant(this.hubId), Compare.Operator.EQUAL, new Variable(HttpConstants.HUB_ID_COLUMN_NAME)))).select(new Selector().addSelectionList(arrayList));
        select.setType(OnDemandQuery.OnDemandQueryType.FIND);
        this.onDemandQueryRuntime = OnDemandQueryParser.parse(select, (String) null, this.siddhiAppContext, hashMap, hashMap2, hashMap3);
    }

    public void setWebSubSubscriptionMap(Map<String, List<WebSubSubscriptionDTO>> map) {
        this.webSubSubscriptionMap = map;
    }

    public void removeFromExpiredSubscriptions(WebSubSubscriptionDTO webSubSubscriptionDTO) {
        this.expiredSubscriptions.remove(webSubSubscriptionDTO);
    }

    String getHexValue(String str) {
        return DigestUtils.sha256Hex(str);
    }
}
