/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.tcp.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.tcp.source.TCPServer;
import io.siddhi.extension.io.tcp.transport.callback.StreamListener;
import io.siddhi.extension.io.tcp.transport.config.ServerConfig;
import java.util.Arrays;

@Extension(name="tcp", namespace="source", description="A Siddhi application can be configured to receive events via the TCP transport by adding the @Source(type = 'tcp') annotation at the top of an event stream definition.\n\nWhen this is defined the associated stream will receive events from the TCP transport on the host and port defined in the system.", parameters={@Parameter(name="context", description="The URL 'context' that should be used to receive the events.", defaultValue="<execution plan name>/<stream name>", optional=true, type={DataType.STRING})}, systemParameter={@SystemParameter(name="host", description="Tcp server host.", defaultValue="0.0.0.0", possibleParameters={"Any valid host or IP"}), @SystemParameter(name="port", description="Tcp server port.", defaultValue="9892", possibleParameters={"Any integer representing valid port"}), @SystemParameter(name="receiver.threads", description="Number of threads to receive connections.", defaultValue="10", possibleParameters={"Any positive integer"}), @SystemParameter(name="worker.threads", description="Number of threads to serve events.", defaultValue="10", possibleParameters={"Any positive integer"}), @SystemParameter(name="tcp.no.delay", description="This is to specify whether to disable Nagle algorithm during message passing.\nIf tcp.no.delay = 'true', the execution of Nagle algorithm  will be disabled in the underlying TCP logic. Hence there will be no delay between two successive writes to the TCP connection.\nElse there can be a constant ack delay.", defaultValue="true", possibleParameters={"true", "false"}), @SystemParameter(name="keep.alive", description="This property defines whether the server should be kept alive when there are no connections available.", defaultValue="true", possibleParameters={"true", "false"})}, examples={@Example(syntax="@Source(type = 'tcp', context='abc', @map(type='binary'))\ndefine stream Foo (attribute1 string, attribute2 int );", description="Under this configuration, events are received via the TCP transport on default host,port, `abc` context, and they are passed to `Foo` stream for processing. ")})
public class TCPSource
extends Source {
    private static final String RECEIVER_THREADS = "receiver.threads";
    private static final String WORKER_THREADS = "worker.threads";
    private static final String PORT = "port";
    private static final String HOST = "host";
    private static final String TCP_NO_DELAY = "tcp.no.delay";
    private static final String KEEP_ALIVE = "keep.alive";
    private static final String CONTEXT = "context";
    private SourceEventListener sourceEventListener;
    private String context;
    private ServerConfig serverConfig;
    private ServiceDeploymentInfo serviceDeploymentInfo;

    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportProperties, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        if (requestedTransportProperties != null && requestedTransportProperties.length > 0) {
            throw new SiddhiAppCreationException("'tcp' source does not support requestedTransportProperties, but at stream '" + this.getStreamDefinition().getId() + "' '" + Arrays.deepToString(requestedTransportProperties) + "' transport properties are requested");
        }
        this.sourceEventListener = sourceEventListener;
        this.context = optionHolder.validateAndGetStaticValue(CONTEXT, siddhiAppContext.getName() + "/" + sourceEventListener.getStreamDefinition().getId());
        this.serverConfig = new ServerConfig();
        this.serverConfig.setHost(configReader.readConfig(HOST, "0.0.0.0"));
        this.serverConfig.setPort(Integer.parseInt(configReader.readConfig(PORT, "9892")));
        this.serverConfig.setKeepAlive(Boolean.parseBoolean(configReader.readConfig(KEEP_ALIVE, "true")));
        this.serverConfig.setTcpNoDelay(Boolean.parseBoolean(configReader.readConfig(TCP_NO_DELAY, "true")));
        this.serverConfig.setReceiverThreads(Integer.parseInt(configReader.readConfig(RECEIVER_THREADS, "10")));
        this.serverConfig.setWorkerThreads(Integer.parseInt(configReader.readConfig(WORKER_THREADS, "10")));
        this.serviceDeploymentInfo = new ServiceDeploymentInfo(this.serverConfig.getPort(), false);
        return null;
    }

    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        TCPServer.getInstance().addStreamListener(new StreamListener(){

            @Override
            public String getChannelId() {
                return TCPSource.this.context;
            }

            @Override
            public void onMessage(byte[] message) {
                TCPSource.this.sourceEventListener.onEvent((Object)message, null);
            }
        });
        TCPServer.getInstance().start(this.serverConfig);
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return this.serviceDeploymentInfo;
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{byte[].class};
    }

    public void disconnect() {
        TCPServer.getInstance().removeStreamListener(this.context);
        TCPServer.getInstance().stop();
    }

    public void destroy() {
    }

    public void pause() {
        TCPServer.getInstance().pause();
    }

    public void resume() {
        TCPServer.getInstance().resume();
    }
}

