/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.source.HttpCallResponseConnectorListener;
import io.siddhi.extension.io.http.source.HttpCallResponseSourceConnectorRegistry;
import io.siddhi.extension.io.http.util.HTTPSourceRegistry;

@Extension(name="http-call-response", namespace="source", description="The http-call-response source receives the responses for the calls made by its corresponding http-call sink, and maps them from formats such as `text`, `XML` and `JSON`.\nTo handle messages with different http status codes having different formats, multiple http-call-response sources are allowed to associate with a single http-call sink.\nIt allows accessing the attributes of the event that initiated the call, and the response headers and properties via transport properties in the format `trp:<attribute name>` and `trp:<header/property>` respectively.", parameters={@Parameter(name="sink.id", description="Identifier to correlate the http-call-response source with its corresponding http-call sink that published the messages.", type={DataType.STRING}), @Parameter(name="http.status.code", description="The matching http responses status code regex, that is used to filter the the messages which will be processed by the source.Eg: `http.status.code = '200'`,\n`http.status.code = '4\\\\d+'`", type={DataType.STRING}, optional=true, defaultValue="200"), @Parameter(name="allow.streaming.responses", description="Enable consuming responses on a streaming manner.", type={DataType.BOOL}, optional=true, defaultValue="false")}, examples={@Example(syntax="@sink(type='http-call', method='POST',\n      publisher.url='http://localhost:8005/registry/employee',\n      sink.id='employee-info', @map(type='json')) \ndefine stream EmployeeRequestStream (name string, id int);\n\n@source(type='http-call-response', sink.id='employee-info',\n        http.status.code='2\\\\d+',\n        @map(type='json',\n             @attributes(name='trp:name', id='trp:id',\n                         location='$.town', age='$.age')))\ndefine stream EmployeeResponseStream(name string, id int,\n                                     location string, age int);\n\n@source(type='http-call-response', sink.id='employee-info',\n        http.status.code='4\\\\d+',\n        @map(type='text', regex.A='((.|\\n)*)',\n             @attributes(error='A[1]')))\ndefine stream EmployeeErrorStream(error string);", description="When events arrive in `EmployeeRequestStream`, http-call sink makes calls to endpoint on url `http://localhost:8005/registry/employee` with `POST` method and Content-Type `application/json`.\nIf the arriving event has attributes `name`:`John` and `id`:`1423` it will send a message with default JSON mapping as follows:\n```{\n  \"event\": {\n    \"name\": \"John\",\n    \"id\": 1423\n  }\n}```When the endpoint responds with status code in the range of 200 the message will be received by the http-call-response source associated with the `EmployeeResponseStream` stream, because it is correlated with the sink by the same `sink.id` `employee-info` and as that expects messages with `http.status.code` in regex format `2\\\\d+`. If the response message is in the format\n```{\n  \"town\": \"NY\",\n  \"age\": 24\n}```the source maps the `location` and `age` attributes by executing JSON path on the message and maps the `name` and `id` attributes by extracting them from the request event via as transport properties.\nIf the response status code is in the range of 400 then the message will be received by the http-call-response source associated with the `EmployeeErrorStream` stream, because it is correlated with the sink by the same `sink.id` `employee-info` and it expects messages with `http.status.code` in regex format `4\\\\d+`, and maps the error response to the `error` attribute of the event.")})
public class HttpCallResponseSource
extends Source {
    private String sinkId;
    private SourceEventListener sourceEventListener;
    private String[] requestedTransportPropertyNames;
    private String siddhiAppName;
    private String workerThread;
    private HttpCallResponseConnectorListener httpCallResponseSourceListener;
    private HttpCallResponseSourceConnectorRegistry httpConnectorRegistry;
    private String httpStatusCode;
    private boolean shouldAllowStreamingResponses;

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.requestedTransportPropertyNames = (String[])requestedTransportPropertyNames.clone();
        this.sinkId = optionHolder.validateAndGetStaticValue("sink.id");
        this.httpConnectorRegistry = HttpCallResponseSourceConnectorRegistry.getInstance();
        this.siddhiAppName = siddhiAppContext.getName();
        this.workerThread = optionHolder.validateAndGetStaticValue("worker.count", "1");
        this.httpStatusCode = optionHolder.validateAndGetStaticValue("http.status.code", "200");
        this.shouldAllowStreamingResponses = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue("allow.streaming.responses", "false"));
        return null;
    }

    public Class[] getOutputEventClasses() {
        return new Class[0];
    }

    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        this.httpCallResponseSourceListener = new HttpCallResponseConnectorListener(Integer.parseInt(this.workerThread), this.sourceEventListener, this.shouldAllowStreamingResponses, this.sinkId, this.requestedTransportPropertyNames, this.siddhiAppName, null);
        this.httpConnectorRegistry.registerSourceListener(this.httpCallResponseSourceListener, this.sinkId, this.httpStatusCode);
        HTTPSourceRegistry.registerCallResponseSource(this.sinkId, this.httpStatusCode, this);
    }

    public void disconnect() {
        this.httpConnectorRegistry.unregisterSourceListener(this.sinkId, this.httpStatusCode, this.siddhiAppName);
        HTTPSourceRegistry.removeCallResponseSource(this.sinkId, this.httpStatusCode);
    }

    public void destroy() {
    }

    public void pause() {
    }

    public void resume() {
    }

    public HttpCallResponseConnectorListener getConnectorListener() {
        return this.httpCallResponseSourceListener;
    }

    public boolean matches(String thatSinkId, String thatStatusCode) {
        return (this.sinkId != null ? this.sinkId.equals(thatSinkId) : thatSinkId == null) && (this.httpStatusCode != null ? thatStatusCode != null && thatStatusCode.matches(this.httpStatusCode) : thatStatusCode == null);
    }
}

