/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.websocket.sink;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.websocket.sink.WebSocketSinkHandshakeListener;
import io.siddhi.extension.io.websocket.util.WebSocketClientConnectorListener;
import io.siddhi.extension.io.websocket.util.WebSocketUtil;
import io.siddhi.query.api.definition.StreamDefinition;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture;
import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeListener;
import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector;
import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnectorConfig;
import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection;
import org.wso2.transport.http.netty.contract.websocket.WebSocketConnectorListener;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;

@Extension(name="websocket", namespace="sink", description="A Siddhi application can be configured to publish events via the Websocket transport by adding the @Sink(type = 'websocket') annotation at the top of an event stream definition.", parameters={@Parameter(name="url", description="The URL of the remote endpoint.\nThe url scheme should be either 'ws' or 'wss'.", type={DataType.STRING}), @Parameter(name="sub.protocol", description="The negotiable sub-protocol if server is asking for it.\nThe sub.protocol should adhere to `subprotocol1, subprotocol2,...` format.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="headers", description="Any specific headers which need to send to the server.\nThe headers should adhere to `'key1:value1', 'key2:value2',...` format.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="idle.timeout", description="Idle timeout of the connection", type={DataType.INT}, optional=true, defaultValue="-1"), @Parameter(name="truststore.path", description="The file path to the location of the truststore. If a custom truststore is not specified, then the system uses the default truststore file - wso2carbon.jks in the `${carbon.home}/resources/security` directory.", type={DataType.STRING}, optional=true, defaultValue="${carbon.home}/resources/security/client-truststore.jks"), @Parameter(name="truststore.password", description="The password for the truststore. A custom password can be specified if required. If no custom password is specified, then the system uses `wso2carbon` as the default password.", type={DataType.STRING}, optional=true, defaultValue="wso2carbon")}, examples={@Example(syntax="@Sink(type = 'websocket', url = 'ws://localhost:8025/abc', \n   @map(type='xml'))\ndefine stream Foo (attribute1 string, attribute2 int);", description="A sink of type 'websocket' has been defined.\nAll events arriving at Foo stream via websocket will be sent to the url ws://localhost:8025/abc.")})
public class WebSocketSink
extends Sink {
    private static final Logger log = LoggerFactory.getLogger(WebSocketSink.class);
    private static final String[] SUPPORTED_DYNAMIC_OPTIONS = new String[0];
    private StreamDefinition streamDefinition;
    private String url;
    private String subProtocol;
    private String headers;
    private String idleTimeoutString;
    private int idleTimeout;
    private WebSocketClientConnectorListener connectorListener;
    private WebSocketConnection webSocketConnection = null;
    private Semaphore semaphore = new Semaphore(0);
    private boolean sslEnabled = false;
    private String tlsstruststorePath;
    private String tlsstruststorePass;

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, ByteBuffer.class};
    }

    public String[] getSupportedDynamicOptions() {
        return SUPPORTED_DYNAMIC_OPTIONS;
    }

    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = streamDefinition;
        this.url = optionHolder.validateAndGetStaticValue("url");
        this.subProtocol = optionHolder.validateAndGetStaticValue("sub.protocol", null);
        this.headers = optionHolder.validateAndGetStaticValue("headers", null);
        this.idleTimeoutString = optionHolder.validateAndGetStaticValue("idle.timeout", null);
        if (this.idleTimeoutString != null) {
            try {
                this.idleTimeout = Integer.parseInt(this.idleTimeoutString);
                if (this.idleTimeout < -1) {
                    throw new SiddhiAppCreationException("The idle timeout defined in '" + streamDefinition + "' should be greater than 0.");
                }
            }
            catch (NumberFormatException e) {
                throw new SiddhiAppCreationException("The idle timeout defined in '" + streamDefinition + "' should be an Integer.");
            }
        }
        this.connectorListener = new WebSocketClientConnectorListener();
        try {
            String scheme = new URI(this.url).getScheme();
            if (!Objects.equals("ws", scheme) && !Objects.equals("wss", scheme)) {
                throw new SiddhiAppCreationException("Invalid scheme in url = " + this.url + ". The scheme of the " + "url" + " for the websocket server should be either `ws` or `wss`.");
            }
            if (Objects.equals("wss", scheme)) {
                this.sslEnabled = true;
                this.tlsstruststorePath = optionHolder.validateAndGetStaticValue("truststore.path", configReader.readConfig("truststore.path", "${carbon.home}/resources/security/client-truststore.jks"));
                this.tlsstruststorePass = optionHolder.validateAndGetStaticValue("truststore.password", configReader.readConfig("truststore.password", "wso2carbon"));
            }
        }
        catch (URISyntaxException e) {
            throw new SiddhiAppCreationException("There is an syntax error in the '" + this.url + "' of the websocket server.", (Throwable)e);
        }
        return null;
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public void publish(Object payload, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        if (this.webSocketConnection != null) {
            if (payload instanceof ByteBuffer) {
                byte[] byteMessage = ((ByteBuffer)payload).array();
                ByteBuffer binaryMessage = ByteBuffer.wrap(byteMessage);
                this.webSocketConnection.pushBinary(binaryMessage);
            } else {
                this.webSocketConnection.pushText(payload.toString());
            }
        }
    }

    public void connect() throws ConnectionUnavailableException {
        DefaultHttpWsConnectorFactory httpConnectorFactory = new DefaultHttpWsConnectorFactory();
        WebSocketClientConnectorConfig configuration = new WebSocketClientConnectorConfig(this.url);
        if (this.subProtocol != null) {
            String[] subProtocol1 = WebSocketUtil.getSubProtocol(this.subProtocol);
            configuration.setSubProtocols(subProtocol1);
        }
        if (this.headers != null) {
            Map<String, String> customHeaders = WebSocketUtil.getHeaders(this.headers);
            configuration.addHeaders(customHeaders);
        }
        if (this.idleTimeoutString != null) {
            configuration.setIdleTimeoutInMillis(this.idleTimeout);
        }
        if (this.sslEnabled) {
            configuration.setTrustStoreFile(this.tlsstruststorePath);
            configuration.setTrustStorePass(this.tlsstruststorePass);
        }
        configuration.setAutoRead(true);
        WebSocketClientConnector clientConnector = httpConnectorFactory.createWsClientConnector(configuration);
        ClientHandshakeFuture handshakeFuture = clientConnector.connect();
        handshakeFuture.setWebSocketConnectorListener((WebSocketConnectorListener)this.connectorListener);
        WebSocketSinkHandshakeListener handshakeListener = new WebSocketSinkHandshakeListener(this.streamDefinition, this.semaphore);
        try {
            handshakeFuture.setClientHandshakeListener((ClientHandshakeListener)handshakeListener);
            this.semaphore.acquire();
        }
        catch (InterruptedException e) {
            log.error("Error occurs while connecting with the server defined in " + this.streamDefinition, (Throwable)e);
        }
        AtomicReference<WebSocketConnection> sessionAtomicReference = handshakeListener.getWebSocketConnectionAtomicReference();
        this.webSocketConnection = sessionAtomicReference.get();
    }

    public void disconnect() {
        if (this.webSocketConnection != null) {
            this.webSocketConnection.terminateConnection();
        }
    }

    public void destroy() {
    }
}

