package io.siddhi.extension.io.http.sink;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.metrics.SinkMetrics;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.source.exception.HttpSourceAdaptorRuntimeException;
import io.siddhi.extension.io.http.source.util.HttpSourceUtil;
import io.siddhi.extension.io.http.util.HTTPSinkRegistry;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.HttpIoUtil;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.StreamDefinition;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.http.netty.contract.Constants;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.contract.config.ChunkConfig;
import org.wso2.transport.http.netty.contract.config.KeepAliveConfig;
import org.wso2.transport.http.netty.contract.config.ListenerConfiguration;
import org.wso2.transport.http.netty.contract.exceptions.ServerConnectorException;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

@Extension(name = "sse-server", namespace = "sink", description = "HTTP SSE sink sends events to all subscribers.", parameters = {@Parameter(name = HttpConstants.SERVER_URL, description = "The listening URL of the SSE server which clients need to connect to receive events. If not provided url will be constructed using siddhi app name and stream name as the context by default with port 8280. eg :- http://0.0.0.0:8280/{app_name}/{stream_name}", type = {DataType.STRING}), @Parameter(name = HttpConstants.WORKER_COUNT, description = "The number of active worker threads to serve the incoming events. By default the value is set to `1` to ensure events are processed in the same order they arrived. By increasing this value, higher performance can be achieved in the expense of loosing event ordering.", type = {DataType.INT}, optional = true, defaultValue = "1"), @Parameter(name = HttpConstants.HEADERS, description = "HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\nWhen the `Content-Type` header is not provided the system decides the Content-Type based on the provided sink mapper as following: \n - `@map(type='xml')`: `application/xml`\n - `@map(type='json')`: `application/json`\n - `@map(type='text')`: `plain/text`\n - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n - For all other cases system defaults to `plain/text`\nAlso the `Content-Length` header need not to be provided, as the system automatically defines it by calculating the size of the payload.", type = {DataType.STRING}, optional = true, defaultValue = "Content-Type and Content-Length headers"), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PATH_PARAM, description = "The file path of the client truststore when sending messages through `https` protocol.", type = {DataType.STRING}, optional = true, defaultValue = "`${carbon.home}/resources/security/client-truststore.jks`"), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PASSWORD_PARAM, description = "The password for the 
client-truststore.", type = {DataType.STRING}, optional = true, defaultValue = "wso2carbon"), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_CONFIGURATION, description = "Client bootstrap configurations in format `\"'<key>:<value>','<key>:<value>'\"`.\nSome supported configurations :\n - Client connect timeout in millis: `'client.bootstrap.connect.timeout:15000'`\n - Client socket timeout in seconds: `'client.bootstrap.socket.timeout:15'`\n - Client socket reuse: `'client.bootstrap.socket.reuse:true'`\n - Enable TCP no delay: `'client.bootstrap.nodelay:true'`\n - Enable client keep alive: `'client.bootstrap.keepalive:true'`\n - Send buffer size: `'client.bootstrap.sendbuffersize:1048576'`\n - Receive buffer size: `'client.bootstrap.recievebuffersize:1048576'`", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.PORT_CONTEXT_KEY_SEPARATOR)}, systemParameter = {@SystemParameter(name = HttpConstants.DEFAULT_SOURCE_SCHEME, description = "The default protocol.", defaultValue = "http", possibleParameters = {"http", "https"}), @SystemParameter(name = HttpConstants.HTTP_PORT, description = "The default HTTP port when default scheme is `http`.", defaultValue = HttpConstants.HTTP_PORT_VALUE, possibleParameters = {"Any valid port"}), @SystemParameter(name = HttpConstants.DEFAULT_HOST, description = "The default host of the transport.", defaultValue = "0.0.0.0", possibleParameters = {"Any valid host"})}, examples = {@Example(syntax = "@Source(type='sse-server', server.url='http://localhost:8080/sse', @map(type='json')) define stream PublishingStream (param1 string);", description = "External clients can listen to the server.url")})
/* loaded from: input_file:io/siddhi/extension/io/http/sink/SSEServerSink.class */
public class SSEServerSink extends Sink {
    private static final Logger logger = Logger.getLogger(SSEServerSink.class);

    /** Name of the owning Siddhi app, taken from the {@code SiddhiAppContext} during initialisation. */
    private String siddhiAppName;
    /** Id of the stream this sink is attached to; also the key used with {@code HTTPSinkRegistry}. */
    private String streamId;
    /** URL the SSE listener is bound to: the {@code server.url} option or a generated default. */
    private String listenerUrl;
    /** Value of the worker-count option, handed to the connector registry when the listener is registered. */
    private int workerThread;
    /** Parsed value of the {@code HttpConstants.IS_AUTH} option. */
    private boolean isAuth;
    /** {@code true} when the listener configuration's scheme is {@code https}. */
    private boolean isSecured;
    /** Sink metrics; stays {@code null} unless metrics initialisation creates an instance. */
    private SinkMetrics metrics;
    /** Copy of the transport-property name array supplied at initialisation. */
    private String[] requestedTransportPropertyNames;
    /** Listener configuration derived from {@link #listenerUrl}. */
    private ListenerConfiguration listenerConfiguration;
    /** Port and security flag of the listener, built during initialisation. */
    private ServiceDeploymentInfo serviceDeploymentInfo;
    /** Shared connector registry used to create and tear down the HTTP listener. */
    private SSESyncConnectorRegistry httpConnectorRegistry;
    /** The headers option; its value is resolved per event when publishing. */
    private Option httpHeaderOption;
    /** Mapper type read from the stream definition's annotations; used to pick a default content type. */
    private String mapType;

    /**
     * Requests of the currently subscribed clients.
     *
     * <p>A {@link CopyOnWriteArrayList} is used because entries are added and removed through the
     * public callback methods (removal is also triggered from the transport's error listener)
     * while events are being broadcast. Its iterators work on a snapshot, so a removal during a
     * broadcast can no longer raise {@code ConcurrentModificationException}.
     */
    private final List<HttpCarbonMessage> requestContainerList = new CopyOnWriteArrayList<>();

    /**
     * Lists the payload classes this sink declares as accepted input.
     *
     * <p>NOTE(review): {@code publish} casts its payload to {@code String}; confirm that
     * {@code Map} payloads are really handled before relying on this declaration.
     *
     * @return a fresh array containing {@code String.class} and {@code Map.class}
     */
    public Class[] getSupportedInputEventClasses() {
        final Class[] supportedClasses = {String.class, Map.class};
        return supportedClasses;
    }

    /**
     * Exposes the deployment information of the listener opened by this sink.
     *
     * <p>Initialisation builds a {@code ServiceDeploymentInfo} from the listener port and the
     * https flag; previously this method returned {@code null}, so that information was computed
     * but never made available.
     *
     * @return the deployment info built during initialisation, or {@code null} if the sink has
     *         not been initialised yet
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return this.serviceDeploymentInfo;
    }

    /**
     * Lists the options of this sink that may be set dynamically per event.
     *
     * @return an empty array; no option is declared as dynamic
     */
    public String[] getSupportedDynamicOptions() {
        final String[] noDynamicOptions = {};
        return noDynamicOptions;
    }

    /**
     * Initialises the sink: reads the options, prepares the connector registry and sets up metrics.
     *
     * @param streamDefinition definition of the stream this sink is attached to
     * @param optionHolder     static and dynamic options of the sink annotation
     * @param configReader     reader for system-level configuration
     * @param siddhiAppContext context of the owning Siddhi app
     * @return always {@code null}; this sink supplies no state factory
     */
    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        // A ten-slot array of nulls is handed over as the transport-property names.
        final String[] transportPropertyNames = new String[10];
        initSource(streamDefinition, optionHolder, transportPropertyNames, configReader, siddhiAppContext);

        initConnectorRegistry(optionHolder, configReader);

        final String definedStreamId = streamDefinition.getId();
        initMetrics(this.siddhiAppName, definedStreamId);
        return null;
    }

    /**
     * Reads the sink options and prepares the listener configuration.
     *
     * <p>Sets the app name, stream id, listener URL, auth flag, worker count, headers option,
     * mapper type, listener configuration and service deployment info on this instance.
     *
     * @param streamDefinition definition of the stream this sink is attached to
     * @param optionHolder     options of the sink annotation
     * @param strArr           transport-property names; a copy is stored on this instance
     * @param configReader     reader for system-level configuration
     * @param siddhiAppContext context of the owning Siddhi app
     * @throws SiddhiAppCreationException if the listener URL is malformed or has no context path
     * @throws NumberFormatException      if a configured port or the worker count is not an integer
     */
    private void initSource(StreamDefinition streamDefinition, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppName = siddhiAppContext.getName();
        this.streamId = streamDefinition.getId();
        this.listenerUrl = optionHolder.validateAndGetStaticValue(HttpConstants.SERVER_URL, buildDefaultListenerUrl(configReader));
        try {
            // A URL whose path is empty or consists only of slashes carries no context to bind to.
            if (new URL(this.listenerUrl).getPath().replace("/", "").trim().isEmpty()) {
                throw new SiddhiAppCreationException("Please provide a valid `server.url` with a context for SSE server with the stream " + this.streamId + " in Siddhi app " + this.siddhiAppName);
            }
            // Boolean.parseBoolean ignores case, so no lower-casing is required first.
            this.isAuth = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(HttpConstants.IS_AUTH, "false"));
            this.workerThread = Integer.parseInt(optionHolder.validateAndGetStaticValue(HttpConstants.WORKER_COUNT, "1"));
            this.requestedTransportPropertyNames = strArr.clone();
            this.httpHeaderOption = optionHolder.getOrCreateOption(HttpConstants.HEADERS, HttpConstants.DEFAULT_HEADER);
            // NOTE(review): takes the first element of the first nested annotation of the stream's
            // first annotation as the mapper type - confirm this holds for every stream layout.
            this.mapType = ((Element) ((Annotation) ((Annotation) streamDefinition.getAnnotations().get(0)).getAnnotations().get(0)).getElements().get(0)).getValue();

            // Fall back to the generic "parameters" option when the dedicated option is empty.
            String transportParameters = optionHolder.validateAndGetStaticValue(HttpConstants.SSS_CONFIGS, "");
            if (transportParameters.isEmpty()) {
                transportParameters = optionHolder.validateAndGetStaticValue("parameters", "");
            }

            this.listenerConfiguration = HttpSourceUtil.getListenerConfiguration(this.listenerUrl, configReader);
            this.listenerConfiguration.setSocketIdleTimeout(-1);
            this.isSecured = this.listenerConfiguration.getScheme().equalsIgnoreCase("https");
            int port = this.listenerConfiguration.getPort();
            this.listenerConfiguration.setParameters(HttpIoUtil.populateParameters(transportParameters));
            this.serviceDeploymentInfo = new ServiceDeploymentInfo(port, this.isSecured);
        } catch (MalformedURLException e) {
            // Keep the original exception as the cause so the offending URL detail is not lost.
            throw new SiddhiAppCreationException("Please provide a valid `server.url` for SSE server with the stream " + this.streamId + " in Siddhi app " + this.siddhiAppName, e);
        }
    }

    /**
     * Builds the listener URL used when no {@code server.url} option is given:
     * {@code <scheme>://<default host>:<default port>/<app name>/<stream id>}.
     * Requires the app name and stream id to be set on this instance already.
     */
    private String buildDefaultListenerUrl(ConfigReader configReader) {
        boolean https = "https".equals(configReader.readConfig(HttpConstants.DEFAULT_SOURCE_SCHEME, "http"));
        String host = configReader.readConfig(HttpConstants.DEFAULT_HOST, "0.0.0.0");
        // Parsing validates that the configured port is numeric before it is put into the URL.
        int port = https
                ? Integer.parseInt(configReader.readConfig(HttpConstants.HTTPS_PORT, HttpConstants.HTTPS_PORT_VALUE))
                : Integer.parseInt(configReader.readConfig(HttpConstants.HTTP_PORT, HttpConstants.HTTP_PORT_VALUE));
        return (https ? "https://" : "http://") + host + ":" + port + "/" + this.siddhiAppName + "/" + this.streamId;
    }

    /**
     * Obtains the shared connector registry and passes it the bootstrap and transport settings.
     *
     * @param optionHolder options of the sink annotation
     * @param configReader reader for system-level configuration
     */
    private void initConnectorRegistry(OptionHolder optionHolder, ConfigReader configReader) {
        final String requestSizeValidationConfigs = optionHolder.validateAndGetStaticValue(HttpConstants.REQUEST_SIZE_VALIDATION_CONFIGS, "");
        final String serverBootstrapConfigs = optionHolder.validateAndGetStaticValue(HttpConstants.SERVER_BOOTSTRAP_CONFIGS, "");

        final SSESyncConnectorRegistry registry = SSESyncConnectorRegistry.getInstance();
        this.httpConnectorRegistry = registry;
        registry.initBootstrapConfigIfFirst(configReader);
        // Argument order: bootstrap configs first, request-size validation configs second.
        registry.setTransportConfig(serverBootstrapConfigs, requestSizeValidationConfigs);
    }

    /**
     * Tells whether this sink belongs to the given stream.
     *
     * @param str stream id to compare with; may be {@code null}
     * @return {@code true} when {@code str} equals this sink's stream id, or both are {@code null}
     */
    public boolean matches(String str) {
        final String ownStreamId = this.streamId;
        if (ownStreamId == null) {
            return str == null;
        }
        return ownStreamId.equals(str);
    }

    /**
     * Adds a client request to the subscribers that receive every published event.
     *
     * @param httpCarbonMessage the request of the subscribing client
     */
    public void registerCallback(HttpCarbonMessage httpCarbonMessage) {
        requestContainerList.add(httpCarbonMessage);
    }

    /**
     * Removes a client request from the subscribers.
     *
     * <p>Decompiler note (JADX): the access modifier was changed from {@code private}; the
     * anonymous connector listener in this class calls this method.
     *
     * @param httpCarbonMessage the request to drop
     */
    public void removeCallback(HttpCarbonMessage httpCarbonMessage) {
        requestContainerList.remove(httpCarbonMessage);
    }

    /**
     * Sends one event to every registered subscriber with status code 200.
     *
     * @param str   event payload; {@code null} is sent as an empty body
     * @param list  headers to set on each response; may be {@code null}
     * @param str2  content type to set on each response; may be {@code null}
     * @throws HttpSourceAdaptorRuntimeException if responding to a subscriber fails
     */
    public void handleCallback(String str, List<Header> list, String str2) {
        // Iterate over a snapshot: the error listener attached while responding removes the
        // failing subscriber from the live list, and that removal may happen during this loop,
        // which would otherwise surface as a ConcurrentModificationException.
        final List<HttpCarbonMessage> subscribers = new ArrayList<>(this.requestContainerList);
        if (subscribers.isEmpty()) {
            logger.warn("No subscription found for stream " + this.streamId);
            return;
        }
        for (HttpCarbonMessage subscriber : subscribers) {
            if (subscriber != null) {
                handleResponse(subscriber, 200, str, list, str2);
            }
        }
    }

    /**
     * Writes a prepared response to a subscriber and unsubscribes that subscriber if the
     * transport later reports an error for the response.
     *
     * @param httpCarbonMessage  the subscriber's request
     * @param httpCarbonMessage2 the response to send
     * @throws HttpSourceAdaptorRuntimeException if the transport rejects the response
     */
    private void handleResponse(final HttpCarbonMessage httpCarbonMessage, HttpCarbonMessage httpCarbonMessage2) {
        final HttpConnectorListener unsubscribeOnError = new HttpConnectorListener() {
            @Override
            public void onMessage(HttpCarbonMessage ignored) {
                // Nothing to do for messages.
            }

            @Override
            public void onError(Throwable cause) {
                SSEServerSink.this.removeCallback(httpCarbonMessage);
            }
        };

        try {
            httpCarbonMessage.respond(httpCarbonMessage2).setHttpConnectorListener(unsubscribeOnError);
        } catch (ServerConnectorException failure) {
            final SinkMetrics sinkMetrics = this.metrics;
            if (sinkMetrics != null) {
                // Count the failure against the subscriber's request URL.
                sinkMetrics.getTotalHttpErrorsMetric(httpCarbonMessage.getRequestUrl()).inc();
            }
            throw new HttpSourceAdaptorRuntimeException("Error occurred during response", failure);
        }
    }

    /**
     * Records write metrics (when enabled), builds the response message and sends it to one subscriber.
     *
     * @param httpCarbonMessage the subscriber's request
     * @param num               status code; {@code null} falls back to
     *                          {@code HttpConstants.INTERNAL_SERVER_FAIL_CODE}
     * @param str               payload; {@code null} is treated as an empty string
     * @param list              response headers; may be {@code null}
     * @param str2              content type; may be {@code null}
     */
    private void handleResponse(HttpCarbonMessage httpCarbonMessage, Integer num, String str, List<Header> list, String str2) {
        int statusCode = num == null ? HttpConstants.INTERNAL_SERVER_FAIL_CODE : num.intValue();
        String payload = str != null ? str : "";
        String requestUrl = httpCarbonMessage.getRequestUrl();
        if (this.metrics != null) {
            this.metrics.getTotalWritesMetric().inc();
            this.metrics.getTotalHttpWritesMetric(requestUrl).inc();
            // Measure the null-normalised payload: callers such as disconnect() pass a null
            // payload, and the raw value was previously handed to the size helper unchecked.
            this.metrics.getRequestSizeMetric(requestUrl).inc(HttpSinkUtil.getByteSize(payload));
            this.metrics.setLastEventTime(requestUrl, System.currentTimeMillis());
        }
        handleResponse(httpCarbonMessage, createResponseMessage(payload, statusCode, list, str2));
    }

    /**
     * Builds the streaming response message for one event.
     *
     * <p>The content type is applied before the custom headers, so a header of the same name in
     * {@code list} replaces it.
     *
     * <p>NOTE(review): the payload is encoded with the platform default charset - confirm this
     * is intended.
     *
     * @param str  payload text; must not be {@code null}
     * @param i    status code, stored as the {@code HttpConstants.HTTP_STATUS_CODE} property
     * @param list additional headers; may be {@code null}
     * @param str2 content type; may be {@code null}
     * @return the assembled response message
     */
    private HttpCarbonMessage createResponseMessage(String str, int i, List<Header> list, String str2) {
        final HttpCarbonMessage response = new HttpCarbonMessage(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
        final byte[] payloadBytes = str.getBytes(Charset.defaultCharset());
        response.addHttpContent(new DefaultHttpContent(Unpooled.wrappedBuffer(payloadBytes)));

        final HttpHeaders responseHeaders = response.getHeaders();
        response.setProperty(HttpConstants.HTTP_STATUS_CODE, Integer.valueOf(i));
        response.setProperty(Constants.DIRECTION, Constants.DIRECTION_RESPONSE);
        response.setStreaming(true);

        if (str2 != null) {
            responseHeaders.set("Content-Type", (Object) str2);
        }
        if (list != null) {
            list.forEach(header -> responseHeaders.set(header.getName(), (Object) header.getValue()));
        }
        return response;
    }

    /**
     * Creates the sink metrics when a metric service exists, metrics management is enabled and
     * the Prometheus reporter is running; otherwise leaves {@code metrics} untouched.
     *
     * @param str  first argument passed to the {@code SinkMetrics} constructor (the caller
     *             supplies the Siddhi app name)
     * @param str2 second argument passed to the {@code SinkMetrics} constructor (the caller
     *             supplies the stream id)
     */
    private void initMetrics(String str, String str2) {
        final boolean metricsEnabled = MetricsDataHolder.getInstance().getMetricService() != null
                && MetricsDataHolder.getInstance().getMetricManagementService().isEnabled();
        if (metricsEnabled) {
            try {
                final boolean prometheusRunning = MetricsDataHolder.getInstance().getMetricManagementService()
                        .isReporterRunning(HttpConstants.PROMETHEUS_REPORTER_NAME);
                if (prometheusRunning) {
                    this.metrics = new SinkMetrics(str, str2);
                }
            } catch (IllegalArgumentException ignored) {
                // Treated as "reporter not running"; the sink simply runs without metrics.
                logger.debug("Prometheus reporter is not running. Hence sse sink metrics will not be initialized for " + str);
            }
        }
    }

    /**
     * Publishes one event to the subscribers of this sink's stream.
     *
     * <p>The headers option is resolved for the event, the content type is derived from the
     * mapper type and those headers, and the sink registered for the stream id in
     * {@code HTTPSinkRegistry} is asked to broadcast the payload.
     *
     * <p>NOTE(review): the payload is cast to {@code String} unconditionally, and the registry
     * lookup result is used without a null check - confirm both are safe for all callers.
     *
     * @param obj            the mapped event payload
     * @param dynamicOptions dynamic options of the event, used to resolve the headers
     * @param state          sink state; not used here
     * @throws ConnectionUnavailableException declared by the contract; not thrown directly here
     */
    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        final String headerOptionValue = this.httpHeaderOption.getValue(dynamicOptions);
        final List<Header> eventHeaders = HttpSinkUtil.getHeaders(headerOptionValue);
        final String contentType = HttpSinkUtil.getContentType(this.mapType, eventHeaders);
        HTTPSinkRegistry.findAndGetSSESource(this.streamId).handleCallback((String) obj, eventHeaders, contentType);
    }

    /**
     * Starts the listener: forces chunking and keep-alive on the listener configuration, creates
     * the server connector, registers the listener for this stream and finally registers this
     * sink in {@code HTTPSinkRegistry} under its stream id.
     *
     * @throws ConnectionUnavailableException declared by the contract; not thrown directly here
     */
    public void connect() throws ConnectionUnavailableException {
        final ListenerConfiguration configuration = this.listenerConfiguration;
        configuration.setChunkConfig(ChunkConfig.ALWAYS);
        configuration.setKeepAliveConfig(KeepAliveConfig.ALWAYS);

        final SSESyncConnectorRegistry registry = this.httpConnectorRegistry;
        registry.createHttpServerConnector(configuration);
        registry.registerSourceListener(this.listenerUrl, this.workerThread, Boolean.valueOf(this.isAuth), this.streamId, this.siddhiAppName);

        HTTPSinkRegistry.registerSSESink(this.streamId, this);
    }

    /**
     * Stops the listener and releases all subscribers.
     *
     * <p>Unregisters the listener and the server connector, removes this sink from
     * {@code HTTPSinkRegistry}, sends every remaining subscriber a final response with a
     * {@code null} payload and status 200, and empties the subscriber list.
     *
     * @throws HttpSourceAdaptorRuntimeException if responding to a subscriber fails; the
     *         subscriber list is emptied regardless
     */
    public void disconnect() {
        this.httpConnectorRegistry.unregisterSourceListener(this.listenerUrl, this.siddhiAppName);
        this.httpConnectorRegistry.unregisterServerConnector(this.listenerUrl);
        HTTPSinkRegistry.removeSSESink(this.streamId);

        // Work on a snapshot: the error listener attached while responding removes entries from
        // the live list, which must not happen underneath the iteration.
        final List<HttpCarbonMessage> subscribers = new ArrayList<>(this.requestContainerList);
        try {
            for (HttpCarbonMessage subscriber : subscribers) {
                if (subscriber != null) {
                    handleResponse(subscriber, 200, null, null, null);
                }
            }
        } finally {
            // Always drop the subscribers, even when a response failed, so that no stale
            // request survives the disconnect.
            this.requestContainerList.clear();
        }
    }

    /**
     * Final clean-up: asks the connector registry to clear its bootstrap configuration if this
     * was the last user, and removes this sink's stream id from {@code HTTPSinkRegistry}.
     */
    public void destroy() {
        final SSESyncConnectorRegistry registry = this.httpConnectorRegistry;
        registry.clearBootstrapConfigIfLast();

        final String registeredStreamId = this.streamId;
        HTTPSinkRegistry.removeSSESink(registeredStreamId);
    }
}
