/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.websocket.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.websocket.source.WebSocketSourceHandshakeListener;
import io.siddhi.extension.io.websocket.util.WebSocketClientConnectorListener;
import io.siddhi.extension.io.websocket.util.WebSocketUtil;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture;
import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeListener;
import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector;
import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnectorConfig;
import org.wso2.transport.http.netty.contract.websocket.WebSocketConnectorListener;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;

@Extension(name="websocket", namespace="source", description="A Siddhi application can be configured to receive events via the WebSocket by adding the @Source(type = 'websocket') annotation at the top of an event stream definition.\nWhen this is defined the associated stream will receive events from the WebSocket server on the url defined in the system.", parameters={@Parameter(name="url", description="The URL of the remote endpoint.\nThe url scheme should be either 'ws' or 'wss'.", type={DataType.STRING}), @Parameter(name="sub.protocol", description="The negotiable sub-protocol if server is asking for it.\nThe sub.protocol should adhere to `subprotocol1, subprotocol2,...` format.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="headers", description="Any specific headers which need to send to the server.\nThe headers should adhere to `'key1:value1', 'key2:value2',...` format.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="idle.timeout", description="Idle timeout of the connection", type={DataType.INT}, optional=true, defaultValue="-1"), @Parameter(name="truststore.path", description="The file path to the location of the truststore. If a custom truststore is not specified, then the system uses the default truststore file - wso2carbon.jks in the `${carbon.home}/resources/security` directory.", type={DataType.STRING}, optional=true, defaultValue="${carbon.home}/resources/security/client-truststore.jks"), @Parameter(name="truststore.password", description="The password for the truststore. A custom password can be specified if required. If no custom password is specified, then the system uses `wso2carbon` as the default password.", type={DataType.STRING}, optional=true, defaultValue="wso2carbon")}, examples={@Example(syntax="@Source(type = 'websocket', url = 'ws://localhost:8025/websockets/abc', \n   @map(type='xml'))\ndefine stream Foo (attribute1 string, attribute2 int);", description="Under this configuration, events are received via the WebSocket server and they are passed to `Foo` stream for processing. ")})
public class WebSocketSource
extends Source {
    private String url;
    private String subProtocol;
    private String headers;
    private String idleTimeoutString;
    private int idleTimeout;
    private SourceEventListener sourceEventListener;
    private WebSocketClientConnectorListener connectorListener;
    private boolean sslEnabled = false;
    private String tlsstruststorePath;
    private String tlsstruststorePass;

    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strings, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.url = optionHolder.validateAndGetStaticValue("url");
        this.subProtocol = optionHolder.validateAndGetStaticValue("sub.protocol", null);
        this.headers = optionHolder.validateAndGetStaticValue("headers", null);
        this.idleTimeoutString = optionHolder.validateAndGetStaticValue("idle.timeout", null);
        this.sourceEventListener = sourceEventListener;
        if (this.idleTimeoutString != null) {
            try {
                this.idleTimeout = Integer.parseInt(this.idleTimeoutString);
                if (this.idleTimeout < -1) {
                    throw new SiddhiAppCreationException("The idle timeout defined in '" + sourceEventListener + "' should be greater than 0.");
                }
            }
            catch (NumberFormatException e) {
                throw new SiddhiAppCreationException("The idle timeout defined in '" + sourceEventListener + "' should be an Integer.");
            }
        }
        try {
            String scheme = new URI(this.url).getScheme();
            if (!Objects.equals("ws", scheme) && !Objects.equals("wss", scheme)) {
                throw new SiddhiAppCreationException("Invalid scheme in url = " + this.url + ". The scheme of the " + "url" + " for the websocket server should be either `ws` or `wss`.");
            }
            if (Objects.equals("wss", scheme)) {
                this.sslEnabled = true;
                this.tlsstruststorePath = optionHolder.validateAndGetStaticValue("truststore.path", configReader.readConfig("truststore.path", "${carbon.home}/resources/security/client-truststore.jks"));
                this.tlsstruststorePass = optionHolder.validateAndGetStaticValue("truststore.password", configReader.readConfig("truststore.password", "wso2carbon"));
            }
        }
        catch (URISyntaxException e) {
            throw new SiddhiAppCreationException("There is an syntax error in the 'url' of the websocket server.", (Throwable)e);
        }
        this.connectorListener = new WebSocketClientConnectorListener();
        return null;
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class, ByteBuffer.class};
    }

    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        DefaultHttpWsConnectorFactory httpConnectorFactory = new DefaultHttpWsConnectorFactory();
        WebSocketClientConnectorConfig configuration = new WebSocketClientConnectorConfig(this.url);
        if (this.subProtocol != null) {
            String[] subProtocol1 = WebSocketUtil.getSubProtocol(this.subProtocol);
            configuration.setSubProtocols(subProtocol1);
        }
        if (this.headers != null) {
            Map<String, String> customHeaders = WebSocketUtil.getHeaders(this.headers);
            configuration.addHeaders(customHeaders);
        }
        if (this.idleTimeoutString != null) {
            configuration.setIdleTimeoutInMillis(this.idleTimeout);
        }
        if (this.sslEnabled) {
            configuration.setTrustStoreFile(this.tlsstruststorePath);
            configuration.setTrustStorePass(this.tlsstruststorePass);
        }
        configuration.setAutoRead(true);
        WebSocketClientConnector clientConnector = httpConnectorFactory.createWsClientConnector(configuration);
        ClientHandshakeFuture handshakeFuture = clientConnector.connect();
        handshakeFuture.setWebSocketConnectorListener((WebSocketConnectorListener)this.connectorListener);
        WebSocketSourceHandshakeListener handshakeListener = new WebSocketSourceHandshakeListener(this.connectorListener, this.sourceEventListener);
        handshakeFuture.setClientHandshakeListener((ClientHandshakeListener)handshakeListener);
    }

    public void disconnect() {
    }

    public void destroy() {
    }

    public void pause() {
    }

    public void resume() {
    }
}

