/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.tcp.sink;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.tcp.transport.TCPNettyClient;
import io.siddhi.query.api.definition.StreamDefinition;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.apache.log4j.Logger;

@Extension(name="tcp", namespace="sink", description="A Siddhi application can be configured to publish events via the TCP transport by adding the @Sink(type = 'tcp') annotation at the top of an event stream definition.", parameters={@Parameter(name="url", description="The URL to which outgoing events should be published via TCP.\nThe URL should adhere to `tcp://<host>:<port>/<context>` format.", type={DataType.STRING}), @Parameter(name="sync", description="This parameter defines whether the events should be published in a synchronized manner or not.\nIf sync = 'true', then the worker will wait for the ack after sending the message.\nElse it will not wait for an ack.", type={DataType.STRING}, dynamic=true, optional=true, defaultValue="false"), @Parameter(name="tcp.no.delay", description="This is to specify whether to disable Nagle algorithm during message passing.\nIf tcp.no.delay = 'true', the execution of Nagle algorithm will be disabled in the underlying TCP logic. Hence there will be no delay between two successive writes to the TCP connection.\nElse there can be a constant ack delay.", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="keep.alive", description="This property defines whether the server should be kept alive when there are no connections available.", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="worker.threads", description="Number of threads to publish events.", type={DataType.INT, DataType.LONG}, optional=true, defaultValue="10")}, examples={@Example(syntax="@Sink(type = 'tcp', url='tcp://localhost:8080/abc', sync='true' \n   @map(type='binary'))\ndefine stream Foo (attribute1 string, attribute2 int);", description="A sink of type 'tcp' has been defined.\nAll events arriving at Foo stream via TCP transport will be sent to the url tcp://localhost:8080/abc in a synchronous manner.")})
public class TCPSink
extends Sink {
    private static final String TCP_NO_DELAY = "tcp.no.delay";
    private static final String KEEP_ALIVE = "keep.alive";
    private static final String WORKER_THREADS = "worker.threads";
    private static final String DEFAULT_TCP_NO_DELAY = "true";
    private static final String DEFAULT_KEEP_ALIVE = "true";
    private static final String DEFAULT_WORKER_THREADS = "0";
    private static final String URL = "url";
    private static final String SYNC = "sync";
    private static final Logger log = Logger.getLogger(TCPSink.class);
    private TCPNettyClient tcpNettyClient;
    private String host;
    private int port;
    private String channelId;
    private Option syncOption;
    private Boolean sync = null;
    private String hostAndPort;

    protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext) {
        String url = optionHolder.validateAndGetStaticValue(URL);
        this.syncOption = optionHolder.getOrCreateOption(SYNC, "false");
        if (this.syncOption.isStatic()) {
            this.sync = Boolean.parseBoolean(this.syncOption.getValue());
        }
        try {
            if (!url.startsWith("tcp:")) {
                throw new SiddhiAppCreationException("Malformed url '" + url + "' with wrong protocol found, expected in format 'tcp://<host>:<port>/<context>'");
            }
            URL aURL = new URL(url.replaceFirst("tcp", "http"));
            this.host = aURL.getHost();
            this.port = aURL.getPort();
            this.hostAndPort = this.host + ":" + this.port;
            String path = aURL.getPath();
            if (path.length() < 2) {
                throw new SiddhiAppCreationException("Malformed url '" + url + "' found with no context, expected in format 'tcp://<host>:<port>/<context>'");
            }
            this.channelId = path.substring(1);
        }
        catch (MalformedURLException e) {
            throw new SiddhiAppCreationException("Malformed url '" + url + "' found, expected in format 'tcp://<host>:<port>/<context>'", (Throwable)e);
        }
        boolean tcpNoDelay = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(TCP_NO_DELAY, "true"));
        boolean keepAlive = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(KEEP_ALIVE, "true"));
        int workerThreads = Integer.parseInt(optionHolder.validateAndGetStaticValue(WORKER_THREADS, DEFAULT_WORKER_THREADS));
        this.tcpNettyClient = new TCPNettyClient(workerThreads, keepAlive, tcpNoDelay);
        return null;
    }

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, byte[].class, ByteBuffer.class};
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{SYNC};
    }

    public void connect() throws ConnectionUnavailableException {
        this.tcpNettyClient.connect(this.host, this.port);
        log.info((Object)("'tcp' sink at '" + this.getStreamDefinition().getId() + "' stream successfully connected to '" + this.hostAndPort + "'."));
    }

    public void publish(Object payload, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        block7: {
            try {
                byte[] message = payload instanceof String ? ((String)payload).getBytes(Charset.defaultCharset()) : (payload instanceof ByteBuffer ? ((ByteBuffer)payload).array() : (byte[])payload);
                boolean isSync = this.sync != null ? this.sync : Boolean.parseBoolean(this.syncOption.getValue(dynamicOptions));
                if (isSync) {
                    try {
                        ChannelFuture future = this.tcpNettyClient.send(this.channelId, message);
                        future.sync();
                        if (!future.isSuccess()) {
                            throw new ConnectionUnavailableException("Error sending events to '" + this.hostAndPort + "' on channel '" + this.channelId + "', " + this.hostAndPort + ", " + future.cause().getMessage(), future.cause());
                        }
                        break block7;
                    }
                    catch (InterruptedException e) {
                        throw new ConnectionUnavailableException("Error sending events to '" + this.hostAndPort + "' on channel '" + this.channelId + "', " + this.hostAndPort + ", " + e.getMessage(), (Throwable)e);
                    }
                }
                ChannelFuture future = this.tcpNettyClient.send(this.channelId, message);
                future.addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            log.error((Object)("Error sending events to '" + TCPSink.this.hostAndPort + "' on channel '" + TCPSink.this.channelId + "', " + future.cause() + ", dropping events "), future.cause());
                        }
                    }
                });
                if (future.isDone() && !future.isSuccess()) {
                    throw new ConnectionUnavailableException("Error sending events to '" + this.hostAndPort + "' on channel '" + this.channelId + "', " + this.hostAndPort + ", " + future.cause().getMessage(), future.cause());
                }
            }
            catch (Throwable t) {
                throw new ConnectionUnavailableException("Error sending events to '" + this.hostAndPort + "' on channel '" + this.channelId + "', " + this.hostAndPort + ", " + t.getMessage(), t);
            }
        }
    }

    public void disconnect() {
        if (this.tcpNettyClient != null) {
            this.tcpNettyClient.disconnect();
        }
    }

    public void destroy() {
        if (this.tcpNettyClient != null) {
            this.tcpNettyClient.shutdown();
            this.tcpNettyClient = null;
        }
    }
}

