package io.siddhi.extension.io.http.sink;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.source.HttpResponseMessageListener;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.transport.http.netty.contract.Constants;
import org.wso2.transport.http.netty.contract.HttpResponseFuture;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

@Extension(name = "http-call", namespace = "sink", description = "The http-call sink publishes messages to endpoints via HTTP or HTTPS protocols using methods such as POST, GET, PUT, and DELETE on formats `text`, `XML` or `JSON` and consume responses through its corresponding http-call-response source. It also supports calling endpoints protected with basic authentication or OAuth 2.0.", parameters = {@Parameter(name = HttpConstants.PUBLISHER_URL, description = "The URL which should be called.\nExamples:\n`http://localhost:8080/endpoint`,\n`https://localhost:8080/endpoint`", type = {DataType.STRING}), @Parameter(name = HttpConstants.SINK_ID, description = "Identifier to correlate the http-call sink to its corresponding http-call-response sources to retrieved the responses.", type = {DataType.STRING}), @Parameter(name = HttpConstants.RECEIVER_USERNAME, description = "The username to be included in the authentication header when calling endpoints protected by basic authentication. `basic.auth.password` property should be also set when using this property.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.RECEIVER_PASSWORD, description = "The password to be included in the authentication header when calling endpoints protected by basic authentication. 
`basic.auth.username` property should be also set when using this property.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PATH_PARAM, description = "The file path of the client truststore when sending messages through `https` protocol.", type = {DataType.STRING}, optional = true, defaultValue = "`${carbon.home}/resources/security/client-truststore.jks`"), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PASSWORD_PARAM, description = "The password for the client-truststore.", type = {DataType.STRING}, optional = true, defaultValue = "wso2carbon"), @Parameter(name = HttpConstants.RECEIVER_OAUTH_USERNAME, description = "The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. `oauth.password` property should be also set when using this property.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.RECEIVER_OAUTH_PASSWORD, description = "The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. 
`oauth.username` property should be also set when using this property.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CONSUMER_KEY, description = "Consumer key used for calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CONSUMER_SECRET, description = "Consumer secret used for calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.TOKEN_URL, description = "Token URL to generate a new access tokens when calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.RECEIVER_REFRESH_TOKEN, description = "Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.HEADERS, description = "HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\nWhen the `Content-Type` header is not provided the system decides the Content-Type based on the provided sink mapper as following: \n - `@map(type='xml')`: `application/xml`\n - `@map(type='json')`: `application/json`\n - `@map(type='text')`: `plain/text`\n - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n - For all other cases system defaults to `plain/text`\nAlso the `Content-Length` header need not to be provided, as the system automatically defines it by calculating the size of the payload.", type = {DataType.STRING}, optional = true, defaultValue = "Content-Type and Content-Length headers"), @Parameter(name = HttpConstants.METHOD, description = "The HTTP method used for calling the 
endpoint.", type = {DataType.STRING}, optional = true, defaultValue = "POST"), @Parameter(name = HttpConstants.DOWNLOAD_ENABLED, description = "Enable response received by the http-call-response source to be written to a file. When this is enabled the `download.path` property should be also set.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.DOWNLOAD_PATH, description = "The absolute file path along with the file name where the downloads should be saved.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR, dynamic = true), @Parameter(name = HttpConstants.BLOCKING_IO, description = "Blocks the request thread until a response it received from HTTP call-response source before sending any other request.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.SOCKET_IDEAL_TIMEOUT, description = "Socket timeout in millis.", type = {DataType.INT}, optional = true, defaultValue = "6000"), @Parameter(name = HttpConstants.CLIENT_CHUNK_DISABLED, description = "Disable chunked transfer encoding.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.SSL_PROTOCOL, description = "SSL/TLS protocol.", type = {DataType.STRING}, optional = true, defaultValue = Constants.TLS_PROTOCOL), @Parameter(name = HttpConstants.SSL_VERIFICATION_DISABLED, description = "Disable SSL verification.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.SSS_CONFIGS, description = "SSL/TSL configurations.\nExpected format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported parameters:\n - SSL/TLS protocols: `'sslEnabledProtocols:TLSv1.1,TLSv1.2'`\n - List of ciphers: `'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'`\n - Enable session creation: `'client.enable.session.creation:true'`\n - Supported server names: `'server.suported.server.names:server'`\n - Add HTTP 
SNIMatcher: `'server.supported.snimatchers:SNIMatcher'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_HOST, description = "Proxy server host", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_PORT, description = "Proxy server port", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_USERNAME, description = "Proxy server username", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.PROXY_PASSWORD, description = "Proxy server password", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_CONFIGURATION, description = "Client bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Client connect timeout in millis: `'client.bootstrap.connect.timeout:15000'`\n - Client socket timeout in seconds: `'client.bootstrap.socket.timeout:15'`\n - Client socket reuse: `'client.bootstrap.socket.reuse:true'`\n - Enable TCP no delay: `'client.bootstrap.nodelay:true'`\n - Enable client keep alive: `'client.bootstrap.keepalive:true'`\n - Send buffer size: `'client.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'client.bootstrap.recievebuffersize:1048576'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR), @Parameter(name = HttpConstants.MAX_ACTIVE_CONNECTIONS_PER_POOL, description = "Maximum possible number of active connection per client pool.", type = {DataType.INT}, optional = true, defaultValue = "-1"), @Parameter(name = HttpConstants.MIN_IDLE_CONNECTIONS_PER_POOL, description = "Minimum number of idle connections that 
can exist per client pool.", type = {DataType.INT}, optional = true, defaultValue = HttpConstants.DEFAULT_MIN_IDLE_CONNECTIONS_PER_POOL), @Parameter(name = HttpConstants.MAX_IDLE_CONNECTIONS_PER_POOL, description = "Maximum number of idle connections that can exist per client pool.", type = {DataType.INT}, optional = true, defaultValue = HttpConstants.DEFAULT_MAX_IDLE_CONNECTIONS_PER_POOL), @Parameter(name = HttpConstants.MIN_EVICTABLE_IDLE_TIME, description = "Minimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_MIN_EVICTABLE_IDLE_TIME), @Parameter(name = HttpConstants.TIME_BETWEEN_EVICTION_RUNS, description = "Time between two eviction operations (in millis) on the client pool.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_TIME_BETWEEN_EVICTION_RUNS), @Parameter(name = HttpConstants.MAX_WAIT_TIME, description = "The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_MAX_WAIT_TIME), @Parameter(name = HttpConstants.TEST_ON_BORROW, description = "Enable connections to be validated before being borrowed from the client pool.", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = HttpConstants.TEST_WHILE_IDLE, description = "Enable connections to be validated during the eviction operation (if any).", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = HttpConstants.EXHAUSTED_ACTION, description = "Action that should be taken when the maximum number of active connections are being used. 
This action should be indicated as an int and possible action values are following.\n0 - Fail the request.\n1 - Block the request, until a connection returns to the pool.\n2 - Grow the connection pool size.", type = {DataType.INT}, optional = true, defaultValue = "1 (Block when exhausted)"), @Parameter(name = HttpConstants.HOSTNAME_VERIFICATION_ENABLED, description = "Enable hostname verification", type = {DataType.BOOL}, optional = true, defaultValue = "true")}, examples = {@Example(syntax = "@sink(type='http-call', sink.id='foo',\n      publisher.url='http://localhost:8009/foo',\n      @map(type='xml', @payload('{{payloadBody}}')))\ndefine stream FooStream (payloadBody string);\n\n@source(type='http-call-response', sink.id='foo',\n        @map(type='text', regex.A='((.|\\n)*)',\n             @attributes(headers='trp:headers', message='A[1]')))\ndefine stream ResponseStream(message string, headers string);", description = "When events arrive in `FooStream`, http-call sink makes calls to endpoint on url `http://localhost:8009/foo` with `POST` method and Content-Type `application/xml`.\nIf the event `payloadBody` attribute contains following XML:\n```<item>\n    <name>apple</name>\n    <price>55</price>\n    <quantity>5</quantity>\n</item>```the http-call sink maps that and sends it to the endpoint.\nWhen endpoint sends a response it will be consumed by the corresponding http-call-response source correlated via the same `sink.id` `foo` and that will map the response message and send it via `ResponseStream` steam by assigning the message body as `message` attribute and response headers as `headers` attribute of the event."), @Example(syntax = "@sink(type='http-call', publisher.url='http://localhost:8005/files/{{name}}'\n      downloading.enabled='true', download.path='{{downloadPath}}{{name}}',\n      method='GET', sink.id='download', @map(type='json'))\ndefine stream DownloadRequestStream(name String, id int, downloadPath 
string);\n\n@source(type='http-call-response', sink.id='download',\n        http.status.code='2\\\\d+',\n        @map(type='text', regex.A='((.|\\n)*)',\n             @attributes(name='trp:name', id='trp:id', file='A[1]')))\ndefine stream ResponseStream2xx(name string, id string, file string);\n\n@source(type='http-call-response', sink.id='download',\n        http.status.code='4\\\\d+',\n        @map(type='text', regex.A='((.|\\n)*)', @attributes(errorMsg='A[1]')))\ndefine stream ResponseStream4xx(errorMsg string);", description = "When events arrive in `DownloadRequestStream` with `name`:`foo.txt`, `id`:`75` and `downloadPath`:`/user/download/` the http-call sink sends a GET request to the url `http://localhost:8005/files/foo.txt` to download the file to the given path `/user/download/foo.txt` and capture the response via its corresponding http-call-response source based on the response status code.\nIf the response status code is in the range of 200 the message will be received by the http-call-response source associated with the `ResponseStream2xx` stream which expects `http.status.code` with regex `2\\\\d+` while downloading the file to the local file system on the path `/user/download/foo.txt` and mapping the response message having the absolute file path to event's `file` attribute.\nIf the response status code is in the range of 400 then the message will be received by the http-call-response source associated with the `ResponseStream4xx` stream which expects `http.status.code` with regex `4\\\\d+` while mapping the error response to the `errorMsg` attribute of the event.")}, systemParameter = {@SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_CLIENT_GROUP_SIZE, description = "Number of client threads to perform non-blocking read and write to one or more channels.", defaultValue = "(Number of available processors) * 2", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_BOSS_GROUP_SIZE, description = 
"Number of boss threads to accept incoming connections.", defaultValue = "Number of available processors", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_WORKER_GROUP_SIZE, description = "Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.", defaultValue = "(Number of available processors) * 2", possibleParameters = {"Any positive integer"}), @SystemParameter(name = HttpConstants.CLIENT_TRUSTSTORE_PATH, description = "The default truststore file path.", defaultValue = "`${carbon.home}/resources/security/client-truststore.jks`", possibleParameters = {"Path to client truststore `.jks` file"}), @SystemParameter(name = "trustStorePassword", description = "The default truststore password.", defaultValue = "wso2carbon", possibleParameters = {"Truststore password as string"})})
/* loaded from: input_file:io/siddhi/extension/io/http/sink/HttpCallSink.class */
/**
 * Sink that sends an HTTP request for each published payload and registers an
 * {@link HttpResponseMessageListener} on the resulting response future, tagged with this
 * sink's {@code sink.id}.
 *
 * <p>When {@code blocking.io} is enabled, or the configured authentication type equals
 * {@link HttpConstants#OAUTH}, the publishing thread waits (bounded) on a latch that is
 * handed to the listener before {@link #sendRequest} returns.
 */
public class HttpCallSink extends HttpSink {
    private static final Logger log = Logger.getLogger(HttpCallSink.class);

    /** Status code returned when the real response status is not waited for or not reported. */
    private static final int SUCCESS_STATUS_CODE = 200;

    /** Upper bound, in seconds, for waiting on the response latch. */
    private static final long RESPONSE_TIMEOUT_SECONDS = 30L;

    private String sinkId;
    private boolean isDownloadEnabled;
    /** Only assigned when {@link #isDownloadEnabled} is {@code true}; otherwise {@code null}. */
    private Option downloadPath;
    private boolean isBlockingIO;
    private StreamDefinition outputStreamDefinition;

    /**
     * Initialises the parent sink and then reads the options specific to this sink:
     * the sink id, the download flag (plus the download path option when the flag is set)
     * and the blocking-IO flag.
     *
     * @param streamDefinition definition of the stream this sink is attached to; kept for
     *                         building the property map passed to the response listener
     * @param optionHolder     holder of the sink's configured options
     * @param configReader     passed through to the parent initialisation
     * @param siddhiAppContext passed through to the parent initialisation
     * @return the state factory produced by the parent {@code init}
     */
    @Override
    public StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        StateFactory stateFactory = super.init(streamDefinition, optionHolder, configReader, siddhiAppContext);
        this.outputStreamDefinition = streamDefinition;
        this.sinkId = optionHolder.validateAndGetStaticValue(HttpConstants.SINK_ID);
        this.isDownloadEnabled = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(HttpConstants.DOWNLOAD_ENABLED, "false"));
        if (this.isDownloadEnabled) {
            // The download path is only looked up (and therefore only validated) when downloading is on.
            this.downloadPath = optionHolder.validateAndGetOption(HttpConstants.DOWNLOAD_PATH);
        }
        this.isBlockingIO = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(HttpConstants.BLOCKING_IO, "false"));
        return stateFactory;
    }

    /**
     * Builds the HTTP message for the given payload, sends it through {@code clientConnector}
     * and attaches an {@link HttpResponseMessageListener} to the response future.
     *
     * <p>If blocking IO is enabled or the auth type equals {@link HttpConstants#OAUTH}, this
     * method waits up to {@value #RESPONSE_TIMEOUT_SECONDS} seconds on a latch shared with the
     * listener (presumably released by the listener once a response arrives).
     *
     * @param obj             payload to publish
     * @param dynamicOptions  per-event dynamic option values
     * @param list            HTTP headers to send
     * @param clientConnector connector used to send the request
     * @return {@code 200} when no waiting is required or when blocking IO is enabled;
     *         otherwise the status code reported by the response listener
     * @throws ConnectionUnavailableException if the wait times out or the waiting thread is
     *                                        interrupted
     */
    @Override
    protected int sendRequest(Object obj, DynamicOptions dynamicOptions, List<Header> list, ClientConnector clientConnector) throws ConnectionUnavailableException {
        if (!this.publisherURLOption.isStatic()) {
            super.createClientConnector(dynamicOptions);
        }
        if (this.mapType == null) {
            this.mapType = getMapper().getType();
        }
        // Resolve the method option once; an empty value falls back to POST.
        String configuredMethod = this.httpMethodOption.getValue(dynamicOptions);
        String httpMethod = "".equals(configuredMethod) ? "POST" : configuredMethod;
        String contentType = HttpSinkUtil.getContentType(this.mapType, list);
        String messageBody = getMessageBody(obj);
        HttpCarbonMessage carbonMessage = generateCarbonMessage(list, contentType, httpMethod, new HttpCarbonMessage(new DefaultHttpRequest(HttpVersion.HTTP_1_1, new HttpMethod(httpMethod), "")), clientConnector.getHttpURLProperties());
        if (!Constants.HTTP_GET_METHOD.equals(httpMethod)) {
            // NOTE(review): the body is encoded with the platform default charset; confirm this
            // agrees with the charset implied by the Content-Type header.
            carbonMessage.addHttpContent(new DefaultLastHttpContent(Unpooled.wrappedBuffer(messageBody.getBytes(Charset.defaultCharset()))));
        }
        carbonMessage.completeMessage();
        HttpResponseFuture responseFuture = clientConnector.send(carbonMessage);

        // A latch is only needed when the caller has to wait for the response.
        CountDownLatch responseLatch = null;
        if (this.isBlockingIO || HttpConstants.OAUTH.equals(this.authType)) {
            responseLatch = new CountDownLatch(1);
        }
        HttpResponseMessageListener responseListener = new HttpResponseMessageListener(this, getTrpProperties(dynamicOptions), this.sinkId, this.isDownloadEnabled, responseLatch, obj, dynamicOptions, this.siddhiAppContext.getName(), clientConnector.getPublisherURL());
        responseFuture.setHttpConnectorListener(responseListener);
        if (responseLatch == null) {
            return SUCCESS_STATUS_CODE;
        }
        try {
            if (!responseLatch.await(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log.debug("Timeout due to getting response from " + clientConnector.getPublisherURL() + ". Message dropped.");
                throw new ConnectionUnavailableException("Time out due to getting response from " + clientConnector.getPublisherURL() + ". Message dropped.");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.debug("Failed to get a response from " + clientConnector.getPublisherURL() + Constants.COMMA + e + ". Message dropped.");
            // Keep the original exception as the cause instead of only embedding its text.
            throw new ConnectionUnavailableException("Failed to get a response from " + clientConnector.getPublisherURL() + ", " + e + ". Message dropped.", e);
        }
        if (this.isBlockingIO) {
            return SUCCESS_STATUS_CODE;
        }
        return responseListener.getHttpResponseStatusCode();
    }

    /**
     * Lists the option keys whose values may vary per event for this sink.
     *
     * @return the dynamic option keys, each listed once
     */
    @Override
    public String[] getSupportedDynamicOptions() {
        return new String[]{
            HttpConstants.HEADERS,
            HttpConstants.METHOD,
            HttpConstants.PUBLISHER_URL,
            HttpConstants.DOWNLOAD_PATH,
            HttpConstants.RECEIVER_REFRESH_TOKEN
        };
    }

    /**
     * Builds the property map handed to the response listener: one entry per stream attribute
     * (attribute name mapped to the event value at the same index) and, when downloading is
     * enabled, the resolved download path under {@link HttpConstants#DOWNLOAD_PATH}.
     *
     * @param dynamicOptions dynamic options carrying the current event
     * @return a new mutable map of the collected properties
     */
    private Map<String, Object> getTrpProperties(DynamicOptions dynamicOptions) {
        Object[] eventData = dynamicOptions.getEvent().getData();
        List<Attribute> attributes = this.outputStreamDefinition.getAttributeList();
        Map<String, Object> properties = new HashMap<>();
        for (int i = 0; i < attributes.size(); i++) {
            properties.put(attributes.get(i).getName(), eventData[i]);
        }
        if (this.isDownloadEnabled) {
            properties.put(HttpConstants.DOWNLOAD_PATH, this.downloadPath.getValue(dynamicOptions));
        }
        return properties;
    }
}
