/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.sink;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.util.HTTPSourceRegistry;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.StreamDefinition;
import java.util.List;
import java.util.Objects;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;

@Extension(name="http-service-response", namespace="sink", description="The http-service-response sink send responses of the requests consumed by its corresponding http-service source, by mapping the response messages to formats such as `text`, `XML` and `JSON`.", parameters={@Parameter(name="source.id", description="Identifier to correlate the http-service-response sink to its corresponding http-service source which consumed the request.", type={DataType.STRING}), @Parameter(name="message.id", description="Identifier to correlate the response with the request received by http-service source.", dynamic=true, type={DataType.STRING}), @Parameter(name="headers", description="HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\nWhen the `Content-Type` header is not provided the system decides the Content-Type based on the provided sink mapper as following: \n - `@map(type='xml')`: `application/xml`\n - `@map(type='json')`: `application/json`\n - `@map(type='text')`: `plain/text`\n - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n - For all other cases system defaults to `plain/text`\nAlso the `Content-Length` header need not to be provided, as the system automatically defines it by calculating the size of the payload.", type={DataType.STRING}, optional=true, defaultValue="Content-Type and Content-Length headers")}, examples={@Example(syntax="@source(type='http-service', receiver.url='http://localhost:5005/add',\n        source.id='adder',\n        @map(type='json, @attributes(messageId='trp:messageId',\n                                     value1='$.event.value1',\n                                     value2='$.event.value2')))\ndefine stream AddStream (messageId string, value1 long, value2 long);\n\n@sink(type='http-service-response', source.id='adder',\n      message.id='{{messageId}}', @map(type = 'json'))\ndefine stream ResultStream (messageId string, results long);\n\n@info(name = 'query1')\nfrom AddStream \nselect messageId, 
value1 + value2 as results \ninsert into ResultStream;", description="The http-service source on stream `AddStream` listens on url `http://localhost:5005/stocks` for JSON messages with format:\n```{\n  \"event\": {\n    \"value1\": 3,\n    \"value2\": 4\n  }\n}```\nand when events arrive it maps to `AddStream` events and pass them to query `query1` for processing. The query results produced on `ResultStream` are sent as a response via http-service-response sink with format:```{\n  \"event\": {\n    \"results\": 7\n  }\n}```Here the request and response are correlated by passing the `messageId` produced by the http-service to the respective http-service-response sink.")})
public class HttpServiceResponseSink
extends Sink {
    private static final Logger log = Logger.getLogger(HttpServiceResponseSink.class);
    private Option messageIdOption;
    private String sourceId;
    private Option httpHeaderOption;
    private String mapType;

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class};
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{"headers", "message.id"};
    }

    protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.messageIdOption = optionHolder.validateAndGetOption("message.id");
        this.sourceId = optionHolder.validateAndGetStaticValue("source.id");
        this.httpHeaderOption = optionHolder.getOrCreateOption("headers", HttpConstants.DEFAULT_HEADER);
        this.mapType = ((Element)((Annotation)((Annotation)outputStreamDefinition.getAnnotations().get(0)).getAnnotations().get(0)).getElements().get(0)).getValue();
        return null;
    }

    public void publish(Object payload, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        String headers = this.httpHeaderOption.getValue(dynamicOptions);
        List<Header> headersList = HttpSinkUtil.getHeaders(headers);
        String messageId = this.messageIdOption.getValue(dynamicOptions);
        String contentType = HttpSinkUtil.getContentType(this.mapType, headersList);
        HTTPSourceRegistry.getServiceSource(this.sourceId).handleCallback(messageId, (String)payload, headersList, contentType);
    }

    public void connect() throws ConnectionUnavailableException {
    }

    public void disconnect() {
    }

    public void destroy() {
    }
}

