package io.siddhi.extension.io.http.sink;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.sink.exception.HttpSinkAdaptorRuntimeException;
import io.siddhi.extension.io.http.sink.updatetoken.AccessTokenCache;
import io.siddhi.extension.io.http.sink.updatetoken.DefaultListener;
import io.siddhi.extension.io.http.sink.updatetoken.HttpsClient;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.HttpIoUtil;
import io.siddhi.query.api.definition.StreamDefinition;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.transport.http.netty.contract.Constants;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.contract.config.ChunkConfig;
import org.wso2.transport.http.netty.contract.config.ProxyServerConfiguration;
import org.wso2.transport.http.netty.contract.config.SenderConfiguration;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;
import org.wso2.transport.http.netty.contractimpl.sender.channel.pool.PoolConfiguration;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

@Extension(name = "http", namespace = "sink", description = "HTTP sink publishes messages via HTTP or HTTPS protocols using methods such as POST, GET, PUT, and DELETE on formats `text`, `XML` and `JSON`. It can also publish to endpoints protected by basic authentication or OAuth 2.0.", parameters = {@Parameter(name = HttpConstants.PUBLISHER_URL, description = "The URL to which the outgoing events should be published.\nExamples:\n`http://localhost:8080/endpoint`,\n`https://localhost:8080/endpoint`", type = {DataType.STRING}), @Parameter(name = HttpConstants.RECEIVER_USERNAME, description = "The username to be included in the authentication header when calling endpoints protected by basic authentication. `basic.auth.password` property should be also set when using this property.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.RECEIVER_PASSWORD, description = "The password to be included in the authentication header when calling endpoints protected by basic authentication. `basic.auth.username` property should be also set when using this property.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PATH_PARAM, description = "The file path of the client truststore when sending messages through `https` protocol.", type = {DataType.STRING}, optional = true, defaultValue = "`${carbon.home}/resources/security/client-truststore.jks`"), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PASSWORD_PARAM, description = "The password for the client-truststore.", type = {DataType.STRING}, optional = true, defaultValue = "wso2carbon"), @Parameter(name = HttpConstants.RECEIVER_OAUTH_USERNAME, description = "The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. 
`oauth.password` property should be also set when using this property.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.RECEIVER_OAUTH_PASSWORD, description = "The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. `oauth.username` property should be also set when using this property.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CONSUMER_KEY, description = "Consumer key used for calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CONSUMER_SECRET, description = "Consumer secret used for calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.TOKEN_URL, description = "Token URL to generate a new access tokens when calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.RECEIVER_REFRESH_TOKEN, description = "Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.HEADERS, description = "HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\nWhen `Content-Type` header is not provided the system derives the Content-Type based on the provided sink mapper as following: \n - `@map(type='xml')`: `application/xml`\n - `@map(type='json')`: `application/json`\n - `@map(type='text')`: `plain/text`\n - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n - For all other cases system defaults to 
`plain/text`\nAlso the `Content-Length` header need not to be provided, as the system automatically defines it by calculating the size of the payload.", type = {DataType.STRING}, optional = true, defaultValue = "Content-Type and Content-Length headers"), @Parameter(name = HttpConstants.METHOD, description = "The HTTP method used for calling the endpoint.", type = {DataType.STRING}, optional = true, defaultValue = "POST"), @Parameter(name = HttpConstants.SOCKET_IDEAL_TIMEOUT, description = "Socket timeout in millis.", type = {DataType.INT}, optional = true, defaultValue = "6000"), @Parameter(name = HttpConstants.CLIENT_CHUNK_DISABLED, description = "Disable chunked transfer encoding.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.SSL_PROTOCOL, description = "SSL/TLS protocol.", type = {DataType.STRING}, optional = true, defaultValue = Constants.TLS_PROTOCOL), @Parameter(name = HttpConstants.SSL_VERIFICATION_DISABLED, description = "Disable SSL verification.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.TLS_STORE_TYPE, description = "TLS store type.", type = {DataType.STRING}, optional = true, defaultValue = Constants.JKS), @Parameter(name = HttpConstants.SSS_CONFIGS, description = "SSL/TSL configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported parameters:\n - SSL/TLS protocols: `'sslEnabledProtocols:TLSv1.1,TLSv1.2'`\n - List of ciphers: `'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'`\n - Enable session creation: `'client.enable.session.creation:true'`\n - Supported server names: `'server.suported.server.names:server'`\n - Add HTTP SNIMatcher: `'server.supported.snimatchers:SNIMatcher'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_HOST, description = "Proxy server host", type = {DataType.STRING}, optional = true, defaultValue = 
HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_PORT, description = "Proxy server port", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_USERNAME, description = "Proxy server username", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_PASSWORD, description = "Proxy server password", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_CONFIGURATION, description = "Client bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Client connect timeout in millis: `'client.bootstrap.connect.timeout:15000'`\n - Client socket timeout in seconds: `'client.bootstrap.socket.timeout:15'`\n - Client socket reuse: `'client.bootstrap.socket.reuse:true'`\n - Enable TCP no delay: `'client.bootstrap.nodelay:true'`\n - Enable client keep alive: `'client.bootstrap.keepalive:true'`\n - Send buffer size: `'client.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'client.bootstrap.recievebuffersize:1048576'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.MAX_ACTIVE_CONNECTIONS_PER_POOL, description = "Maximum possible number of active connection per client pool.", type = {DataType.INT}, optional = true, defaultValue = "-1"), @Parameter(name = HttpConstants.MIN_IDLE_CONNECTIONS_PER_POOL, description = "Minimum number of idle connections that can exist per client pool.", type = {DataType.INT}, optional = true, defaultValue = HttpConstants.DEFAULT_MIN_IDLE_CONNECTIONS_PER_POOL), @Parameter(name = HttpConstants.MAX_IDLE_CONNECTIONS_PER_POOL, description = "Maximum number of idle connections that can exist per client pool.", type = 
{DataType.INT}, optional = true, defaultValue = HttpConstants.DEFAULT_MAX_IDLE_CONNECTIONS_PER_POOL), @Parameter(name = HttpConstants.MIN_EVICTABLE_IDLE_TIME, description = "Minimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_MIN_EVICTABLE_IDLE_TIME), @Parameter(name = HttpConstants.TIME_BETWEEN_EVICTION_RUNS, description = "Time between two eviction operations (in millis) on the client pool.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_TIME_BETWEEN_EVICTION_RUNS), @Parameter(name = HttpConstants.MAX_WAIT_TIME, description = "The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_MAX_WAIT_TIME), @Parameter(name = HttpConstants.TEST_ON_BORROW, description = "Enable connections to be validated before being borrowed from the client pool.", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = HttpConstants.TEST_WHILE_IDLE, description = "Enable connections to be validated during the eviction operation (if any).", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = HttpConstants.EXHAUSTED_ACTION, description = "Action that should be taken when the maximum number of active connections are being used. 
This action should be indicated as an int and possible action values are following.\n0 - Fail the request.\n1 - Block the request, until a connection returns to the pool.\n2 - Grow the connection pool size.", type = {DataType.INT}, optional = true, defaultValue = "1 (Block when exhausted)"), @Parameter(name = HttpConstants.HOSTNAME_VERIFICATION_ENABLED, description = "Enable hostname verification.", type = {DataType.BOOL}, optional = true, defaultValue = "true")}, examples = {@Example(syntax = "@sink(type = 'http', publisher.url = 'http://stocks.com/stocks',\n      @map(type = 'json'))\ndefine stream StockStream (symbol string, price float, volume long);", description = "Events arriving on the StockStream will be published to the HTTP endpoint `http://stocks.com/stocks` using `POST` method with Content-Type `application/json` by converting those events to the default JSON format as following:\n```{\n  \"event\": {\n    \"symbol\": \"FB\",\n    \"price\": 24.5,\n    \"volume\": 5000\n  }\n}```"), @Example(syntax = "@sink(type='http', publisher.url = 'http://localhost:8009/foo',\n      client.bootstrap.configurations = \"'client.bootstrap.socket.timeout:20'\",\n      max.pool.active.connections = '1', headers = \"{{headers}}\",\n      @map(type='xml', @payload(\"\"\"<stock>\n{{payloadBody}}\n</stock>\"\"\")))\ndefine stream FooStream (payloadBody String, headers string);", description = "Events arriving on FooStream will be published to the HTTP endpoint `http://localhost:8009/foo` using `POST` method with Content-Type `application/xml` and setting `payloadBody` and `header` attribute values.\nIf the `payloadBody` contains\n```<symbol>WSO2</symbol>\n<price>55.6</price>\n<volume>100</volume>```and `header` contains `'topic:foobar'` values, then the system will generate an output with the body:\n```<stock>\n<symbol>WSO2</symbol>\n<price>55.6</price>\n<volume>100</volume>\n</stock>```and HTTP 
headers:\n`Content-Length:xxx`,\n`Content-Location:'xxx'`,\n`Content-Type:'application/xml'`,\n`HTTP_METHOD:'POST'`")}, systemParameter = {@SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_CLIENT_GROUP_SIZE, description = "Number of client threads to perform non-blocking read and write to one or more channels.", defaultValue = "(Number of available processors) * 2", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_BOSS_GROUP_SIZE, description = "Number of boss threads to accept incoming connections.", defaultValue = "Number of available processors", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_WORKER_GROUP_SIZE, description = "Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.", defaultValue = "(Number of available processors) * 2", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.CLIENT_TRUSTSTORE_PATH, description = "The default truststore file path.", defaultValue = "`${carbon.home}/resources/security/client-truststore.jks`", possibleParameters = {"Path to client truststore `.jks` file"}), @SystemParameter(name = "trustStorePassword", description = "The default truststore password.", defaultValue = "wso2carbon", possibleParameters = {"Truststore password as string"})})
/* loaded from: input_file:io/siddhi/extension/io/http/sink/HttpSink.class */
public class HttpSink extends Sink {
    private static final Logger log = Logger.getLogger(HttpSink.class);
    // Mapper type; filled in lazily from getMapper().getType() on the first publish() call.
    String mapType;
    // Header and method options; their values are resolved per event against the dynamic options.
    Option httpHeaderOption;
    Option httpMethodOption;
    // Built in init() as "<siddhi app name>:<stream definition>"; used in error messages.
    protected String streamID;
    // OAuth client credentials (empty string when not configured).
    protected String consumerKey;
    protected String consumerSecret;
    // Basic authentication header value; only assigned in init() when a user name is configured.
    private String authorizationHeader;
    // Basic authentication credentials (empty string when not configured).
    protected String userName;
    protected String userPassword;
    // Created in init() only when the publisher URL option is static; otherwise stays null.
    protected ClientConnector staticClientConnector;
    protected Option publisherURLOption;
    // Truststore location and password; default to the values supplied by HttpSinkUtil.
    private String clientStoreFile;
    private String clientStorePass;
    // Parsed from the socket idle timeout option; -1 when the option is absent.
    private int socketIdleTimeout;
    // The following settings are kept as the raw option strings (empty string when absent).
    private String sslProtocol;
    private String tlsStoreType;
    private String chunkDisabled;
    private String parametersList;
    private String clientBootstrapConfiguration;
    private ConfigReader configReader;
    protected SiddhiAppContext siddhiAppContext;
    // OAuth resource-owner credentials (empty string when not configured).
    protected String oauthUsername;
    protected String oauthUserPassword;
    private Option refreshToken;
    // One of HttpConstants.BASIC_AUTH, HttpConstants.NO_AUTH or HttpConstants.OAUTH; decided in init().
    protected String authType;
    private AccessTokenCache accessTokenCache = AccessTokenCache.getInstance();
    protected String tokenURL;
    // Raw option strings; default to "true" and "false" respectively.
    private String hostnameVerificationEnabled;
    private String sslVerificationDisabled;
    private DefaultHttpWsConnectorFactory httpConnectorFactory;
    private ProxyServerConfiguration proxyServerConfiguration;
    private PoolConfiguration connectionPoolConfiguration;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/io/http/sink/HttpSink$HTTPResponseListener.class */
    /**
     * Connector listener attached to an outgoing request. It takes no action on responses and,
     * on failure, hands the original payload and dynamic options back to the owning
     * {@link HttpSink} wrapped in a {@link ConnectionUnavailableException}.
     */
    public static class HTTPResponseListener implements HttpConnectorListener {
        Object payload;
        DynamicOptions dynamicOptions;
        HttpSink httpSink;
        private String publisherURL;

        /**
         * @param payload        the event payload that was sent
         * @param dynamicOptions the dynamic options the payload was published with
         * @param httpSink       the sink to notify when the send fails
         * @param publisherURL   the endpoint the request was sent to (used in the error message)
         */
        HTTPResponseListener(Object payload, DynamicOptions dynamicOptions, HttpSink httpSink, String publisherURL) {
            this.payload = payload;
            this.dynamicOptions = dynamicOptions;
            this.httpSink = httpSink;
            this.publisherURL = publisherURL;
        }

        /** No action is taken when a response message arrives. */
        @Override
        public void onMessage(HttpCarbonMessage httpCarbonMessage) {
            // nothing to do
        }

        /**
         * Reports a failed send to the sink's error handler.
         *
         * @param th the failure raised by the transport; kept as the cause of the reported exception
         */
        @Override
        public void onError(Throwable th) {
            HttpSink sink = this.httpSink;
            String description = "HTTP sink on stream " + sink.streamID
                    + " of Siddhi App '" + sink.siddhiAppContext.getName()
                    + "' failed to publish events to endpoint '" + this.publisherURL + "'. "
                    + th.getMessage();
            ConnectionUnavailableException failure = new ConnectionUnavailableException(description, th);
            sink.onError(this.payload, this.dynamicOptions, failure);
        }
    }

    /**
     * Lists the payload types this sink accepts from the mapper.
     *
     * @return a new array containing {@code String.class} and {@code Map.class}
     */
    public Class[] getSupportedInputEventClasses() {
        Class[] acceptedPayloadTypes = {String.class, Map.class};
        return acceptedPayloadTypes;
    }

    /**
     * Lists the option keys whose values may vary per event.
     *
     * @return a new array with the headers, method, publisher URL and refresh token option keys
     */
    public String[] getSupportedDynamicOptions() {
        String[] dynamicOptionKeys = {
            HttpConstants.HEADERS,
            HttpConstants.METHOD,
            HttpConstants.PUBLISHER_URL,
            HttpConstants.RECEIVER_REFRESH_TOKEN
        };
        return dynamicOptionKeys;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the sink options, decides the authentication type, prepares the proxy and connector
     * factory configuration and, when the publisher URL is static, creates the client connector
     * up front.
     *
     * <p>Authentication type selection:
     * <ul>
     *   <li>basic auth when both user name and password are set;</li>
     *   <li>OAuth when either the consumer key/secret pair or the OAuth user name/password pair
     *       is fully set;</li>
     *   <li>no auth otherwise.</li>
     * </ul>
     *
     * @param streamDefinition the definition of the stream this sink is attached to
     * @param optionHolder     the options declared on the sink annotation
     * @param configReader     reader for system-level configuration
     * @param siddhiAppContext the context of the owning Siddhi app
     * @return always {@code null}; this method does not supply a state factory
     * @throws SiddhiAppCreationException if only one of basic auth user name / password is set
     */
    public StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.configReader = configReader;
        this.siddhiAppContext = siddhiAppContext;
        this.streamID = siddhiAppContext.getName() + ":" + streamDefinition.toString();
        this.publisherURLOption = optionHolder.validateAndGetOption(HttpConstants.PUBLISHER_URL);
        this.httpHeaderOption = optionHolder.getOrCreateOption(HttpConstants.HEADERS, HttpConstants.DEFAULT_HEADER);
        this.httpMethodOption = optionHolder.getOrCreateOption(HttpConstants.METHOD, "POST");
        this.consumerKey = optionHolder.validateAndGetStaticValue(HttpConstants.CONSUMER_KEY, "");
        this.consumerSecret = optionHolder.validateAndGetStaticValue(HttpConstants.CONSUMER_SECRET, "");
        this.oauthUsername = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_OAUTH_USERNAME, "");
        this.oauthUserPassword = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_OAUTH_PASSWORD, "");
        this.refreshToken = optionHolder.getOrCreateOption(HttpConstants.RECEIVER_REFRESH_TOKEN, "");
        this.tokenURL = optionHolder.validateAndGetStaticValue(HttpConstants.TOKEN_URL, "");
        this.clientStoreFile = optionHolder.validateAndGetStaticValue(HttpConstants.CLIENT_TRUSTSTORE_PATH_PARAM, HttpSinkUtil.trustStorePath(configReader));
        this.clientStorePass = optionHolder.validateAndGetStaticValue(HttpConstants.CLIENT_TRUSTSTORE_PASSWORD_PARAM, HttpSinkUtil.trustStorePassword(configReader));
        this.socketIdleTimeout = Integer.parseInt(optionHolder.validateAndGetStaticValue(HttpConstants.SOCKET_IDEAL_TIMEOUT, "-1"));
        this.sslProtocol = optionHolder.validateAndGetStaticValue(HttpConstants.SSL_PROTOCOL, "");
        this.tlsStoreType = optionHolder.validateAndGetStaticValue(HttpConstants.TLS_STORE_TYPE, "");
        this.chunkDisabled = optionHolder.validateAndGetStaticValue(HttpConstants.CLIENT_CHUNK_DISABLED, "");
        this.connectionPoolConfiguration = HttpSinkUtil.createPoolConfigurations(optionHolder);
        this.parametersList = optionHolder.validateAndGetStaticValue("parameters", "");
        this.clientBootstrapConfiguration = optionHolder.validateAndGetStaticValue(HttpConstants.CLIENT_BOOTSTRAP_CONFIGURATION, "");
        this.hostnameVerificationEnabled = optionHolder.validateAndGetStaticValue(HttpConstants.HOSTNAME_VERIFICATION_ENABLED, "true");
        this.sslVerificationDisabled = optionHolder.validateAndGetStaticValue(HttpConstants.SSL_VERIFICATION_DISABLED, "false");
        this.userName = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_USERNAME, "");
        this.userPassword = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_PASSWORD, "");
        if (!"".equals(this.userName) && !"".equals(this.userPassword)) {
            this.authType = HttpConstants.BASIC_AUTH;
        } else if (("".equals(this.consumerKey) || "".equals(this.consumerSecret)) && ("".equals(this.oauthUsername) || "".equals(this.oauthUserPassword))) {
            this.authType = HttpConstants.NO_AUTH;
        } else {
            this.authType = HttpConstants.OAUTH;
        }
        // Exactly one of user name / password being set is a configuration error.
        if ("".equals(this.userName) ^ "".equals(this.userPassword)) {
            throw new SiddhiAppCreationException("Please provide user name and password in http sink with the stream " + this.streamID + " in Siddhi app " + siddhiAppContext.getName());
        }
        if (!"".equals(this.userName)) {
            // Encode to a String explicitly. Concatenating the ByteBuf returned by Netty's
            // Base64.encode(...) would append the buffer's debug description (ridx/widx/cap)
            // instead of the Base64 text, and would leave that buffer unreleased.
            // NOTE(review): AUTHORIZATION_METHOD is presumably the "Basic " scheme prefix - confirm.
            byte[] credentials = (this.userName + ":" + this.userPassword).getBytes(Charset.defaultCharset());
            this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + java.util.Base64.getEncoder().encodeToString(credentials);
        }
        this.proxyServerConfiguration = HttpSinkUtil.createProxyServerConfiguration(optionHolder, this.streamID, siddhiAppContext.getName());
        this.httpConnectorFactory = HttpSinkUtil.createConnectorFactory(configReader);
        // A static publisher URL lets the connector be built once here; a dynamic URL leaves
        // staticClientConnector null so that publish() creates a connector per event.
        if (this.publisherURLOption.isStatic()) {
            this.staticClientConnector = createClientConnector(null);
        }
        return null;
    }

    /**
     * Reports no service deployment information for this sink.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Publishes one event: resolves the headers, picks (or creates) the client connector,
     * lazily resolves the mapper type and then dispatches to the plain or the OAuth send path
     * depending on the configured authentication type.
     *
     * @param obj            the mapped event payload
     * @param dynamicOptions the per-event option values
     * @param state          the sink state (not used by this method)
     * @throws ConnectionUnavailableException if the underlying send path reports the endpoint as unavailable
     */
    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        List<Header> requestHeaders = HttpSinkUtil.getHeaders(this.httpHeaderOption.getValue(dynamicOptions));

        // Reuse the connector built in init() when there is one; otherwise build one for this event.
        ClientConnector connector = this.staticClientConnector;
        if (connector == null) {
            connector = createClientConnector(dynamicOptions);
        }

        if (this.mapType == null) {
            this.mapType = getMapper().getType();
        }

        boolean plainRequest = this.authType.equals(HttpConstants.BASIC_AUTH) || this.authType.equals(HttpConstants.NO_AUTH);
        if (plainRequest) {
            sendRequest(obj, dynamicOptions, requestHeaders, connector);
            return;
        }
        sendOauthRequest(obj, dynamicOptions, requestHeaders, connector);
    }

    /**
     * Builds the outgoing HTTP message and sends it through the given connector.
     *
     * <p>For non-OAuth sinks the request is sent asynchronously with an
     * {@link HTTPResponseListener} attached, and {@code 200} is returned immediately without
     * waiting for the response. For OAuth sinks the call waits up to 30 seconds for the
     * response and returns its actual status code.
     *
     * @param obj             the event payload
     * @param dynamicOptions  the per-event option values (used to resolve the HTTP method)
     * @param list            the request headers
     * @param clientConnector the connector to send through
     * @return {@code 200} for non-OAuth sends; the response status code for OAuth sends
     * @throws ConnectionUnavailableException if no response arrives within 30 seconds or the
     *                                        waiting thread is interrupted
     */
    protected int sendRequest(Object obj, DynamicOptions dynamicOptions, List<Header> list, ClientConnector clientConnector) throws ConnectionUnavailableException {
        // Resolve the method once; an empty value falls back to POST.
        String configuredMethod = this.httpMethodOption.getValue(dynamicOptions);
        String value = "".equals(configuredMethod) ? "POST" : configuredMethod;
        String contentType = HttpSinkUtil.getContentType(this.mapType, list);
        String messageBody = getMessageBody(obj);
        HttpCarbonMessage generateCarbonMessage = generateCarbonMessage(list, contentType, value, new HttpCarbonMessage(new DefaultHttpRequest(HttpVersion.HTTP_1_1, new HttpMethod(value), "")), clientConnector.getHttpURLProperties());
        // GET requests are sent without a body.
        if (!Constants.HTTP_GET_METHOD.equals(value)) {
            generateCarbonMessage.addHttpContent(new DefaultLastHttpContent(Unpooled.wrappedBuffer(messageBody.getBytes(Charset.defaultCharset()))));
        }
        generateCarbonMessage.completeMessage();
        if (!HttpConstants.OAUTH.equals(this.authType)) {
            clientConnector.send(generateCarbonMessage).setHttpConnectorListener(new HTTPResponseListener(obj, dynamicOptions, this, clientConnector.getPublisherURL()));
            return 200;
        }
        // OAuth path: the caller needs the real status code, so block until the listener
        // releases the latch or the timeout elapses.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        DefaultListener defaultListener = new DefaultListener(countDownLatch, this.authType);
        clientConnector.send(generateCarbonMessage).setHttpConnectorListener(defaultListener);
        try {
            if (countDownLatch.await(30L, TimeUnit.SECONDS)) {
                return defaultListener.getHttpResponseMessage().getNettyHttpResponse().status().code();
            }
            log.debug("Time out due to getting getting response from " + clientConnector.getPublisherURL() + ". Message dropped.");
            throw new ConnectionUnavailableException("Time out due to getting getting response from " + clientConnector.getPublisherURL() + ". Message dropped.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.debug("Failed to get a response from " + clientConnector.getPublisherURL() + "," + e + ". Message dropped.");
            // Keep the interruption as the cause instead of only embedding its text.
            throw new ConnectionUnavailableException("Failed to get a response from " + clientConnector.getPublisherURL() + ", " + e + ". Message dropped.", e);
        }
    }

    /**
     * Sends an event to an OAuth-protected endpoint and reacts to the response status:
     * {@code 401} triggers the token recovery path, any {@code 2xx} is logged as success, and
     * every other status is logged and raised as an error.
     *
     * @param obj             the event payload
     * @param dynamicOptions  the per-event option values
     * @param list            the request headers
     * @param clientConnector the connector to send through
     * @throws ConnectionUnavailableException  for non-2xx statuses other than 401 and 500
     * @throws HttpSinkAdaptorRuntimeException when the endpoint answers with status 500
     */
    protected void sendOauthRequest(Object obj, DynamicOptions dynamicOptions, List<Header> list, ClientConnector clientConnector) throws ConnectionUnavailableException {
        String encodedClientCredentials = encodeBase64(this.consumerKey + ":" + this.consumerSecret).replaceAll(HttpConstants.NEW_LINE, "");
        String encodedAuth = HttpConstants.AUTHORIZATION_METHOD + encodedClientCredentials;
        setAccessToken(encodedAuth, dynamicOptions, list, clientConnector.getPublisherURL());

        int statusCode = sendRequest(obj, dynamicOptions, list, clientConnector);

        if (statusCode == 401) {
            handleOAuthFailure(obj, dynamicOptions, list, encodedAuth, clientConnector);
            return;
        }
        if (statusCode >= 200 && statusCode < 300) {
            log.info("Request sent successfully to " + clientConnector.getPublisherURL());
            return;
        }
        if (statusCode == 500) {
            log.error("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + statusCode + "- Internal server error. Message dropped");
            throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint, " + clientConnector.getPublisherURL() + "', with response code: " + statusCode + "- Internal server error. Message dropped.");
        }
        log.error("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + statusCode + ". Message dropped.");
        throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', and response code: " + statusCode + ". Message dropped.");
    }

    /**
     * Chooses the recovery path after an authentication failure: retry with the cached access
     * token when the cache has an entry for the given key, otherwise request a new token.
     *
     * @param obj             the event payload
     * @param dynamicOptions  the per-event option values
     * @param list            the request headers
     * @param str             the encoded client credentials used as the token cache key
     * @param clientConnector the connector to send through
     * @throws ConnectionUnavailableException if the chosen recovery path fails
     */
    private void handleOAuthFailure(Object obj, DynamicOptions dynamicOptions, List<Header> list, String str, ClientConnector clientConnector) throws ConnectionUnavailableException {
        boolean tokenIsCached = this.accessTokenCache.checkAvailableKey(str);
        if (!tokenIsCached) {
            requestForNewAccessToken(obj, dynamicOptions, list, str, clientConnector);
            return;
        }
        getNewAccessTokenWithCache(obj, dynamicOptions, list, str, clientConnector);
    }

    /**
     * Retries the request with the access token held in the cache for the given key.
     *
     * <p>The first {@code Authorization} header found is overwritten with the cached token
     * before resending. Any {@code 2xx} response counts as success, matching the initial attempt
     * in {@code sendOauthRequest}; {@code 401} falls through to requesting a new token; every
     * other status is logged and raised as an error.
     *
     * @param obj             the event payload
     * @param dynamicOptions  the per-event option values
     * @param list            the request headers; the Authorization entry is modified in place
     * @param str             the encoded client credentials used as the token cache key
     * @param clientConnector the connector to send through
     * @throws ConnectionUnavailableException  for non-2xx statuses other than 401 and 500
     * @throws HttpSinkAdaptorRuntimeException when the endpoint answers with status 500
     */
    private void getNewAccessTokenWithCache(Object obj, DynamicOptions dynamicOptions, List<Header> list, String str, ClientConnector clientConnector) throws ConnectionUnavailableException {
        String accessToken = this.accessTokenCache.getAccessToken(str);
        for (Header header : list) {
            if ("Authorization".equals(header.getName())) {
                header.setValue(accessToken);
                break;
            }
        }

        int statusCode = sendRequest(obj, dynamicOptions, list, clientConnector);

        // Accept the whole 2xx range: a 201/204 means the endpoint took the message, so
        // reporting it as dropped would be wrong and inconsistent with the first attempt.
        if (200 <= statusCode && statusCode < 300) {
            log.info("Request sent successfully to " + clientConnector.getPublisherURL());
            return;
        }
        if (statusCode == 401) {
            requestForNewAccessToken(obj, dynamicOptions, list, str, clientConnector);
            return;
        }
        if (statusCode == 500) {
            String serverError = "Error at sending oauth request to API endpoint, " + clientConnector.getPublisherURL() + "', with response code: " + statusCode + "- Internal server error. Message dropped";
            log.error(serverError);
            throw new HttpSinkAdaptorRuntimeException(serverError);
        }
        String unexpectedStatus = "Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + statusCode + ". Message dropped.";
        log.error(unexpectedStatus);
        throw new ConnectionUnavailableException(unexpectedStatus);
    }

    /**
     * Requests a fresh access token and re-sends the payload with it.
     *
     * <p>Steps: (1) if the token cache holds a refresh token for {@code str}, copy it into every
     * refresh-token header in {@code list}; (2) call {@link #getAccessToken} against the
     * configured token URL; (3) on a 200 token response, write the cached access token into the
     * first {@code Authorization} header of {@code list}; (4) re-send the request.
     *
     * <p>Each failure message is built once and used for both the log entry and the exception, so
     * the two can no longer drift apart (the previous 401 log text contained a typo).
     *
     * @param obj             payload handed through to {@code sendRequest}
     * @param dynamicOptions  dynamic options handed through to the token request and the send
     * @param list            request headers; mutated in place
     * @param str             key used for all token-cache lookups
     *                        (NOTE(review): presumably the encoded client credentials - confirm)
     * @param clientConnector connector used for the re-send and for the URL shown in messages
     * @throws ConnectionUnavailableException  if the token endpoint or the API endpoint answers
     *                                         with a code that is not handled specifically
     * @throws HttpSinkAdaptorRuntimeException if the token endpoint answers 401, or the API
     *                                         endpoint answers 401 or 500
     */
    private void requestForNewAccessToken(Object obj, DynamicOptions dynamicOptions, List<Header> list, String str, ClientConnector clientConnector) throws ConnectionUnavailableException {
        if (this.accessTokenCache.checkRefreshAvailableKey(str)) {
            // Read the cached refresh token once so the null check and the value used agree.
            String cachedRefreshToken = this.accessTokenCache.getRefreshtoken(str);
            if (cachedRefreshToken != null) {
                for (Header header : list) {
                    if (header.getName().equals(HttpConstants.RECEIVER_REFRESH_TOKEN)) {
                        header.setValue(cachedRefreshToken);
                    }
                }
            }
        }

        getAccessToken(dynamicOptions, str, this.tokenURL);
        int tokenResponseCode = this.accessTokenCache.getResponseCode(str);
        if (tokenResponseCode != 200) {
            if (tokenResponseCode == 401) {
                String authFailure = "Failed to generate new access token for the expired access token to " + clientConnector.getPublisherURL() + "', " + tokenResponseCode + ": Authentication Failure.Please provide a valid Consumer key, Consumer secret and token endpoint URL . Message dropped";
                log.error(authFailure);
                throw new HttpSinkAdaptorRuntimeException(authFailure);
            }
            String tokenFailure = "Failed to generate new access token for the expired access token. Error code: " + tokenResponseCode + ". Message dropped.";
            log.error(tokenFailure);
            throw new ConnectionUnavailableException(tokenFailure);
        }

        // The cache values are read and written straight back, as in the original code.
        // NOTE(review): looks redundant unless the cache setters have side effects - confirm.
        String accessToken = this.accessTokenCache.getAccessToken(str);
        this.accessTokenCache.setAccessToken(str, accessToken);
        if (this.accessTokenCache.getRefreshtoken(str) != null) {
            this.accessTokenCache.setRefreshtoken(str, this.accessTokenCache.getRefreshtoken(str));
        }

        // Only the first Authorization header is updated.
        for (Header header : list) {
            if (header.getName().equals("Authorization")) {
                header.setValue(accessToken);
                break;
            }
        }

        int apiResponseCode = sendRequest(obj, dynamicOptions, list, clientConnector);
        if (apiResponseCode == 200) {
            log.info("Request sent successfully to " + clientConnector.getPublisherURL());
            return;
        }

        String failurePrefix = "Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + apiResponseCode;
        if (apiResponseCode == 401) {
            String unauthorized = failurePrefix + "- Authentication Failure. Please provide a valid Consumer key, Consumer secret and token endpoint URL. Message dropped";
            log.error(unauthorized);
            throw new HttpSinkAdaptorRuntimeException(unauthorized);
        }
        if (apiResponseCode == 500) {
            String serverError = failurePrefix + "- Internal server error. Message dropped";
            log.error(serverError);
            throw new HttpSinkAdaptorRuntimeException(serverError);
        }
        String otherFailure = failurePrefix + ". Message dropped.";
        log.error(otherFailure);
        throw new ConnectionUnavailableException(otherFailure);
    }

    /**
     * Asks a new {@code HttpsClient} for an access token, choosing the grant type from the
     * configured credentials.
     *
     * <ul>
     *   <li>password grant when both the OAuth username and password are non-empty;</li>
     *   <li>otherwise refresh grant when a refresh token is configured or one is cached for
     *       {@code str};</li>
     *   <li>otherwise client-credentials grant.</li>
     * </ul>
     *
     * <p>Side effect: {@code this.tokenURL} is overwritten with {@code str2}.
     *
     * @param dynamicOptions dynamic options used to resolve the configured refresh token
     * @param str            key passed to the client and used for the cache lookup
     * @param str2           token endpoint URL
     */
    void getAccessToken(DynamicOptions dynamicOptions, String str, String str2) {
        this.tokenURL = str2;
        HttpsClient tokenClient = new HttpsClient();

        boolean passwordCredentialsMissing = "".equals(this.oauthUsername) || "".equals(this.oauthUserPassword);
        if (!passwordCredentialsMissing) {
            tokenClient.getPasswordGrantAccessToken(str2, this.clientStoreFile, this.clientStorePass, this.oauthUsername, this.oauthUserPassword, str);
            return;
        }

        // The cache is consulted only when no refresh token is configured.
        boolean refreshTokenKnown = !"".equals(this.refreshToken.getValue(dynamicOptions))
                || this.accessTokenCache.getRefreshtoken(str) != null;
        if (refreshTokenKnown) {
            tokenClient.getRefreshGrantAccessToken(str2, this.clientStoreFile, this.clientStorePass, str, this.refreshToken.getValue(dynamicOptions));
            return;
        }

        tokenClient.getClientGrantAccessToken(str2, this.clientStoreFile, this.clientStorePass, str);
    }

    /**
     * Makes sure {@code list} carries an {@code Authorization} header holding an access token.
     *
     * <p>If an {@code Authorization} header is already present, its value is replaced with the
     * cached access token when the cache has an entry for {@code str}; otherwise the header is
     * left untouched. In both cases no token request is made.
     *
     * <p>If no such header exists, a token is requested via {@link #getAccessToken}. On a 200
     * response an {@code Authorization} header is appended, followed by a refresh-token header
     * when the cache holds a refresh token.
     *
     * @param str            key used for all token-cache lookups
     * @param dynamicOptions dynamic options handed through to the token request
     * @param list           request headers; mutated in place
     * @param str2           URL shown in the failure messages
     * @throws ConnectionUnavailableException  if the token response code is neither 200, 401
     *                                         nor 500
     * @throws HttpSinkAdaptorRuntimeException if the token response code is 401 or 500
     */
    void setAccessToken(String str, DynamicOptions dynamicOptions, List<Header> list, String str2) throws ConnectionUnavailableException {
        Header existingAuthHeader = null;
        for (Header candidate : list) {
            if (candidate.getName().equals("Authorization")) {
                existingAuthHeader = candidate;
                break;
            }
        }

        if (existingAuthHeader != null) {
            if (this.accessTokenCache.checkAvailableKey(str)) {
                existingAuthHeader.setValue(this.accessTokenCache.getAccessToken(str));
            }
            return;
        }

        getAccessToken(dynamicOptions, str, this.tokenURL);
        switch (this.accessTokenCache.getResponseCode(str)) {
            case 200: {
                list.add(new Header("Authorization", this.accessTokenCache.getAccessToken(str)));
                if (this.accessTokenCache.getRefreshtoken(str) != null) {
                    list.add(new Header(HttpConstants.RECEIVER_REFRESH_TOKEN, this.accessTokenCache.getRefreshtoken(str)));
                }
                return;
            }
            case 401: {
                String unauthorized = "Failed to generate new access token for the expired access token to " + str2 + "', with response code: " + this.accessTokenCache.getResponseCode(str) + "- Authentication Failure.Please provide a valid Consumer key, Consumer secret and token endpoint URL . Message dropped";
                log.error(unauthorized);
                throw new HttpSinkAdaptorRuntimeException(unauthorized);
            }
            case 500: {
                String serverError = "Failed to generate new access token for the expired access token to " + str2 + "', with response code: " + this.accessTokenCache.getResponseCode(str) + "- Internal server error. Message dropped";
                log.error(serverError);
                throw new HttpSinkAdaptorRuntimeException(serverError);
            }
            default: {
                String otherFailure = "Failed to generate new access token for the expired access token. Error code: " + this.accessTokenCache.getResponseCode(str) + ". Message dropped.";
                log.error(otherFailure);
                throw new ConnectionUnavailableException(otherFailure);
            }
        }
    }

    /**
     * No-op: this implementation performs no work when invoked.
     */
    public void connect() {
        // Nothing to do here.
    }

    /**
     * Drops the reference to the static client connector (logging its publisher URL at debug
     * level) and shuts down the HTTP connector factory, clearing that reference as well.
     * Either step is skipped when the corresponding field is already {@code null}.
     */
    public void disconnect() {
        ClientConnector connector = this.staticClientConnector;
        if (connector != null) {
            String url = connector.getPublisherURL();
            this.staticClientConnector = null;
            log.debug("Server connector for url " + url + " disconnected.");
        }

        if (this.httpConnectorFactory == null) {
            return;
        }
        this.httpConnectorFactory.shutdownNow();
        this.httpConnectorFactory = null;
    }

    /**
     * Drops the reference to the static client connector, logging its publisher URL at debug
     * level. Does nothing when the connector is already {@code null}. Unlike
     * {@link #disconnect()}, the HTTP connector factory is not touched here.
     */
    public void destroy() {
        ClientConnector connector = this.staticClientConnector;
        if (connector == null) {
            return;
        }
        String url = connector.getPublisherURL();
        this.staticClientConnector = null;
        log.debug("Server connector for url " + url + " disconnected.");
    }

    /**
     * Populates {@code httpCarbonMessage} with routing properties, HTTP method, request URL and
     * headers, then returns the same instance.
     *
     * <p>Header order: the basic-auth {@code Authorization} header (only when both username and
     * password are configured), the host header, every entry of {@code list}, and finally
     * {@code Content-Type} when {@code str} contains the configured map type. Entries from
     * {@code list} are written after the first two, via {@code HttpHeaders.set}.
     *
     * <p>Declared public in this decompiled source; the decompiler noted that the original
     * access was package-private.
     *
     * @param list             additional headers to set; may be {@code null}
     * @param str              content type candidate
     * @param str2             HTTP method
     * @param httpCarbonMessage message to populate
     * @param map              URL properties; read for the keys {@code PROTOCOL}, {@code TO},
     *                         the host constant, {@code port} and {@code REQUEST_URL}
     * @return the populated {@code httpCarbonMessage}
     */
    public HttpCarbonMessage generateCarbonMessage(List<Header> list, String str, String str2, HttpCarbonMessage httpCarbonMessage, Map<String, String> map) {
        // Routing properties taken from the URL property map.
        httpCarbonMessage.setProperty("PROTOCOL", map.get("PROTOCOL"));
        httpCarbonMessage.setProperty("TO", map.get("TO"));
        httpCarbonMessage.setProperty(Constants.HTTP_HOST, map.get(Constants.HTTP_HOST));
        httpCarbonMessage.setProperty("port", Integer.valueOf(map.get("port")));
        httpCarbonMessage.setHttpMethod(str2);
        httpCarbonMessage.setRequestUrl(map.get("REQUEST_URL"));

        HttpHeaders outgoingHeaders = httpCarbonMessage.getHeaders();

        boolean userNameGiven = !this.userName.equals("");
        boolean passwordGiven = !this.userPassword.equals("");
        if (userNameGiven && passwordGiven) {
            outgoingHeaders.set("Authorization", (Object) this.authorizationHeader);
        } else if (userNameGiven || passwordGiven) {
            // Exactly one of the two credentials is configured.
            log.error("One of the basic authentication username or password missing. Hence basic authentication not supported.");
        }

        outgoingHeaders.set(Constants.HTTP_HOST, httpCarbonMessage.getProperty(Constants.HTTP_HOST));

        if (list != null) {
            list.forEach(extra -> outgoingHeaders.set(extra.getName(), (Object) extra.getValue()));
        }

        boolean contentTypeMatchesMapType = str.contains(this.mapType);
        if (contentTypeMatchesMapType) {
            outgoingHeaders.set("Content-Type", (Object) str);
        }

        // The method is set a second time here, exactly as in the original code.
        httpCarbonMessage.setHttpMethod(str2);
        return httpCarbonMessage;
    }

    /**
     * Converts the mapped payload into the request body string.
     *
     * <p>For the key-value map type the payload is treated as a map and rendered as
     * {@code key1=value1&key2=value2}, with every key and value URL-encoded by
     * {@link #encodeMessage}. For any other map type the payload is cast to {@code String} and
     * returned unchanged.
     *
     * <p>The pairs are joined with {@code &} only between them. The previous implementation
     * reduced from an empty identity and therefore produced a leading {@code &}.
     *
     * <p>Declared public in this decompiled source; the decompiler noted that the original
     * access was package-private.
     *
     * @param obj payload; a {@code Map} for the key-value map type, otherwise a {@code String}
     * @return the request body; an empty string for an empty map
     */
    public String getMessageBody(Object obj) {
        if (!HttpConstants.MAP_KEYVALUE.equals(this.mapType)) {
            return (String) obj;
        }
        StringBuilder body = new StringBuilder();
        for (Map.Entry<?, ?> pair : ((Map<?, ?>) obj).entrySet()) {
            if (body.length() > 0) {
                body.append('&');
            }
            body.append(encodeMessage(pair.getKey())).append('=').append(encodeMessage(pair.getValue()));
        }
        return body.toString();
    }

    /**
     * Builds a {@code ClientConnector} for the publisher URL, resolved statically or from
     * {@code dynamicOptions}.
     *
     * <p>All mandatory-field checks run before the URL is parsed or any sender configuration is
     * built, so a missing value is reported with its own message. Previously the empty-URL check
     * ran only after the URL had already been handed to the parsing helpers.
     *
     * @param dynamicOptions dynamic options used when the publisher URL option is not static
     * @return a connector wrapping the URL, its parsed properties and a new HTTP client connector
     * @throws SiddhiAppCreationException  if the publisher URL is empty, or OAuth is selected and
     *                                     the consumer key, consumer secret or token URL is empty
     * @throws ExceptionInInitializerError if the scheme is {@code https} and the client trust
     *                                     store file or password is {@code null}
     */
    public ClientConnector createClientConnector(DynamicOptions dynamicOptions) {
        String publisherUrl = this.publisherURLOption.isStatic() ? this.publisherURLOption.getValue() : this.publisherURLOption.getValue(dynamicOptions);

        if (this.authType.equals(HttpConstants.OAUTH)) {
            if ("".equals(this.consumerSecret) || "".equals(this.consumerKey)) {
                throw new SiddhiAppCreationException("consumer.key and consumer.secret found empty but it is Mandatory field in http sink in " + this.streamID);
            }
            if ("".equals(this.tokenURL)) {
                throw new SiddhiAppCreationException("token.url found empty but it is Mandatory field in http sink in " + this.streamID);
            }
        }
        // Checked before parsing: an empty URL should fail with this message rather than with
        // whatever the URL helpers do on empty input.
        if ("".equals(publisherUrl)) {
            throw new SiddhiAppCreationException("Receiver URL found empty but it is Mandatory field in http sink in " + this.streamID);
        }

        String scheme = HttpSinkUtil.getScheme(publisherUrl);
        if ("https".equals(scheme) && (this.clientStoreFile == null || this.clientStorePass == null)) {
            throw new ExceptionInInitializerError("Client trustStore file path or password are empty while default scheme is 'https'. Please provide client trustStore file path and password in " + this.streamID);
        }

        Map<String, String> urlProperties = HttpSinkUtil.getURLProperties(publisherUrl);
        SenderConfiguration senderConfigurations = HttpSinkUtil.getSenderConfigurations(urlProperties, this.clientStoreFile, this.clientStorePass, this.configReader);

        if (this.proxyServerConfiguration != null) {
            senderConfigurations.setProxyServerConfiguration(this.proxyServerConfiguration);
        }
        senderConfigurations.setPoolConfiguration(this.connectionPoolConfiguration);
        // -1 leaves the sender's own idle timeout in place.
        if (this.socketIdleTimeout != -1) {
            senderConfigurations.setSocketIdleTimeout(this.socketIdleTimeout);
        }
        if (!"".equals(this.sslProtocol)) {
            senderConfigurations.setSSLProtocol(this.sslProtocol);
        }
        if (!"".equals(this.tlsStoreType)) {
            senderConfigurations.setTLSStoreType(this.tlsStoreType);
        }
        // Chunking is configured only when the option was given.
        if (this.chunkDisabled != null && !"".equals(this.chunkDisabled)) {
            senderConfigurations.setChunkingConfig(Boolean.parseBoolean(this.chunkDisabled) ? ChunkConfig.NEVER : ChunkConfig.ALWAYS);
        }
        if (!"".equals(this.parametersList)) {
            senderConfigurations.setParameters(HttpIoUtil.populateParameters(this.parametersList));
        }
        // Anything other than "true" (case-insensitive) turns hostname verification off.
        if (!"true".equalsIgnoreCase(this.hostnameVerificationEnabled)) {
            senderConfigurations.setHostNameVerificationEnabled(false);
        }
        if ("true".equalsIgnoreCase(this.sslVerificationDisabled)) {
            senderConfigurations.disableSsl();
        }

        return new ClientConnector(publisherUrl, urlProperties, this.httpConnectorFactory.createHttpClientConnector(HttpSinkUtil.populateTransportConfiguration(this.clientBootstrapConfiguration), senderConfigurations));
    }

    /**
     * URL-encodes the string form of {@code obj} using UTF-8.
     *
     * <p>The value is converted with {@code toString()} rather than cast to {@code String}, so
     * non-String keys or values (for example numbers) are encoded instead of failing with a
     * {@code ClassCastException}. Strings are encoded exactly as before.
     *
     * @param obj value to encode; must not be {@code null}
     * @return the URL-encoded text
     * @throws NullPointerException      if {@code obj} is {@code null} (unchanged behaviour)
     * @throws SiddhiAppRuntimeException if the UTF-8 encoding is reported as unsupported
     */
    private String encodeMessage(Object obj) {
        try {
            return URLEncoder.encode(obj.toString(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SiddhiAppRuntimeException("Execution of Siddhi app " + this.siddhiAppContext.getName() + " failed due to " + e.getMessage(), e);
        }
    }

    /**
     * Base64-encodes the UTF-8 bytes of {@code str} using Netty's encoder.
     *
     * <p>Line breaking is switched off explicitly. The single-argument
     * {@code Base64.encode(ByteBuf)} uses the standard dialect, whose default inserts a newline
     * after every 76 output characters; that would corrupt a header value built from long
     * credentials. Output for short inputs is unchanged.
     *
     * @param str text to encode
     * @return the Base64 text on a single line
     */
    private String encodeBase64(String str) {
        byte[] rawBytes = str.getBytes(StandardCharsets.UTF_8);
        return Base64.encode(Unpooled.wrappedBuffer(rawBytes), false).toString(StandardCharsets.UTF_8);
    }
}
