/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.sink;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.metrics.EndpointStatus;
import io.siddhi.extension.io.http.metrics.SinkMetrics;
import io.siddhi.extension.io.http.sink.ClientConnector;
import io.siddhi.extension.io.http.sink.exception.HttpSinkAdaptorRuntimeException;
import io.siddhi.extension.io.http.sink.updatetoken.AccessTokenCache;
import io.siddhi.extension.io.http.sink.updatetoken.DefaultListener;
import io.siddhi.extension.io.http.sink.updatetoken.HttpsClient;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.HttpIoUtil;
import io.siddhi.query.api.definition.StreamDefinition;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.contract.HttpResponseFuture;
import org.wso2.transport.http.netty.contract.config.ChunkConfig;
import org.wso2.transport.http.netty.contract.config.ProxyServerConfiguration;
import org.wso2.transport.http.netty.contract.config.SenderConfiguration;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;
import org.wso2.transport.http.netty.contractimpl.sender.channel.pool.PoolConfiguration;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;
import org.wso2.transport.http.netty.message.HttpMessageDataStreamer;

@Extension(name="http", namespace="sink", description="HTTP sink publishes messages via HTTP or HTTPS protocols using methods such as POST, GET, PUT, and DELETE on formats `text`, `XML` and `JSON`. It can also publish to endpoints protected by basic authentication or OAuth 2.0.", parameters={@Parameter(name="publisher.url", description="The URL to which the outgoing events should be published.\nExamples:\n`http://localhost:8080/endpoint`,\n`https://localhost:8080/endpoint`", type={DataType.STRING}), @Parameter(name="basic.auth.username", description="The username to be included in the authentication header when calling endpoints protected by basic authentication. `basic.auth.password` property should be also set when using this property.", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="basic.auth.password", description="The password to be included in the authentication header when calling endpoints protected by basic authentication. `basic.auth.username` property should be also set when using this property.", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="https.truststore.file", description="The file path of the client truststore when sending messages through `https` protocol.", type={DataType.STRING}, optional=true, defaultValue="`${carbon.home}/resources/security/client-truststore.jks`"), @Parameter(name="https.truststore.password", description="The password for the client-truststore.", type={DataType.STRING}, optional=true, defaultValue="wso2carbon"), @Parameter(name="oauth.username", description="The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. `oauth.password` property should be also set when using this property.", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="oauth.password", description="The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. 
`oauth.username` property should be also set when using this property.", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="consumer.key", description="Consumer key used for calling endpoints protected by OAuth 2.0", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="consumer.secret", description="Consumer secret used for calling endpoints protected by OAuth 2.0", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="token.url", description="Token URL to generate a new access tokens when calling endpoints protected by OAuth 2.0", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="refresh.token", description="Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="headers", description="HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\nWhen `Content-Type` header is not provided the system derives the Content-Type based on the provided sink mapper as following: \n - `@map(type='xml')`: `application/xml`\n - `@map(type='json')`: `application/json`\n - `@map(type='text')`: `plain/text`\n - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n - For all other cases system defaults to `plain/text`\nAlso the `Content-Length` header need not to be provided, as the system automatically defines it by calculating the size of the payload.", type={DataType.STRING}, optional=true, defaultValue="Content-Type and Content-Length headers"), @Parameter(name="method", description="The HTTP method used for calling the endpoint.", type={DataType.STRING}, optional=true, defaultValue="POST"), @Parameter(name="socket.idle.timeout", description="Socket timeout in millis.", type={DataType.INT}, optional=true, defaultValue="6000"), @Parameter(name="chunk.disabled", description="Disable chunked transfer encoding.", type={DataType.BOOL}, optional=true, 
defaultValue="false"), @Parameter(name="ssl.protocol", description="SSL/TLS protocol.", type={DataType.STRING}, optional=true, defaultValue="TLS"), @Parameter(name="ssl.verification.disabled", description="Disable SSL verification.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="tls.store.type", description="TLS store type.", type={DataType.STRING}, optional=true, defaultValue="JKS"), @Parameter(name="ssl.configurations", description="SSL/TSL configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported parameters:\n - SSL/TLS protocols: `'sslEnabledProtocols:TLSv1.1,TLSv1.2'`\n - List of ciphers: `'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'`\n - Enable session creation: `'client.enable.session.creation:true'`\n - Supported server names: `'server.suported.server.names:server'`\n - Add HTTP SNIMatcher: `'server.supported.snimatchers:SNIMatcher'`", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="proxy.host", description="Proxy server host", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="proxy.port", description="Proxy server port", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="proxy.username", description="Proxy server username", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="proxy.password", description="Proxy server password", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="client.bootstrap.configurations", description="Client bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Client connect timeout in millis: `'client.bootstrap.connect.timeout:15000'`\n - Client socket timeout in seconds: `'client.bootstrap.socket.timeout:15'`\n - Client socket reuse: `'client.bootstrap.socket.reuse:true'`\n - Enable TCP no delay: `'client.bootstrap.nodelay:true'`\n - Enable client keep alive: `'client.bootstrap.keepalive:true'`\n - 
Send buffer size: `'client.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'client.bootstrap.recievebuffersize:1048576'`", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="max.pool.active.connections", description="Maximum possible number of active connection per client pool.", type={DataType.INT}, optional=true, defaultValue="-1"), @Parameter(name="min.pool.idle.connections", description="Minimum number of idle connections that can exist per client pool.", type={DataType.INT}, optional=true, defaultValue="0"), @Parameter(name="max.pool.idle.connections", description="Maximum number of idle connections that can exist per client pool.", type={DataType.INT}, optional=true, defaultValue="100"), @Parameter(name="min.evictable.idle.time", description="Minimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction.", type={DataType.STRING}, optional=true, defaultValue="300000"), @Parameter(name="time.between.eviction.runs", description="Time between two eviction operations (in millis) on the client pool.", type={DataType.STRING}, optional=true, defaultValue="30000"), @Parameter(name="max.wait.time", description="The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.", type={DataType.STRING}, optional=true, defaultValue="60000"), @Parameter(name="test.on.borrow", description="Enable connections to be validated before being borrowed from the client pool.", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="test.while.idle", description="Enable connections to be validated during the eviction operation (if any).", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="exhausted.action", description="Action that should be taken when the maximum number of active connections are being used. 
This action should be indicated as an int and possible action values are following.\n0 - Fail the request.\n1 - Block the request, until a connection returns to the pool.\n2 - Grow the connection pool size.", type={DataType.INT}, optional=true, defaultValue="1 (Block when exhausted)"), @Parameter(name="hostname.verification.enabled", description="Enable hostname verification.", type={DataType.BOOL}, optional=true, defaultValue="true")}, examples={@Example(syntax="@sink(type = 'http', publisher.url = 'http://stocks.com/stocks',\n      @map(type = 'json'))\ndefine stream StockStream (symbol string, price float, volume long);", description="Events arriving on the StockStream will be published to the HTTP endpoint `http://stocks.com/stocks` using `POST` method with Content-Type `application/json` by converting those events to the default JSON format as following:\n```{\n  \"event\": {\n    \"symbol\": \"FB\",\n    \"price\": 24.5,\n    \"volume\": 5000\n  }\n}```"), @Example(syntax="@sink(type='http', publisher.url = 'http://localhost:8009/foo',\n      client.bootstrap.configurations = \"'client.bootstrap.socket.timeout:20'\",\n      max.pool.active.connections = '1', headers = \"{{headers}}\",\n      @map(type='xml', @payload(\"\"\"<stock>\n{{payloadBody}}\n</stock>\"\"\")))\ndefine stream FooStream (payloadBody String, headers string);", description="Events arriving on FooStream will be published to the HTTP endpoint `http://localhost:8009/foo` using `POST` method with Content-Type `application/xml` and setting `payloadBody` and `header` attribute values.\nIf the `payloadBody` contains\n```<symbol>WSO2</symbol>\n<price>55.6</price>\n<volume>100</volume>```and `header` contains `'topic:foobar'` values, then the system will generate an output with the body:\n```<stock>\n<symbol>WSO2</symbol>\n<price>55.6</price>\n<volume>100</volume>\n</stock>```and HTTP 
headers:\n`Content-Length:xxx`,\n`Content-Location:'xxx'`,\n`Content-Type:'application/xml'`,\n`HTTP_METHOD:'POST'`")}, systemParameter={@SystemParameter(name="clientBootstrapClientGroupSize", description="Number of client threads to perform non-blocking read and write to one or more channels.", defaultValue="(Number of available processors) * 2", possibleParameters={"Any positive integer"}), @SystemParameter(name="clientBootstrapBossGroupSize", description="Number of boss threads to accept incoming connections.", defaultValue="Number of available processors", possibleParameters={"Any positive integer"}), @SystemParameter(name="clientBootstrapWorkerGroupSize", description="Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.", defaultValue="(Number of available processors) * 2", possibleParameters={"Any positive integer"}), @SystemParameter(name="trustStoreLocation", description="The default truststore file path.", defaultValue="`${carbon.home}/resources/security/client-truststore.jks`", possibleParameters={"Path to client truststore `.jks` file"}), @SystemParameter(name="trustStorePassword", description="The default truststore password.", defaultValue="wso2carbon", possibleParameters={"Truststore password as string"})})
public class HttpSink
extends Sink {
    private static final Logger log = Logger.getLogger(HttpSink.class);
    // Mapper type; resolved lazily on the first publish() call via getMapper().getType().
    String mapType;
    // "headers" and "method" sink options (both may be supplied dynamically per event).
    Option httpHeaderOption;
    Option httpMethodOption;
    // Siddhi app name + ":" + output stream definition; used in error messages.
    protected String streamID;
    // "consumer.key" / "consumer.secret" options used for the OAuth flow.
    protected String consumerKey;
    protected String consumerSecret;
    // Built in init() when a basic auth username is configured.
    private String authorizationHeader;
    // "basic.auth.username" / "basic.auth.password" options.
    protected String userName;
    protected String userPassword;
    // Created once in init() when "publisher.url" is a static option; otherwise stays null
    // and publish() creates a connector per call.
    protected ClientConnector staticClientConnector;
    protected Option publisherURLOption;
    // "https.truststore.file" / "https.truststore.password" options.
    private String clientStoreFile;
    private String clientStorePass;
    // "socket.idle.timeout" option; init() falls back to -1 when it is not set.
    private int socketIdleTimeout;
    private String sslProtocol;
    private String tlsStoreType;
    private String chunkDisabled;
    private String parametersList;
    private String clientBootstrapConfiguration;
    private ConfigReader configReader;
    protected SiddhiAppContext siddhiAppContext;
    // "oauth.username" / "oauth.password" options; when both are set getAccessToken()
    // requests a password-grant token.
    protected String oauthUsername;
    protected String oauthUserPassword;
    private Option refreshToken;
    // One of "basic.auth", "oauth" or "no.auth"; derived in init() from the configured credentials.
    protected String authType;
    private AccessTokenCache accessTokenCache = AccessTokenCache.getInstance();
    protected String tokenURL;
    private String hostnameVerificationEnabled;
    private String sslVerificationDisabled;
    private Executor executor = null;
    // URL of the request most recently prepared by sendRequest().
    private String publisherURL;
    // May be null; sendRequest() checks for null before recording metrics.
    protected SinkMetrics metrics;
    // Set to the current time at the start of every publish() call.
    protected long startTime;
    protected long endTime;
    private DefaultHttpWsConnectorFactory httpConnectorFactory;
    private ProxyServerConfiguration proxyServerConfiguration;
    private PoolConfiguration connectionPoolConfiguration;

    /**
     * Lists the event payload classes this sink declares as supported.
     *
     * @return a new array containing {@code String.class} and {@code Map.class}
     */
    public Class[] getSupportedInputEventClasses() {
        Class[] supportedTypes = new Class[2];
        supportedTypes[0] = String.class;
        supportedTypes[1] = Map.class;
        return supportedTypes;
    }

    /**
     * Lists the option names this sink declares as dynamic.
     *
     * @return a new array with the names {@code headers}, {@code method},
     *         {@code publisher.url} and {@code refresh.token}
     */
    public String[] getSupportedDynamicOptions() {
        String[] dynamicOptionNames = {"headers", "method", "publisher.url", "refresh.token"};
        return dynamicOptionNames;
    }

    /**
     * Reads the sink options, derives the authentication type, validates the basic auth
     * credentials and prepares the proxy configuration, connector factory and (for a static
     * publisher URL) the client connector.
     *
     * @param outputStreamDefinition definition of the stream this sink is attached to
     * @param optionHolder           holder of the options given on the sink annotation
     * @param configReader           reader for system-level configuration
     * @param siddhiAppContext       context of the owning Siddhi app
     * @return always {@code null}
     * @throws SiddhiAppCreationException if only one of {@code basic.auth.username} and
     *                                    {@code basic.auth.password} is provided
     */
    protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.configReader = configReader;
        this.siddhiAppContext = siddhiAppContext;
        this.streamID = siddhiAppContext.getName() + ":" + outputStreamDefinition.toString();
        this.publisherURLOption = optionHolder.validateAndGetOption("publisher.url");
        this.httpHeaderOption = optionHolder.getOrCreateOption("headers", HttpConstants.DEFAULT_HEADER);
        this.httpMethodOption = optionHolder.getOrCreateOption("method", "POST");
        this.consumerKey = optionHolder.validateAndGetStaticValue("consumer.key", "");
        this.consumerSecret = optionHolder.validateAndGetStaticValue("consumer.secret", "");
        this.oauthUsername = optionHolder.validateAndGetStaticValue("oauth.username", "");
        this.oauthUserPassword = optionHolder.validateAndGetStaticValue("oauth.password", "");
        this.refreshToken = optionHolder.getOrCreateOption("refresh.token", "");
        this.tokenURL = optionHolder.validateAndGetStaticValue("token.url", "");
        this.clientStoreFile = optionHolder.validateAndGetStaticValue("https.truststore.file", HttpSinkUtil.trustStorePath(configReader));
        this.clientStorePass = optionHolder.validateAndGetStaticValue("https.truststore.password", HttpSinkUtil.trustStorePassword(configReader));
        this.socketIdleTimeout = Integer.parseInt(optionHolder.validateAndGetStaticValue("socket.idle.timeout", "-1"));
        this.sslProtocol = optionHolder.validateAndGetStaticValue("ssl.protocol", "");
        this.tlsStoreType = optionHolder.validateAndGetStaticValue("tls.store.type", "");
        this.chunkDisabled = optionHolder.validateAndGetStaticValue("chunk.disabled", "");
        this.connectionPoolConfiguration = HttpSinkUtil.createPoolConfigurations(optionHolder);
        this.parametersList = optionHolder.validateAndGetStaticValue("parameters", "");
        this.clientBootstrapConfiguration = optionHolder.validateAndGetStaticValue("client.bootstrap.configurations", "");
        this.hostnameVerificationEnabled = optionHolder.validateAndGetStaticValue("hostname.verification.enabled", "true");
        this.sslVerificationDisabled = optionHolder.validateAndGetStaticValue("ssl.verification.disabled", "false");
        this.userName = optionHolder.validateAndGetStaticValue("basic.auth.username", "");
        this.userPassword = optionHolder.validateAndGetStaticValue("basic.auth.password", "");
        // Basic auth wins when both of its credentials are present; otherwise OAuth is chosen when
        // either the consumer key/secret pair or the OAuth username/password pair is complete.
        this.authType = !"".equals(this.userName) && !"".equals(this.userPassword) ? "basic.auth" : (!"".equals(this.consumerKey) && !"".equals(this.consumerSecret) || !"".equals(this.oauthUsername) && !"".equals(this.oauthUserPassword) ? "oauth" : "no.auth");
        // Exactly one of username/password being empty is a configuration error.
        if ("".equals(this.userName) ^ "".equals(this.userPassword)) {
            throw new SiddhiAppCreationException("Please provide user name and password in http sink with the stream " + this.streamID + " in Siddhi app " + siddhiAppContext.getName());
        }
        if (!"".equals(this.userName)) {
            byte[] val = (this.userName + ":" + this.userPassword).getBytes(Charset.defaultCharset());
            // Base64.encode returns a ByteBuf. Concatenating it directly would append the buffer's
            // toString() descriptor instead of the encoded credentials, so decode it explicitly.
            // Line breaking is disabled because a header value must not contain newlines.
            ByteBuf encoded = Base64.encode(Unpooled.wrappedBuffer(val), false);
            try {
                this.authorizationHeader = "Basic " + encoded.toString(StandardCharsets.US_ASCII);
            } finally {
                encoded.release();
            }
        }
        this.proxyServerConfiguration = HttpSinkUtil.createProxyServerConfiguration(optionHolder, this.streamID, siddhiAppContext.getName());
        this.httpConnectorFactory = HttpSinkUtil.createConnectorFactory(configReader);
        // A static URL allows one connector to be shared by every publish() call.
        if (this.publisherURLOption.isStatic()) {
            this.staticClientConnector = this.createClientConnector(null);
        }
        this.initMetrics(outputStreamDefinition.getId());
        return null;
    }

    /**
     * This sink provides no service deployment information.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Publishes one mapped event: resolves the headers and client connector for the event and
     * dispatches it through the plain or the OAuth send path depending on {@code authType}.
     *
     * @param payload        mapped event payload
     * @param dynamicOptions per-event values for the dynamic options
     * @param state          sink state (not used by this method)
     * @throws ConnectionUnavailableException propagated from the send path
     */
    public void publish(Object payload, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        this.startTime = System.currentTimeMillis();
        List<Header> headersList = HttpSinkUtil.getHeaders(this.httpHeaderOption.getValue(dynamicOptions));
        // Reuse the connector built in init() when there is one; otherwise build one for this event.
        ClientConnector clientConnector;
        if (this.staticClientConnector == null) {
            clientConnector = this.createClientConnector(dynamicOptions);
        } else {
            clientConnector = this.staticClientConnector;
        }
        // The mapper type is looked up once and cached.
        if (this.mapType == null) {
            this.mapType = this.getMapper().getType();
        }
        boolean oauthFlow = !this.authType.equals("basic.auth") && !this.authType.equals("no.auth");
        if (oauthFlow) {
            this.sendOauthRequest(payload, dynamicOptions, headersList, clientConnector);
            return;
        }
        this.sendRequest(payload, dynamicOptions, headersList, clientConnector);
    }

    /**
     * Builds the HTTP request for the payload and sends it through the given connector.
     * <p>
     * For the {@code oauth} auth type the call blocks for up to 30 seconds waiting for the
     * response and returns its status code. For every other auth type the request is sent with an
     * asynchronous {@code HTTPResponseListener} attached and {@code 200} is returned immediately.
     *
     * @param payload         mapped event payload
     * @param dynamicOptions  per-event values for the dynamic options
     * @param headersList     headers to put on the request
     * @param clientConnector connector used to send the request
     * @return the response status code for OAuth requests, otherwise {@code 200}
     * @throws ConnectionUnavailableException if the OAuth response does not arrive within the
     *                                        timeout, the wait is interrupted, or no response
     *                                        message is available once the wait ends
     */
    protected int sendRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList, ClientConnector clientConnector) throws ConnectionUnavailableException {
        // An empty method option falls back to POST.
        String configuredMethod = this.httpMethodOption.getValue(dynamicOptions);
        String httpMethod = "".equals(configuredMethod) ? "POST" : configuredMethod;
        String contentType = HttpSinkUtil.getContentType(this.mapType, headersList);
        String messageBody = this.getMessageBody(payload);
        HttpMethod httpReqMethod = new HttpMethod(httpMethod);
        HttpCarbonMessage cMessage = new HttpCarbonMessage(new DefaultHttpRequest(HttpVersion.HTTP_1_1, httpReqMethod, ""));
        cMessage = this.generateCarbonMessage(headersList, contentType, httpMethod, cMessage, clientConnector.getHttpURLProperties());
        this.publisherURL = this.publisherURLOption.isStatic() ? this.publisherURLOption.getValue() : this.publisherURLOption.getValue(dynamicOptions);
        if (this.metrics != null) {
            this.metrics.getTotalWritesMetric().inc();
            this.metrics.getTotalHttpWritesMetric(this.publisherURL).inc();
            this.metrics.getRequestSizeMetric(this.publisherURL).inc(HttpSinkUtil.getByteSize(messageBody));
        }
        // GET requests are sent without a body.
        if (!"GET".equals(httpMethod)) {
            cMessage.addHttpContent(new DefaultLastHttpContent(Unpooled.wrappedBuffer(messageBody.getBytes(Charset.defaultCharset()))));
        }
        cMessage.completeMessage();
        if ("oauth".equals(this.authType)) {
            // The OAuth flow needs the status code to decide on a token refresh, so wait for it.
            CountDownLatch latch = new CountDownLatch(1);
            DefaultListener listener = new DefaultListener(latch, this.authType);
            HttpResponseFuture responseFuture = clientConnector.send(cMessage);
            responseFuture.setHttpConnectorListener(listener);
            try {
                boolean latchCount = latch.await(30L, TimeUnit.SECONDS);
                if (!latchCount) {
                    log.debug((Object)("Time out due to getting getting response from " + clientConnector.getPublisherURL() + ". Message dropped."));
                    throw new ConnectionUnavailableException("Time out due to getting getting response from " + clientConnector.getPublisherURL() + ". Message dropped.");
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                log.debug((Object)("Failed to get a response from " + clientConnector.getPublisherURL() + "," + e + ". Message dropped."));
                throw new ConnectionUnavailableException("Failed to get a response from " + clientConnector.getPublisherURL() + ", " + e + ". Message dropped.", e);
            }
            HttpCarbonMessage response = listener.getHttpResponseMessage();
            // Guard against the latch having been released without a response message being set,
            // which would otherwise surface as a NullPointerException.
            if (response == null) {
                throw new ConnectionUnavailableException("No response received from " + clientConnector.getPublisherURL() + ". Message dropped.");
            }
            return response.getNettyHttpResponse().status().code();
        }
        HttpResponseFuture responseFuture = clientConnector.send(cMessage);
        HTTPResponseListener responseListener = new HTTPResponseListener(payload, dynamicOptions, this, clientConnector.getPublisherURL());
        responseFuture.setHttpConnectorListener(responseListener);
        return 200;
    }

    /**
     * Reads the given stream to its end and decodes the bytes as UTF-8.
     *
     * @param in stream to read; it is not closed by this method
     * @return the decoded content, or an empty string if reading fails
     */
    private String getStringFromInputStream(InputStream in) {
        // The wrapper is intentionally left unclosed: closing it would also close the
        // caller-supplied stream.
        BufferedInputStream bis = new BufferedInputStream(in);
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[4096];
            int read;
            while ((read = bis.read(chunk)) != -1) {
                bos.write(chunk, 0, read);
            }
            return bos.toString(StandardCharsets.UTF_8.name());
        }
        catch (IOException ioe) {
            // Keep the best-effort contract (empty result) but record why the read failed.
            log.error((Object)"Couldn't read the complete input stream", ioe);
            return "";
        }
    }

    /**
     * Sends the payload to an OAuth protected endpoint and reacts to the response status.
     * <p>
     * A 401 hands over to {@code handleOAuthFailure}; a 2xx status is logged as success; any other
     * status is logged and reported as an exception.
     *
     * @param payload         mapped event payload
     * @param dynamicOptions  per-event values for the dynamic options
     * @param headersList     headers to put on the request
     * @param clientConnector connector used to send the request
     * @throws ConnectionUnavailableException  for a status other than 401, 2xx or 500
     * @throws HttpSinkAdaptorRuntimeException for a 500 status
     */
    protected void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList, ClientConnector clientConnector) throws ConnectionUnavailableException {
        // The consumer credentials double as the key under which tokens are cached.
        String encodedAuth = "Basic " + this.encodeBase64(this.consumerKey + ":" + this.consumerSecret).replaceAll("\n", "");
        this.setAccessToken(encodedAuth, dynamicOptions, headersList, clientConnector.getPublisherURL());
        int statusCode = this.sendRequest(payload, dynamicOptions, headersList, clientConnector);
        if (statusCode == 401) {
            this.handleOAuthFailure(payload, dynamicOptions, headersList, encodedAuth, clientConnector);
            return;
        }
        if (statusCode >= 200 && statusCode < 300) {
            log.info((Object)("Request sent successfully to " + clientConnector.getPublisherURL()));
            return;
        }
        if (statusCode == 500) {
            log.error((Object)("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + statusCode + "- Internal server error. Message dropped"));
            throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint, " + clientConnector.getPublisherURL() + "', with response code: " + statusCode + "- Internal server error. Message dropped.");
        }
        log.error((Object)("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + statusCode + ". Message dropped."));
        throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', and response code: " + statusCode + ". Message dropped.");
    }

    /**
     * Recovers from a 401 response: retries with the cached access token when the cache has an
     * entry for {@code encodedAuth}, otherwise requests a new access token.
     *
     * @param payload         mapped event payload
     * @param dynamicOptions  per-event values for the dynamic options
     * @param headersList     headers to put on the request
     * @param encodedAuth     encoded consumer credentials used as the cache key
     * @param clientConnector connector used to send the request
     * @throws ConnectionUnavailableException propagated from the retry
     */
    private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, List<Header> headersList, String encodedAuth, ClientConnector clientConnector) throws ConnectionUnavailableException {
        if (this.accessTokenCache.checkAvailableKey(encodedAuth)) {
            this.getNewAccessTokenWithCache(payload, dynamicOptions, headersList, encodedAuth, clientConnector);
            return;
        }
        this.requestForNewAccessToken(payload, dynamicOptions, headersList, encodedAuth, clientConnector);
    }

    /**
     * Retries the request with the access token currently held in the cache.
     * <p>
     * A 401 on the retry triggers a request for a new access token. Any 2xx status counts as
     * success, matching the check in {@code sendOauthRequest}; previously only an exact 200 was
     * accepted, so other successful statuses were reported as a dropped message.
     *
     * @param payload         mapped event payload
     * @param dynamicOptions  per-event values for the dynamic options
     * @param headersList     headers to put on the request; the {@code Authorization} header, if
     *                        present, is overwritten with the cached token
     * @param encodedAuth     encoded consumer credentials used as the cache key
     * @param clientConnector connector used to send the request
     * @throws ConnectionUnavailableException  for a status other than 2xx, 401 or 500
     * @throws HttpSinkAdaptorRuntimeException for a 500 status
     */
    private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOptions, List<Header> headersList, String encodedAuth, ClientConnector clientConnector) throws ConnectionUnavailableException {
        String accessToken = this.accessTokenCache.getAccessToken(encodedAuth);
        for (Header header : headersList) {
            if (header.getName().equals("Authorization")) {
                header.setValue(accessToken);
                break;
            }
        }
        int response = this.sendRequest(payload, dynamicOptions, headersList, clientConnector);
        if (200 <= response && response < 300) {
            log.info((Object)("Request sent successfully to " + clientConnector.getPublisherURL()));
            return;
        }
        if (response == 401) {
            // The cached token was rejected as well, so obtain a fresh one.
            this.requestForNewAccessToken(payload, dynamicOptions, headersList, encodedAuth, clientConnector);
            return;
        }
        if (response == 500) {
            log.error((Object)("Error at sending oauth request to API endpoint, " + clientConnector.getPublisherURL() + "', with response code: " + response + "- Internal server error. Message dropped"));
            throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint, " + clientConnector.getPublisherURL() + "', with response code: " + response + "- Internal server error. Message dropped");
        }
        log.error((Object)("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + response + ". Message dropped."));
        throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + response + ". Message dropped.");
    }

    /**
     * Requests a new access token from the token endpoint and, when that succeeds, resends the
     * payload with the new token.
     * <p>
     * Any 2xx status on the resend counts as success, matching the check in
     * {@code sendOauthRequest}; previously only an exact 200 was accepted.
     *
     * @param payload         mapped event payload
     * @param dynamicOptions  per-event values for the dynamic options
     * @param headersList     headers to put on the request; the {@code Authorization} header, if
     *                        present, is overwritten with the new token
     * @param encodedAuth     encoded consumer credentials used as the cache key
     * @param clientConnector connector used to send the request
     * @throws ConnectionUnavailableException  if the token request or the resend fails with a
     *                                         status other than 401 (or 500 for the resend)
     * @throws HttpSinkAdaptorRuntimeException if the token request returns 401, or the resend
     *                                         returns 401 or 500
     */
    private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOptions, List<Header> headersList, String encodedAuth, ClientConnector clientConnector) throws ConnectionUnavailableException {
        // Boolean.TRUE.equals tolerates a null result instead of failing on unboxing.
        if (Boolean.TRUE.equals(this.accessTokenCache.checkRefreshAvailableKey(encodedAuth))) {
            for (Header header : headersList) {
                if (header.getName().equals("refresh.token")) {
                    String cachedRefreshToken = this.accessTokenCache.getRefreshtoken(encodedAuth);
                    if (cachedRefreshToken != null) {
                        header.setValue(cachedRefreshToken);
                    }
                    break;
                }
            }
        }
        this.getAccessToken(dynamicOptions, encodedAuth, this.tokenURL);
        if (this.accessTokenCache.getResponseCode(encodedAuth) != 200) {
            if (this.accessTokenCache.getResponseCode(encodedAuth) == 401) {
                // Stray "c" removed from the log text so it matches the exception message below.
                log.error((Object)("Failed to generate new access token for the expired access token to " + clientConnector.getPublisherURL() + "', " + this.accessTokenCache.getResponseCode(encodedAuth) + ": Authentication Failure.Please provide a valid Consumer key, Consumer secret and token endpoint URL . Message dropped"));
                throw new HttpSinkAdaptorRuntimeException("Failed to generate new access token for the expired access token to " + clientConnector.getPublisherURL() + "', " + this.accessTokenCache.getResponseCode(encodedAuth) + ": Authentication Failure.Please provide a valid Consumer key, Consumer secret and token endpoint URL . Message dropped");
            }
            log.error((Object)("Failed to generate new access token for the expired access token. Error code: " + this.accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped."));
            throw new ConnectionUnavailableException("Failed to generate new access token for the expired access token. Error code: " + this.accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped.");
        }
        String newAccessToken = this.accessTokenCache.getAccessToken(encodedAuth);
        // The cache writes below are kept from the original in case the setters do more than store.
        this.accessTokenCache.setAccessToken(encodedAuth, newAccessToken);
        if (this.accessTokenCache.getRefreshtoken(encodedAuth) != null) {
            this.accessTokenCache.setRefreshtoken(encodedAuth, this.accessTokenCache.getRefreshtoken(encodedAuth));
        }
        for (Header header : headersList) {
            if (header.getName().equals("Authorization")) {
                header.setValue(newAccessToken);
                break;
            }
        }
        int response = this.sendRequest(payload, dynamicOptions, headersList, clientConnector);
        if (response < 200 || response >= 300) {
            if (response == 401) {
                log.error((Object)("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + response + "- Authentication Failure. Please provide a valid Consumer key, Consumer secret and token endpoint URL . Message dropped"));
                throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + response + "- Authentication Failure. Please provide a valid Consumer key, Consumer secret and token endpoint URL. Message dropped");
            }
            if (response == 500) {
                log.error((Object)("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + response + "- Internal server error. Message dropped"));
                throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + response + "- Internal server error. Message dropped");
            }
            log.error((Object)("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + response + ". Message dropped."));
            throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + "', with response code: " + response + ". Message dropped.");
        }
        log.info((Object)("Request sent successfully to " + clientConnector.getPublisherURL()));
    }

    /**
     * Requests an access token from the token endpoint, picking the grant call
     * from the credentials this sink holds.
     *
     * <ul>
     *   <li>Both {@code oauthUsername} and {@code oauthUserPassword} non-empty:
     *       {@code getPasswordGrantAccessToken}.</li>
     *   <li>Otherwise, the refresh-token option is non-empty or the cache holds a
     *       refresh token for {@code encodedAuth}:
     *       {@code getRefreshGrantAccessToken}.</li>
     *   <li>Otherwise: {@code getClientGrantAccessToken}.</li>
     * </ul>
     *
     * Nothing is returned; {@code setAccessToken} reads the outcome from
     * {@code accessTokenCache} after calling this method.
     *
     * @param dynamicOptions options used to resolve the refresh-token option
     * @param encodedAuth    encoded client credentials passed to the token client
     * @param tokenURL       token endpoint; also stored in {@code this.tokenURL}
     */
    void getAccessToken(DynamicOptions dynamicOptions, String encodedAuth, String tokenURL) {
        this.tokenURL = tokenURL;
        HttpsClient tokenClient = new HttpsClient();

        boolean hasUserCredentials = !"".equals(this.oauthUsername) && !"".equals(this.oauthUserPassword);
        if (hasUserCredentials) {
            tokenClient.getPasswordGrantAccessToken(tokenURL, this.clientStoreFile, this.clientStorePass, this.oauthUsername, this.oauthUserPassword, encodedAuth);
            return;
        }

        boolean hasRefreshToken = !"".equals(this.refreshToken.getValue(dynamicOptions))
                || this.accessTokenCache.getRefreshtoken(encodedAuth) != null;
        if (hasRefreshToken) {
            // The token passed on is the option value, even when only the cache held one.
            tokenClient.getRefreshGrantAccessToken(tokenURL, this.clientStoreFile, this.clientStorePass, encodedAuth, this.refreshToken.getValue(dynamicOptions));
            return;
        }

        tokenClient.getClientGrantAccessToken(tokenURL, this.clientStoreFile, this.clientStorePass, encodedAuth);
    }

    /**
     * Makes sure {@code headersList} carries an {@code Authorization} header.
     *
     * <p>If the list already has an {@code Authorization} header, its value is
     * replaced with the cached access token, but only when the cache reports a
     * key for {@code encodedAuth}. Otherwise a token is requested through
     * {@link #getAccessToken}; on response code 200 an {@code Authorization}
     * header is appended, plus a {@code refresh.token} header when the cache
     * holds a refresh token.
     *
     * @param encodedAuth    encoded client credentials used as the cache key
     * @param dynamicOptions options forwarded to {@code getAccessToken}
     * @param headersList    request headers, modified in place
     * @param publisherURL   endpoint URL, used only in error messages
     * @throws HttpSinkAdaptorRuntimeException when the cached response code is 401 or 500
     * @throws ConnectionUnavailableException  for any other non-200 response code
     */
    void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions, List<Header> headersList, String publisherURL) throws ConnectionUnavailableException {
        boolean hasAuthorizationHeader = headersList.stream()
                .anyMatch(candidate -> candidate.getName().equals("Authorization"));

        if (hasAuthorizationHeader) {
            // Refresh the existing header from the cache, if the cache knows this client.
            if (this.accessTokenCache.checkAvailableKey(encodedAuth)) {
                String cachedToken = this.accessTokenCache.getAccessToken(encodedAuth);
                headersList.stream()
                        .filter(candidate -> candidate.getName().equals("Authorization"))
                        .findFirst()
                        .ifPresent(candidate -> candidate.setValue(cachedToken));
            }
            return;
        }

        this.getAccessToken(dynamicOptions, encodedAuth, this.tokenURL);

        if (this.accessTokenCache.getResponseCode(encodedAuth) == 200) {
            headersList.add(new Header("Authorization", this.accessTokenCache.getAccessToken(encodedAuth)));
            if (this.accessTokenCache.getRefreshtoken(encodedAuth) != null) {
                headersList.add(new Header("refresh.token", this.accessTokenCache.getRefreshtoken(encodedAuth)));
            }
            return;
        }

        if (this.accessTokenCache.getResponseCode(encodedAuth) == 401) {
            String message = "Failed to generate new access token for the expired access token to " + publisherURL + "', with response code: " + this.accessTokenCache.getResponseCode(encodedAuth) + "- Authentication Failure.Please provide a valid Consumer key, Consumer secret and token endpoint URL . Message dropped";
            log.error((Object)message);
            throw new HttpSinkAdaptorRuntimeException(message);
        }

        if (this.accessTokenCache.getResponseCode(encodedAuth) == 500) {
            String message = "Failed to generate new access token for the expired access token to " + publisherURL + "', with response code: " + this.accessTokenCache.getResponseCode(encodedAuth) + "- Internal server error. Message dropped";
            log.error((Object)message);
            throw new HttpSinkAdaptorRuntimeException(message);
        }

        String message = "Failed to generate new access token for the expired access token. Error code: " + this.accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped.";
        log.error((Object)message);
        throw new ConnectionUnavailableException(message);
    }

    /**
     * Intentionally does nothing: this method performs no connection work.
     * Client connectors are built by {@code createClientConnector}.
     * NOTE(review): presumably a lifecycle hook required by the sink base
     * class - confirm against the class declaration.
     */
    public void connect() {
    }

    /**
     * Drops the static client connector (if any) and shuts down the HTTP
     * connector factory (if any). Both fields are set to {@code null}
     * afterwards, so calling this again does nothing.
     */
    public void disconnect() {
        // Step 1: forget the static connector, remembering its URL for the log line.
        if (this.staticClientConnector != null) {
            String disconnectedUrl = this.staticClientConnector.getPublisherURL();
            this.staticClientConnector = null;
            log.debug((Object)("Server connector for url " + disconnectedUrl + " disconnected."));
        }

        // Step 2: stop the connector factory and release the reference.
        if (this.httpConnectorFactory != null) {
            this.httpConnectorFactory.shutdownNow();
            this.httpConnectorFactory = null;
        }
    }

    /**
     * Releases the static client connector reference, if one is held, and
     * writes a debug line naming its publisher URL. The HTTP connector factory
     * is not touched here.
     */
    public void destroy() {
        if (this.staticClientConnector != null) {
            // Read the URL before clearing the field so the log line can name it.
            String releasedUrl = this.staticClientConnector.getPublisherURL();
            this.staticClientConnector = null;
            log.debug((Object)("Server connector for url " + releasedUrl + " disconnected."));
        }
    }

    /**
     * Creates the {@link SinkMetrics} instance for this sink when a metric
     * service is present, metrics management is enabled and the
     * {@code "prometheus"} reporter is running. In every other case
     * {@code this.metrics} is left as it was.
     *
     * @param streamName stream name passed to the {@code SinkMetrics} constructor
     */
    protected void initMetrics(String streamName) {
        boolean metricsEnabled = MetricsDataHolder.getInstance().getMetricService() != null
                && MetricsDataHolder.getInstance().getMetricManagementService().isEnabled();
        if (!metricsEnabled) {
            return;
        }

        try {
            boolean prometheusRunning = MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning("prometheus");
            if (prometheusRunning) {
                this.metrics = new SinkMetrics(this.siddhiAppContext.getName(), streamName);
            }
        }
        catch (IllegalArgumentException reporterLookupFailure) {
            // Treated as "reporter not running": metrics stay uninitialised.
            log.debug((Object)("Prometheus reporter is not running. Hence http sink metrics will not be initialized for " + this.siddhiAppContext.getName()));
        }
    }

    /**
     * Fills the given carbon message with routing properties, HTTP method,
     * request URL and headers, and returns the same instance.
     *
     * <p>Header handling, in order:
     * <ol>
     *   <li>{@code Authorization} is set to {@code this.authorizationHeader} when
     *       both {@code userName} and {@code userPassword} are non-empty; when
     *       only one is non-empty an error is logged and no header is set.</li>
     *   <li>{@code host} is set from the message's {@code host} property.</li>
     *   <li>Every entry of {@code headers} (if non-null) is applied with
     *       {@code set}.</li>
     *   <li>{@code Content-Type} is set only when {@code contentType} contains
     *       {@code this.mapType}.</li>
     * </ol>
     *
     * @param headers           extra headers to apply; may be {@code null}
     * @param contentType       content type candidate for the {@code Content-Type} header
     * @param httpMethod        HTTP method to set on the message
     * @param cMessage          message to populate (mutated and returned)
     * @param httpURLProperties values for the keys {@code PROTOCOL}, {@code TO},
     *                          {@code host}, {@code port} and {@code REQUEST_URL};
     *                          {@code port} must parse as an integer
     * @return {@code cMessage}
     */
    HttpCarbonMessage generateCarbonMessage(List<Header> headers, String contentType, String httpMethod, HttpCarbonMessage cMessage, Map<String, String> httpURLProperties) {
        // Routing properties copied verbatim from the parsed URL.
        for (String propertyKey : new String[]{"PROTOCOL", "TO", "host"}) {
            cMessage.setProperty(propertyKey, httpURLProperties.get(propertyKey));
        }
        cMessage.setProperty("port", Integer.valueOf(httpURLProperties.get("port")));
        cMessage.setHttpMethod(httpMethod);
        cMessage.setRequestUrl(httpURLProperties.get("REQUEST_URL"));

        HttpHeaders requestHeaders = cMessage.getHeaders();

        boolean hasUserName = !this.userName.equals("");
        boolean hasPassword = !this.userPassword.equals("");
        if (hasUserName && hasPassword) {
            requestHeaders.set("Authorization", (Object)this.authorizationHeader);
        } else if (hasUserName || hasPassword) {
            log.error((Object)"One of the basic authentication username or password missing. Hence basic authentication not supported.");
        }

        requestHeaders.set("host", cMessage.getProperty("host"));

        if (headers != null) {
            headers.forEach(extra -> requestHeaders.set(extra.getName(), (Object)extra.getValue()));
        }

        if (contentType.contains(this.mapType)) {
            requestHeaders.set("Content-Type", (Object)contentType);
        }

        // The method is applied a second time here, as in the original sequence.
        cMessage.setHttpMethod(httpMethod);
        return cMessage;
    }

    /**
     * Converts a mapped payload into the request body text.
     *
     * <p>When {@code this.mapType} is {@code "keyvalue"} the payload is treated
     * as a map and rendered as {@code key=value} pairs, each side passed through
     * {@code encodeMessage}, joined with {@code &}. An empty map gives an empty
     * string. For any other map type the payload is returned as a
     * {@code String}.
     *
     * <p>The separator is written only between pairs. The previous
     * {@code reduce("", ...)} form put a spurious {@code &} in front of the
     * first pair.
     *
     * @param payload a {@code Map} for the {@code keyvalue} map type, otherwise a {@code String}
     * @return the body to send
     * @throws ClassCastException if the payload is not of the expected type
     */
    String getMessageBody(Object payload) {
        if (!"keyvalue".equals(this.mapType)) {
            return (String)payload;
        }
        Map<?, ?> params = (Map<?, ?>)payload;
        StringBuilder body = new StringBuilder();
        for (Map.Entry<?, ?> pair : params.entrySet()) {
            if (body.length() > 0) {
                body.append('&');
            }
            body.append(this.encodeMessage(pair.getKey()))
                    .append('=')
                    .append(this.encodeMessage(pair.getValue()));
        }
        return body.toString();
    }

    /**
     * Builds a {@link ClientConnector} for the publisher URL resolved from
     * {@code publisherURLOption} (static value, or the value for
     * {@code dynamicOptions}).
     *
     * <p>Sender settings (proxy, pool, socket idle timeout, SSL protocol, TLS
     * store type, chunking, parameters, hostname verification, SSL verification)
     * are copied from this sink's fields onto the sender configuration before
     * the connector is created.
     *
     * <p>Two differences from the earlier form of this method:
     * <ul>
     *   <li>The empty-URL check runs before the URL is handed to the
     *       {@code HttpSinkUtil} helpers; previously it ran only after they had
     *       already processed the URL.</li>
     *   <li>The response executor is created once and reused. Previously each
     *       call built a new fixed thread pool and overwrote the field without
     *       shutting the old pool down. Response listeners read the field at
     *       call time, so only the most recent pool was ever used.
     *       NOTE(review): assumes the pool configuration does not change
     *       between calls - confirm.</li>
     * </ul>
     *
     * @param dynamicOptions options used to resolve a non-static publisher URL
     * @return a connector bound to the resolved URL
     * @throws SiddhiAppCreationException  if the URL is empty, or if OAuth is
     *                                     selected and the consumer key/secret or
     *                                     token URL is empty
     * @throws ExceptionInInitializerError if the scheme is {@code https} and the
     *                                     client store file or password is {@code null}
     */
    public ClientConnector createClientConnector(DynamicOptions dynamicOptions) {
        this.publisherURL = this.publisherURLOption.isStatic() ? this.publisherURLOption.getValue() : this.publisherURLOption.getValue(dynamicOptions);
        if (this.authType.equals("oauth")) {
            if ("".equals(this.consumerSecret) || "".equals(this.consumerKey)) {
                throw new SiddhiAppCreationException("consumer.key and consumer.secret found empty but it is Mandatory field in http sink in " + this.streamID);
            }
            if ("".equals(this.tokenURL)) {
                throw new SiddhiAppCreationException("token.url found empty but it is Mandatory field in http sink in " + this.streamID);
            }
        }
        // Validate the URL before any helper tries to parse it.
        if (this.publisherURL == null || "".equals(this.publisherURL)) {
            throw new SiddhiAppCreationException("Receiver URL found empty but it is Mandatory field in http sink in " + this.streamID);
        }
        String scheme = HttpSinkUtil.getScheme(this.publisherURL);
        Map<String, String> httpURLProperties = HttpSinkUtil.getURLProperties(this.publisherURL);
        SenderConfiguration senderConfig = HttpSinkUtil.getSenderConfigurations(httpURLProperties, this.clientStoreFile, this.clientStorePass, this.configReader);
        if ("https".equals(scheme) && (this.clientStoreFile == null || this.clientStorePass == null)) {
            throw new ExceptionInInitializerError("Client trustStore file path or password are empty while default scheme is 'https'. Please provide client trustStore file path and password in " + this.streamID);
        }
        if (this.proxyServerConfiguration != null) {
            senderConfig.setProxyServerConfiguration(this.proxyServerConfiguration);
        }
        senderConfig.setPoolConfiguration(this.connectionPoolConfiguration);
        if (this.socketIdleTimeout != -1) {
            senderConfig.setSocketIdleTimeout(this.socketIdleTimeout);
        }
        if (!"".equals(this.sslProtocol)) {
            senderConfig.setSSLProtocol(this.sslProtocol);
        }
        if (!"".equals(this.tlsStoreType)) {
            senderConfig.setTLSStoreType(this.tlsStoreType);
        }
        if (!"".equals(this.chunkDisabled) && this.chunkDisabled != null) {
            if (Boolean.parseBoolean(this.chunkDisabled)) {
                senderConfig.setChunkingConfig(ChunkConfig.NEVER);
            } else {
                senderConfig.setChunkingConfig(ChunkConfig.ALWAYS);
            }
        }
        if (!"".equals(this.parametersList)) {
            senderConfig.setParameters(HttpIoUtil.populateParameters(this.parametersList));
        }
        // Hostname verification is switched off unless explicitly set to "true".
        if (!"true".equalsIgnoreCase(this.hostnameVerificationEnabled)) {
            senderConfig.setHostNameVerificationEnabled(false);
        }
        if ("true".equalsIgnoreCase(this.sslVerificationDisabled)) {
            senderConfig.disableSsl();
        }
        // Reuse the pool across connectors; a fixed pool's threads live until it is shut down.
        if (this.executor == null) {
            this.executor = Executors.newFixedThreadPool(senderConfig.getPoolConfiguration().getExecutorServiceThreads());
        }
        Map<String, Object> bootStrapProperties = HttpSinkUtil.populateTransportConfiguration(this.clientBootstrapConfiguration);
        return new ClientConnector(this.publisherURL, httpURLProperties, this.httpConnectorFactory.createHttpClientConnector(bootStrapProperties, senderConfig));
    }

    /**
     * URL-encodes the text form of a value using UTF-8.
     *
     * <p>The value is converted with {@code toString()}, so non-String keys or
     * values (for example numeric attributes) are encoded instead of failing on
     * a cast. A {@code null} value is encoded as the empty string.
     *
     * @param s value to encode; may be {@code null}
     * @return the URL-encoded text
     * @throws SiddhiAppRuntimeException if the UTF-8 encoding is reported as unsupported
     */
    private String encodeMessage(Object s) {
        String text = s == null ? "" : s.toString();
        try {
            return URLEncoder.encode(text, "UTF-8");
        }
        catch (UnsupportedEncodingException e) {
            throw new SiddhiAppRuntimeException("Execution of Siddhi app " + this.siddhiAppContext.getName() + " failed due to " + e.getMessage(), (Throwable)e);
        }
    }

    /**
     * Base64-encodes the UTF-8 bytes of the given text and returns the result
     * as a single-line string.
     *
     * <p>Uses the JDK encoder rather than Netty's {@code Base64.encode(ByteBuf)}.
     * The Netty call with the default dialect inserts a line break every 76
     * output characters, which corrupts longer values, and it returns a
     * {@code ByteBuf} that was never released here.
     *
     * @param consumerKeyValue text to encode; must not be {@code null}
     * @return Base64 text without line separators
     */
    private String encodeBase64(String consumerKeyValue) {
        // Fully qualified: the simple name Base64 in this file refers to Netty's codec.
        return java.util.Base64.getEncoder().encodeToString(consumerKeyValue.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Listener attached to one outgoing request. On a response it updates the
     * sink metrics and hands the response stream to the sink's executor; on a
     * transport error it updates metrics and reports the failure to the sink
     * through {@code onError}.
     */
    private class HTTPResponseListener
    implements HttpConnectorListener {
        Object payload;
        DynamicOptions dynamicOptions;
        HttpSink httpSink;
        private final String publisherURL;

        /**
         * @param payload        payload that was sent, passed back on error
         * @param dynamicOptions options of the sent event, passed back on error
         * @param owner          sink that receives {@code onError} callbacks
         * @param publisherURL   endpoint URL used for metrics and error text
         */
        HTTPResponseListener(Object payload, DynamicOptions dynamicOptions, HttpSink owner, String publisherURL) {
            this.publisherURL = publisherURL;
            this.httpSink = owner;
            this.dynamicOptions = dynamicOptions;
            this.payload = payload;
        }

        /**
         * Records the response time on the enclosing sink, updates metrics, and
         * (when an executor exists) schedules reading of the response stream.
         */
        @Override
        public void onMessage(final HttpCarbonMessage httpCarbonMessage) {
            HttpSink.this.endTime = System.currentTimeMillis();
            this.recordResponseMetrics(httpCarbonMessage);
            if (HttpSink.this.executor == null) {
                return;
            }
            HttpSink.this.executor.execute(() -> HttpSink.this.getStringFromInputStream(new HttpMessageDataStreamer(httpCarbonMessage).getInputStream()));
        }

        /** Updates endpoint status, latency, last-event time and the error counter for non-2xx codes. */
        private void recordResponseMetrics(HttpCarbonMessage response) {
            if (HttpSink.this.metrics == null) {
                return;
            }
            HttpSink.this.metrics.setEndpointStatusMetric(this.publisherURL, EndpointStatus.ONLINE);
            HttpSink.this.metrics.setLatencyMetric(this.publisherURL, HttpSink.this.endTime - HttpSink.this.startTime);
            HttpSink.this.metrics.setLastEventTime(this.publisherURL, HttpSink.this.endTime);
            boolean successStatus = response.getHttpStatusCode() / 100 == 2;
            if (!successStatus) {
                HttpSink.this.metrics.getTotalHttpErrorsMetric(this.publisherURL).inc();
            }
        }

        /**
         * Counts the error, marks the endpoint offline (when metrics exist) and
         * forwards a {@link ConnectionUnavailableException} wrapping the cause
         * to the owning sink.
         */
        @Override
        public void onError(Throwable throwable) {
            if (HttpSink.this.metrics != null) {
                HttpSink.this.metrics.getTotalHttpErrorsMetric(this.publisherURL).inc();
                HttpSink.this.metrics.setEndpointStatusMetric(this.publisherURL, EndpointStatus.OFFLINE);
            }
            String failureText = "HTTP sink on stream " + this.httpSink.streamID + " of Siddhi App '" + this.httpSink.siddhiAppContext.getName() + "' failed to publish events to endpoint '" + this.publisherURL + "'. " + throwable.getMessage();
            this.httpSink.onError(this.payload, this.dynamicOptions, (Exception)new ConnectionUnavailableException(failureText, throwable));
        }
    }
}

