/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.sink;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.metrics.SinkMetrics;
import io.siddhi.extension.io.http.sink.SSESyncConnectorRegistry;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.source.exception.HttpSourceAdaptorRuntimeException;
import io.siddhi.extension.io.http.source.util.HttpSourceUtil;
import io.siddhi.extension.io.http.util.HTTPSinkRegistry;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.HttpIoUtil;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.StreamDefinition;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.contract.config.ChunkConfig;
import org.wso2.transport.http.netty.contract.config.KeepAliveConfig;
import org.wso2.transport.http.netty.contract.config.ListenerConfiguration;
import org.wso2.transport.http.netty.contract.exceptions.ServerConnectorException;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

@Extension(
        name="sse-server",
        namespace="sink",
        description="HTTP SSE sink sends events to all subscribers.",
        parameters={
                @Parameter(name="server.url", description="The listening URL of the SSE server which clients need to connect to receive events. If not provided url will be constructed using siddhi app name and stream name as the context by default with port 8280. eg :- http://0.0.0.0:8280/{app_name}/{stream_name}", type={DataType.STRING}),
                @Parameter(name="worker.count", description="The number of active worker threads to serve the incoming events. By default the value is set to `1` to ensure events are processed in the same order they arrived. By increasing this value, higher performance can be achieved in the expense of loosing event ordering.", type={DataType.INT}, optional=true, defaultValue="1"),
                @Parameter(name="headers", description="HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\nWhen the `Content-Type` header is not provided the system decides the Content-Type based on the provided sink mapper as following: \n - `@map(type='xml')`: `application/xml`\n - `@map(type='json')`: `application/json`\n - `@map(type='text')`: `plain/text`\n - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n - For all other cases system defaults to `plain/text`\nAlso the `Content-Length` header need not to be provided, as the system automatically defines it by calculating the size of the payload.", type={DataType.STRING}, optional=true, defaultValue="Content-Type and Content-Length headers"),
                @Parameter(name="https.truststore.file", description="The file path of the client truststore when sending messages through `https` protocol.", type={DataType.STRING}, optional=true, defaultValue="`${carbon.home}/resources/security/client-truststore.jks`"),
                @Parameter(name="https.truststore.password", description="The password for the client-truststore.", type={DataType.STRING}, optional=true, defaultValue="wso2carbon"),
                @Parameter(name="client.bootstrap.configurations", description="Client bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Client connect timeout in millis: `'client.bootstrap.connect.timeout:15000'`\n - Client socket timeout in seconds: `'client.bootstrap.socket.timeout:15'`\n - Client socket reuse: `'client.bootstrap.socket.reuse:true'`\n - Enable TCP no delay: `'client.bootstrap.nodelay:true'`\n - Enable client keep alive: `'client.bootstrap.keepalive:true'`\n - Send buffer size: `'client.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'client.bootstrap.recievebuffersize:1048576'`", type={DataType.STRING}, optional=true, defaultValue="-")
        },
        systemParameter={
                @SystemParameter(name="defaultScheme", description="The default protocol.", defaultValue="http", possibleParameters={"http", "https"}),
                @SystemParameter(name="defaultHttpPort", description="The default HTTP port when default scheme is `http`.", defaultValue="8280", possibleParameters={"Any valid port"}),
                @SystemParameter(name="defaultHost", description="The default host of the transport.", defaultValue="0.0.0.0", possibleParameters={"Any valid host"})
        },
        // The extension is registered under namespace "sink", so the example uses @sink (it previously said @Source).
        examples={@Example(syntax="@sink(type='sse-server', server.url='http://localhost:8080/sse', @map(type='json')) define stream PublishingStream (param1 string);", description="External clients can listen to the server.url")})
/**
 * Siddhi sink ({@code sink:sse-server}) that pushes every published event to all
 * currently registered subscriber requests as a streaming HTTP response.
 *
 * <p>Subscriber requests are handed to this sink through {@link #registerCallback(HttpCarbonMessage)}
 * (presumably by the listener registered in {@link #connect()} — confirm against
 * {@code SSESyncConnectorRegistry}) and are dropped again when the transport reports an error
 * for them.
 */
public class SSEServerSink extends Sink {
    private static final Logger logger = LogManager.getLogger(SSEServerSink.class);

    private String siddhiAppName;
    private String streamId;
    private String listenerUrl;
    private int workerThread;
    private boolean isAuth;
    private boolean isSecured;
    private SinkMetrics metrics;
    private String[] requestedTransportPropertyNames;
    private ListenerConfiguration listenerConfiguration;
    private ServiceDeploymentInfo serviceDeploymentInfo;
    private SSESyncConnectorRegistry httpConnectorRegistry;
    private Option httpHeaderOption;
    private String mapType;

    /**
     * Open subscriber requests that receive every published event.
     *
     * <p>Copy-on-write on purpose: the list is iterated while publishing and disconnecting, and
     * entries may be added ({@link #registerCallback}) or removed (the connector listener's
     * {@code onError}) while such an iteration is in progress. A plain {@code ArrayList} would
     * fail with {@code ConcurrentModificationException} in that situation.
     */
    private final List<HttpCarbonMessage> requestContainerList = new CopyOnWriteArrayList<>();

    /**
     * @return the payload types this sink declares it accepts ({@code String} and {@code Map}).
     */
    public Class[] getSupportedInputEventClasses() {
        // NOTE(review): publish() casts the payload to String, so a Map payload would fail there.
        return new Class[]{String.class, Map.class};
    }

    /**
     * @return always {@code null}; no deployment information is exposed.
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        // NOTE(review): initSource() builds this.serviceDeploymentInfo but it is never returned here.
        return null;
    }

    /**
     * @return an empty array; this sink declares no dynamic options.
     */
    public String[] getSupportedDynamicOptions() {
        return new String[0];
    }

    /**
     * Reads the sink configuration, prepares the shared connector registry and sets up metrics.
     *
     * @param streamDefinition the stream this sink is attached to
     * @param optionHolder     static and dynamic options of the sink annotation
     * @param configReader     reader for the system parameters
     * @param siddhiAppContext context of the owning Siddhi app
     * @return {@code null}; the sink keeps no snapshot state
     */
    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        String[] requestedTransportPropertyNames = new String[10];
        this.initSource(streamDefinition, optionHolder, requestedTransportPropertyNames, configReader, siddhiAppContext);
        this.initConnectorRegistry(optionHolder, configReader);
        this.initMetrics(this.siddhiAppName, streamDefinition.getId());
        return null;
    }

    /**
     * Resolves the listener URL (explicit {@code server.url} or a default built from the system
     * parameters, app name and stream id), validates it, and builds the listener configuration.
     *
     * @throws SiddhiAppCreationException if {@code server.url} is malformed or has no context path
     */
    private void initSource(StreamDefinition streamDefinition, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppName = siddhiAppContext.getName();
        this.streamId = streamDefinition.getId();

        // Default URL: <scheme>://<defaultHost>:<defaultPort>/<appName>/<streamId>
        boolean httpsByDefault = "https".equals(configReader.readConfig("defaultScheme", "http"));
        int defaultPort = httpsByDefault
                ? Integer.parseInt(configReader.readConfig("defaultHttpsPort", "8243"))
                : Integer.parseInt(configReader.readConfig("defaultHttpPort", "8280"));
        String defaultURL = (httpsByDefault ? "https://" : "http://")
                + configReader.readConfig("defaultHost", "0.0.0.0") + ":" + defaultPort
                + "/" + this.siddhiAppName + "/" + this.streamId;

        this.listenerUrl = optionHolder.validateAndGetStaticValue("server.url", defaultURL);
        try {
            // A URL consisting only of slashes after the authority has no usable context.
            if (new URL(this.listenerUrl).getPath().replace("/", "").trim().isEmpty()) {
                throw new SiddhiAppCreationException("Please provide a valid `server.url` with a context for SSE server with the stream " + this.streamId + " in Siddhi app " + this.siddhiAppName);
            }
        } catch (MalformedURLException e) {
            // Keep the parse failure as the cause so the offending URL detail is not lost.
            throw new SiddhiAppCreationException("Please provide a valid `server.url` for SSE server with the stream " + this.streamId + " in Siddhi app " + this.siddhiAppName, e);
        }

        // Boolean.parseBoolean is already case-insensitive.
        this.isAuth = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue("basic.auth.enabled", "false"));
        this.workerThread = Integer.parseInt(optionHolder.validateAndGetStaticValue("worker.count", "1"));
        this.requestedTransportPropertyNames = (String[]) requestedTransportPropertyNames.clone();
        this.httpHeaderOption = optionHolder.getOrCreateOption("headers", HttpConstants.DEFAULT_HEADER);

        // NOTE(review): assumes the first stream annotation is this sink and that its first nested
        // annotation is @map whose first element is the mapper type — confirm against callers.
        Annotation sinkAnnotation = (Annotation) streamDefinition.getAnnotations().get(0);
        Annotation mapAnnotation = (Annotation) sinkAnnotation.getAnnotations().get(0);
        this.mapType = ((Element) mapAnnotation.getElements().get(0)).getValue();

        // "ssl.configurations" wins; "parameters" is consulted only when it is empty.
        String sslConfigs = optionHolder.validateAndGetStaticValue("ssl.configurations", "");
        if (sslConfigs.isEmpty()) {
            sslConfigs = optionHolder.validateAndGetStaticValue("parameters", "");
        }

        this.listenerConfiguration = HttpSourceUtil.getListenerConfiguration(this.listenerUrl, configReader);
        this.listenerConfiguration.setSocketIdleTimeout(-1);
        this.isSecured = this.listenerConfiguration.getScheme().equalsIgnoreCase("https");
        this.listenerConfiguration.setParameters(HttpIoUtil.populateParameters(sslConfigs));
        this.serviceDeploymentInfo = new ServiceDeploymentInfo(this.listenerConfiguration.getPort(), this.isSecured);
    }

    /**
     * Obtains the shared connector registry and passes the bootstrap and request-size
     * validation settings to it.
     */
    private void initConnectorRegistry(OptionHolder optionHolder, ConfigReader configReader) {
        String requestSizeValidationConfigList = optionHolder.validateAndGetStaticValue("request.size.validation.configurations", "");
        String serverBootstrapPropertiesList = optionHolder.validateAndGetStaticValue("server.bootstrap.configurations", "");
        this.httpConnectorRegistry = SSESyncConnectorRegistry.getInstance();
        this.httpConnectorRegistry.initBootstrapConfigIfFirst(configReader);
        this.httpConnectorRegistry.setTransportConfig(serverBootstrapPropertiesList, requestSizeValidationConfigList);
    }

    /**
     * @param thatSinkId the id to compare with
     * @return {@code true} if {@code thatSinkId} equals this sink's stream id
     */
    public boolean matches(String thatSinkId) {
        return Objects.equals(this.streamId, thatSinkId);
    }

    /**
     * Adds a subscriber request that should receive subsequently published events.
     *
     * @param carbonMessage the subscriber's request message
     */
    public void registerCallback(HttpCarbonMessage carbonMessage) {
        this.requestContainerList.add(carbonMessage);
    }

    /** Drops a subscriber request so it no longer receives events. */
    private void removeCallback(HttpCarbonMessage carbonMessage) {
        this.requestContainerList.remove(carbonMessage);
    }

    /**
     * Sends one event to every registered subscriber.
     *
     * <p>A failure for one subscriber does not stop delivery to the others: the failing subscriber
     * is dropped, delivery continues, and the first failure is rethrown afterwards with any later
     * ones attached as suppressed exceptions.
     *
     * @param payload     event body; {@code null} is sent as an empty body
     * @param headersList extra response headers, may be {@code null}
     * @param contentType value for the {@code Content-Type} header, may be {@code null}
     * @throws HttpSourceAdaptorRuntimeException if responding to at least one subscriber failed
     */
    public void handleCallback(String payload, List<Header> headersList, String contentType) {
        if (this.requestContainerList.isEmpty()) {
            logger.warn("No subscription found for stream " + this.streamId);
            return;
        }
        HttpSourceAdaptorRuntimeException firstFailure = null;
        for (HttpCarbonMessage subscriber : this.requestContainerList) {
            if (subscriber == null) {
                continue;
            }
            try {
                this.handleResponse(subscriber, 200, payload, headersList, contentType);
            } catch (HttpSourceAdaptorRuntimeException e) {
                // Same treatment as the onError path: a subscriber we cannot respond to is dropped.
                this.removeCallback(subscriber);
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Writes {@code responseMsg} to the subscriber behind {@code requestMsg} and installs a
     * listener that unregisters the subscriber when the transport reports an error.
     *
     * @throws HttpSourceAdaptorRuntimeException if the transport rejects the response
     */
    private void handleResponse(final HttpCarbonMessage requestMsg, HttpCarbonMessage responseMsg) {
        try {
            requestMsg.respond(responseMsg).setHttpConnectorListener(new HttpConnectorListener(){

                @Override
                public void onMessage(HttpCarbonMessage httpMessage) {
                    // Nothing to do on a successful write.
                }

                @Override
                public void onError(Throwable throwable) {
                    SSEServerSink.this.removeCallback(requestMsg);
                }
            });
        }
        catch (ServerConnectorException e) {
            if (this.metrics != null) {
                this.metrics.getTotalHttpErrorsMetric(requestMsg.getRequestUrl()).inc();
            }
            throw new HttpSourceAdaptorRuntimeException("Error occurred during response", e);
        }
    }

    /**
     * Records write metrics (when enabled), builds the response message and sends it to one subscriber.
     *
     * @param requestMessage the subscriber's request
     * @param code           HTTP status; {@code null} is treated as 500
     * @param payload        event body; {@code null} is sent as an empty body
     * @param headers        extra response headers, may be {@code null}
     * @param contentType    value for the {@code Content-Type} header, may be {@code null}
     */
    private void handleResponse(HttpCarbonMessage requestMessage, Integer code, String payload, List<Header> headers, String contentType) {
        int statusCode = code == null ? 500 : code;
        String responsePayload = payload != null ? payload : "";
        if (this.metrics != null) {
            String publisherUrl = requestMessage.getRequestUrl();
            this.metrics.getTotalWritesMetric().inc();
            this.metrics.getTotalHttpWritesMetric(publisherUrl).inc();
            // Measure what is actually sent; the raw payload is null on the disconnect path.
            this.metrics.getRequestSizeMetric(publisherUrl).inc(HttpSinkUtil.getByteSize(responsePayload));
            this.metrics.setLastEventTime(publisherUrl, System.currentTimeMillis());
        }
        this.handleResponse(requestMessage, this.createResponseMessage(responsePayload, statusCode, headers, contentType));
    }

    /**
     * Builds a streaming response message carrying {@code payload}.
     *
     * @param payload     non-null body text, encoded as UTF-8
     * @param statusCode  stored in the {@code http.status.code} property of the message
     * @param headers     extra headers applied after {@code Content-Type}, may be {@code null}
     * @param contentType value for the {@code Content-Type} header, may be {@code null}
     * @return the response message ready to be passed to {@code respond}
     */
    private HttpCarbonMessage createResponseMessage(String payload, int statusCode, List<Header> headers, String contentType) {
        HttpCarbonMessage response = new HttpCarbonMessage(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
        // Fixed UTF-8 instead of the platform default so the bytes on the wire do not depend on
        // the JVM's locale; event streams are decoded as UTF-8 by clients.
        response.addHttpContent(new DefaultHttpContent(Unpooled.wrappedBuffer(payload.getBytes(StandardCharsets.UTF_8))));
        response.setProperty("http.status.code", statusCode);
        response.setProperty("DIRECTION", "DIRECTION_RESPONSE");
        response.setStreaming(true);

        HttpHeaders httpHeaders = response.getHeaders();
        if (contentType != null) {
            httpHeaders.set("Content-Type", (Object)contentType);
        }
        if (headers != null) {
            // Applied last, so an explicit header with the same name overrides the value set above.
            for (Header header : headers) {
                httpHeaders.set(header.getName(), (Object)header.getValue());
            }
        }
        return response;
    }

    /**
     * Creates the sink metrics only when the metric service is present, metrics are enabled and
     * the {@code prometheus} reporter is running; otherwise {@code metrics} stays {@code null}.
     */
    private void initMetrics(String appName, String streamName) {
        if (MetricsDataHolder.getInstance().getMetricService() != null && MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
            try {
                if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning("prometheus")) {
                    this.metrics = new SinkMetrics(appName, streamName);
                }
            }
            catch (IllegalArgumentException e) {
                logger.debug("Prometheus reporter is not running. Hence sse sink metrics will not be initialized for " + appName);
            }
        }
    }

    /**
     * Publishes one mapped event to all subscribers of the sink registered for this stream id.
     *
     * @param payload        mapped event; cast to {@code String}
     * @param dynamicOptions dynamic options used to resolve the {@code headers} option
     * @param state          unused
     * @throws ConnectionUnavailableException if no SSE sink is registered for this stream id
     */
    public void publish(Object payload, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        String headers = this.httpHeaderOption.getValue(dynamicOptions);
        List<Header> headersList = HttpSinkUtil.getHeaders(headers);
        String contentType = HttpSinkUtil.getContentType(this.mapType, headersList);

        SSEServerSink registeredSink = HTTPSinkRegistry.findAndGetSSESource(this.streamId);
        if (registeredSink == null) {
            // Previously this surfaced as a bare NullPointerException.
            throw new ConnectionUnavailableException("SSE server sink for the stream " + this.streamId + " in Siddhi app " + this.siddhiAppName + " is not registered");
        }
        registeredSink.handleCallback((String)payload, headersList, contentType);
    }

    /**
     * Starts the HTTP listener for {@code server.url} with chunking and keep-alive always on,
     * registers the source listener and adds this sink to the SSE sink registry.
     */
    public void connect() throws ConnectionUnavailableException {
        this.listenerConfiguration.setChunkConfig(ChunkConfig.ALWAYS);
        this.listenerConfiguration.setKeepAliveConfig(KeepAliveConfig.ALWAYS);
        this.httpConnectorRegistry.createHttpServerConnector(this.listenerConfiguration);
        this.httpConnectorRegistry.registerSourceListener(this.listenerUrl, this.workerThread, (Boolean)this.isAuth, this.streamId, this.siddhiAppName);
        HTTPSinkRegistry.registerSSESink(this.streamId, this);
    }

    /**
     * Unregisters the listener, the server connector and this sink, then sends a final empty
     * response to every remaining subscriber and forgets them.
     *
     * <p>The final responses are best-effort: a failure for one subscriber is logged and does not
     * prevent the remaining subscribers from being handled or the list from being cleared.
     */
    public void disconnect() {
        this.httpConnectorRegistry.unregisterSourceListener(this.listenerUrl, this.siddhiAppName);
        this.httpConnectorRegistry.unregisterServerConnector(this.listenerUrl);
        HTTPSinkRegistry.removeSSESink(this.streamId);
        for (HttpCarbonMessage subscriber : this.requestContainerList) {
            if (subscriber == null) {
                continue;
            }
            try {
                this.handleResponse(subscriber, 200, null, null, null);
            } catch (HttpSourceAdaptorRuntimeException e) {
                logger.warn("Failed to send the final response to an SSE subscriber of stream " + this.streamId, (Throwable)e);
            }
        }
        this.requestContainerList.clear();
    }

    /**
     * Releases the shared bootstrap configuration if this is the last user and makes sure the
     * sink is no longer present in the SSE sink registry.
     */
    public void destroy() {
        this.httpConnectorRegistry.clearBootstrapConfigIfLast();
        HTTPSinkRegistry.removeSSESink(this.streamId);
    }
}

