package io.siddhi.extension.io.nats.sink;

import com.google.protobuf.GeneratedMessageV3;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.nats.sink.nats.NATSCore;
import io.siddhi.extension.io.nats.sink.nats.NATSStreaming;
import io.siddhi.extension.io.nats.util.NATSConstants;
import io.siddhi.query.api.definition.StreamDefinition;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@Extension(name = "nats", namespace = "sink", description = "NATS Sink allows users to subscribe to a Nats or Nats streaming broker and publish messages.", parameters = {@Parameter(name = NATSConstants.DESTINATION, description = "Subject name which NATS sink should publish to.", type = {DataType.STRING}, dynamic = true), @Parameter(name = NATSConstants.BOOTSTRAP_SERVERS, description = "Deprecated, use `server.urls` instead, The NATS based urls of the NATS server. Can be provided multiple urls separated by commas(`,`).", type = {DataType.STRING}, optional = true, defaultValue = NATSConstants.DEFAULT_SERVER_URL), @Parameter(name = NATSConstants.SERVER_URLS, description = "The NATS based urls of the NATS server. Can be provided multiple urls separated by commas(`,`).", type = {DataType.STRING}, optional = true, defaultValue = NATSConstants.DEFAULT_SERVER_URL), @Parameter(name = NATSConstants.CLIENT_ID, description = "The identifier of the client publishing/connecting to the NATS streaming broker. Should be unique for each client connecting to the server/cluster.(supported only for nats streaming connections).", type = {DataType.STRING}, optional = true, defaultValue = "None"), @Parameter(name = NATSConstants.CLUSTER_ID, description = "Deprecated, use `streaming.cluster.id` instead. The identifier of the NATS server/cluster. Should be provided when using nats streaming broker.", type = {DataType.STRING}), @Parameter(name = NATSConstants.STREAMING_CLUSTER_ID, description = "The identifier of the NATS server/cluster. Should be provided when using nats streaming broker", type = {DataType.STRING}), @Parameter(name = NATSConstants.ACK_WAIT, description = "Ack timeout in seconds for nats publisher, Supported only with nats streaming broker.", type = {DataType.LONG}), @Parameter(name = NATSConstants.OPTIONAL_CONFIGURATION, description = "This parameter contains all the other possible configurations that the nats client can be created with. 
\n `io.nats.client.reconnect.max:1, io.nats.client.timeout:1000`", optional = true, type = {DataType.STRING}, defaultValue = "-"), @Parameter(name = NATSConstants.AUTH_TYPE, description = "Set the authentication type. Should be provided when using secure connection. Supported authentication types: `user, token, tls`", optional = true, type = {DataType.STRING}, defaultValue = "-"), @Parameter(name = NATSConstants.USERNAME, description = "Set the username, should be provided if `auth.type` is set as `user`", optional = true, type = {DataType.STRING}, defaultValue = "-"), @Parameter(name = NATSConstants.PASSWORD, description = "Set the password, should be provided if `auth.type` is set as `user`", optional = true, type = {DataType.STRING}, defaultValue = "-"), @Parameter(name = NATSConstants.TOKEN, description = "Set the token, should be provided if `auth.type` is set as `token`", optional = true, type = {DataType.STRING}, defaultValue = "-"), @Parameter(name = NATSConstants.TRUSTSTORE_FILE, description = "Configure the truststore file", optional = true, type = {DataType.STRING}, defaultValue = "`${carbon.home}/resources/security/client-truststore.jks`"), @Parameter(name = NATSConstants.STORE_TYPE, description = "TLS store type.", type = {DataType.STRING}, optional = true, defaultValue = NATSConstants.DEFAULT_STORE_TYPE), @Parameter(name = NATSConstants.TRUSTSTORE_PASSWORD, description = "The password for the client truststore", optional = true, type = {DataType.STRING}, defaultValue = "wso2carbon"), @Parameter(name = NATSConstants.TRUSTSTORE_ALGORITHM, description = "The encryption algorithm of the truststore.", optional = true, type = {DataType.STRING}, defaultValue = "SunX509"), @Parameter(name = NATSConstants.CLIENT_VERIFY, description = "Enable the client verification, should be set to `true` if client needs to be verify by the server.", optional = true, type = {DataType.BOOL}, defaultValue = "false"), @Parameter(name = NATSConstants.KEYSTORE_FILE, description = 
"Configure the Keystore file, only if client verification is needed.", optional = true, type = {DataType.STRING}, defaultValue = "`${carbon.home}/resources/security/wso2carbon.jks`"), @Parameter(name = NATSConstants.KEYSTORE_ALGORITHM, description = "The encryption algorithm of the keystore.", optional = true, type = {DataType.STRING}, defaultValue = "SunX509"), @Parameter(name = NATSConstants.KEYSTORE_PASSWORD, description = "The password for the keystore.", optional = true, type = {DataType.STRING}, defaultValue = "wso2carbon")}, examples = {@Example(syntax = "@sink(type='nats', @map(type='xml'), destination='SP_NATS_OUTPUT_TEST', server.urls='nats://localhost:4222',client.id='nats_client',streaming.cluster.id='test-cluster')\ndefine stream outputStream (name string, age int, country string);", description = "This example shows how to publish events to a `nats streaming` broker with basic configurations. Here the nats sink will publish events into the `SP_NATS_OUTPUT_TEST` subject. Nats streaming server should be runs on the `localhost:4222` address. `streaming.cluster.id` should be provided if wer want to publish events into a nats streaming broker."), @Example(syntax = "@sink(type='nats', @map(type='xml'), destination='nats-test1', server.urls='nats://localhost:4222')\ndefine stream inputStream (name string, age int, country string)", description = "This example shows how to publish events into a nats broker with basic configurations. Nats server should be running on `localhost:4222` and this sink will publish events to the `nats-test1` subject."), @Example(syntax = "@sink(type='nats',@map(type='protobuf', class='io.siddhi.extension.io.nats.utils.protobuf.Person'),\n destination='nats-test1', server.urls='nats://localhost:4222')\ndefine stream inputStream (nic long, name string)", description = "Above query shows how to use nats sink to publish protobuf messages into a nats broker.")})
/* loaded from: input_file:io/siddhi/extension/io/nats/sink/NATSSink.class */
/*
 * Siddhi sink that forwards outgoing events to a NATS client delegate.
 *
 * The delegate is chosen in init(...): a NATSStreaming client when either the
 * deprecated cluster id option or the streaming cluster id option is present,
 * otherwise a plain NATSCore client. All connection handling and publishing is
 * delegated to that client; this class only selects it and converts payloads
 * to bytes.
 */
public class NATSSink extends Sink {
    /** Client delegate; assigned in {@code init} and null until then. */
    private NATSCore nats;

    /**
     * Lists the payload classes this sink declares as accepted.
     *
     * @return {@code String}, {@code Map} and {@code GeneratedMessageV3}
     */
    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, Map.class, GeneratedMessageV3.class};
    }

    /**
     * This sink provides no service deployment information.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Names the options that may vary per event.
     *
     * @return a single-element array holding the destination option name
     */
    public String[] getSupportedDynamicOptions() {
        return new String[]{NATSConstants.DESTINATION};
    }

    /**
     * Selects the client implementation and lets it read its configuration.
     *
     * @param streamDefinition definition of the stream this sink is attached to; its id is passed to the client
     * @param optionHolder     sink options; the presence of a cluster id option selects the streaming client
     * @param configReader     not used by this implementation
     * @param siddhiAppContext application context; its name is passed to the client
     * @return always {@code null} (no state factory is supplied)
     */
    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        boolean clusterIdGiven = optionHolder.isOptionExists(NATSConstants.CLUSTER_ID)
                || optionHolder.isOptionExists(NATSConstants.STREAMING_CLUSTER_ID);
        if (clusterIdGiven) {
            this.nats = new NATSStreaming(this);
        } else {
            this.nats = new NATSCore();
        }
        this.nats.initiateClient(optionHolder, siddhiAppContext.getName(), streamDefinition.getId());
        return null;
    }

    /**
     * Converts the payload to bytes and hands both the original payload and the bytes to the client.
     *
     * @param obj            payload to publish; see {@code toMessageBytes} for the accepted types
     * @param dynamicOptions per-event options forwarded unchanged to the client
     * @param state          not used by this implementation
     * @throws ConnectionUnavailableException declared by the sink contract; propagated from the client
     * @throws ClassCastException             if the payload is none of the convertible types
     */
    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        this.nats.publishMessages(obj, toMessageBytes(obj), dynamicOptions);
    }

    /**
     * Produces the byte representation of a payload.
     *
     * {@code byte[]} is passed through as is, a protobuf {@code GeneratedMessageV3} is serialized with
     * {@code toByteArray()}, and anything else is cast to {@code String} and encoded as UTF-8.
     */
    private static byte[] toMessageBytes(Object payload) {
        if (payload instanceof byte[]) {
            return (byte[]) payload;
        }
        if (payload instanceof GeneratedMessageV3) {
            // Declared as a supported input class, so it must not fall through to the String cast below.
            return ((GeneratedMessageV3) payload).toByteArray();
        }
        // Same behaviour as before for every remaining type: a non-String payload fails the cast.
        return ((String) payload).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Asks the client to create its NATS connection.
     *
     * @throws ConnectionUnavailableException declared by the sink contract; propagated from the client
     */
    public void connect() throws ConnectionUnavailableException {
        this.nats.createNATSClient();
    }

    /**
     * Asks the client to disconnect. Does nothing when no client was ever assigned,
     * so calling this before (or after a failed) {@code init} does not throw.
     */
    public void disconnect() {
        if (this.nats != null) {
            this.nats.disconnect();
        }
    }

    /** Nothing to release here. */
    public void destroy() {
    }
}
