package org.wso2.extension.siddhi.io.tcp.sink;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.tcp.transport.TCPNettyClient;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.Option;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

@Extension(name = "tcp", namespace = "sink", description = "A Siddhi application can be configured to publish events via the TCP transport by adding the @Sink(type = 'tcp') annotation at the top of an event stream definition.", parameters = {@Parameter(name = TCPSink.URL, description = "The URL to which outgoing events should be published via TCP.\nThe URL should adhere to `tcp://<host>:<port>/<context>` format.", type = {DataType.STRING}), @Parameter(name = TCPSink.SYNC, description = "This parameter defines whether the events should be published in a synchronized manner or not.\nIf sync = 'true', then the worker will wait for the ack after sending the message.\nElse it will not wait for an ack.", type = {DataType.STRING}, dynamic = true, optional = true, defaultValue = "false"), @Parameter(name = TCPSink.TCP_NO_DELAY, description = "This is to specify whether to disable Nagle algorithm during message passing.\nIf tcp.no.delay = 'true', the execution of Nagle algorithm will be disabled in the underlying TCP logic. Hence there will be no delay between two successive writes to the TCP connection.\nElse there can be a constant ack delay.", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = TCPSink.KEEP_ALIVE, description = "This property defines whether the server should be kept alive when there are no connections available.", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = TCPSink.WORKER_THREADS, description = "Number of threads to publish events.", type = {DataType.INT, DataType.LONG}, optional = true, defaultValue = "10")}, examples = {@Example(syntax = "@Sink(type = 'tcp', url='tcp://localhost:8080/abc', sync='true' \n   @map(type='binary'))\ndefine stream Foo (attribute1 string, attribute2 int);", description = "A sink of type 'tcp' has been defined.\nAll events arriving at Foo stream via TCP transport will be sent to the url tcp://localhost:8080/abc in a synchronous manner.")})
/* loaded from: input_file:org/wso2/extension/siddhi/io/tcp/sink/TCPSink.class */
public class TCPSink extends Sink {
    private static final String TCP_NO_DELAY = "tcp.no.delay";
    private static final String KEEP_ALIVE = "keep.alive";
    private static final String WORKER_THREADS = "worker.threads";
    private static final String DEFAULT_TCP_NO_DELAY = "true";
    private static final String DEFAULT_KEEP_ALIVE = "true";
    private static final String DEFAULT_WORKER_THREADS = "0";
    private static final String URL = "url";
    private static final String SYNC = "sync";
    private static final Logger log = Logger.getLogger(TCPSink.class);
    private TCPNettyClient tcpNettyClient;
    private String host;
    private int port;
    private String channelId;
    private Option syncOption;
    private Boolean sync = null;
    private String hostAndPort;

    protected void init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        String validateAndGetStaticValue = optionHolder.validateAndGetStaticValue(URL);
        this.syncOption = optionHolder.getOrCreateOption(SYNC, "false");
        if (this.syncOption.isStatic()) {
            this.sync = Boolean.valueOf(Boolean.parseBoolean(this.syncOption.getValue()));
        }
        try {
            if (!validateAndGetStaticValue.startsWith("tcp:")) {
                throw new SiddhiAppCreationException("Malformed url '" + validateAndGetStaticValue + "' with wrong protocol found, expected in format 'tcp://<host>:<port>/<context>'");
            }
            URL url = new URL(validateAndGetStaticValue.replaceFirst("tcp", "http"));
            this.host = url.getHost();
            this.port = url.getPort();
            this.hostAndPort = this.host + ":" + this.port;
            String path = url.getPath();
            if (path.length() < 2) {
                throw new SiddhiAppCreationException("Malformed url '" + validateAndGetStaticValue + "' found with no context, expected in format 'tcp://<host>:<port>/<context>'");
            }
            this.channelId = path.substring(1);
            this.tcpNettyClient = new TCPNettyClient(Integer.parseInt(optionHolder.validateAndGetStaticValue(WORKER_THREADS, DEFAULT_WORKER_THREADS)), Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(KEEP_ALIVE, "true")), Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(TCP_NO_DELAY, "true")));
        } catch (MalformedURLException e) {
            throw new SiddhiAppCreationException("Malformed url '" + validateAndGetStaticValue + "' found, expected in format 'tcp://<host>:<port>/<context>'", e);
        }
    }

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, byte[].class, ByteBuffer.class};
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{SYNC};
    }

    public void connect() throws ConnectionUnavailableException {
        this.tcpNettyClient.connect(this.host, this.port);
        log.info("'tcp' sink at '" + getStreamDefinition().getId() + "' stream successfully connected to '" + this.hostAndPort + "'.");
    }

    public void publish(Object obj, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
        try {
            byte[] bytes = obj instanceof String ? ((String) obj).getBytes(Charset.defaultCharset()) : obj instanceof ByteBuffer ? ((ByteBuffer) obj).array() : (byte[]) obj;
            if (this.sync != null ? this.sync.booleanValue() : Boolean.parseBoolean(this.syncOption.getValue(dynamicOptions))) {
                try {
                    ChannelFuture send = this.tcpNettyClient.send(this.channelId, bytes);
                    send.sync();
                    if (!send.isSuccess()) {
                        throw new ConnectionUnavailableException("Error sending events to '" + this.hostAndPort + "' on channel '" + this.channelId + "', " + this.hostAndPort + ", " + send.cause().getMessage(), send.cause());
                    }
                } catch (InterruptedException e) {
                    throw new ConnectionUnavailableException("Error sending events to '" + this.hostAndPort + "' on channel '" + this.channelId + "', " + this.hostAndPort + ", " + e.getMessage(), e);
                }
            } else {
                ChannelFuture send2 = this.tcpNettyClient.send(this.channelId, bytes);
                send2.addListener(new ChannelFutureListener() { // from class: org.wso2.extension.siddhi.io.tcp.sink.TCPSink.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            return;
                        }
                        TCPSink.log.error("Error sending events to '" + TCPSink.this.hostAndPort + "' on channel '" + TCPSink.this.channelId + "', " + channelFuture.cause() + ", dropping events ", channelFuture.cause());
                    }
                });
                if (send2.isDone() && !send2.isSuccess()) {
                    throw new ConnectionUnavailableException("Error sending events to '" + this.hostAndPort + "' on channel '" + this.channelId + "', " + this.hostAndPort + ", " + send2.cause().getMessage(), send2.cause());
                }
            }
        } catch (Throwable th) {
            throw new ConnectionUnavailableException("Error sending events to '" + this.hostAndPort + "' on channel '" + this.channelId + "', " + this.hostAndPort + ", " + th.getMessage(), th);
        }
    }

    public void disconnect() {
        if (this.tcpNettyClient != null) {
            this.tcpNettyClient.disconnect();
        }
    }

    public void destroy() {
        if (this.tcpNettyClient != null) {
            this.tcpNettyClient.shutdown();
            this.tcpNettyClient = null;
        }
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }
}
