package io.siddhi.extension.io.nats.sink;

import io.nats.streaming.ConnectionLostHandler;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.nats.sink.exception.NATSSinkAdaptorRuntimeException;
import io.siddhi.extension.io.nats.util.NATSConstants;
import io.siddhi.extension.io.nats.util.NATSUtils;
import io.siddhi.query.api.definition.StreamDefinition;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

@Extension(name = "nats", namespace = "sink", description = "NATS Sink allows users to subscribe to a NATS broker and publish messages.", parameters = {@Parameter(name = NATSConstants.DESTINATION, description = "Subject name which NATS sink should publish to.", type = {DataType.STRING}, dynamic = true), @Parameter(name = NATSConstants.BOOTSTRAP_SERVERS, description = "The NATS based url of the NATS server.", type = {DataType.STRING}, optional = true, defaultValue = NATSConstants.DEFAULT_SERVER_URL), @Parameter(name = NATSConstants.CLIENT_ID, description = "The identifier of the client publishing/connecting to the NATS broker. Should be unique for each client connecting to the server/cluster.", type = {DataType.STRING}, optional = true, defaultValue = "None"), @Parameter(name = NATSConstants.CLUSTER_ID, description = "The identifier of the NATS server/cluster.", type = {DataType.STRING}, optional = true, defaultValue = NATSConstants.DEFAULT_CLUSTER_ID)}, examples = {@Example(description = "This example shows how to publish to a NATS subject with all supporting configurations. With the following configuration the sink identified as 'nats-client' will publish to a subject named as 'SP_NATS_OUTPUT_TEST' which resides in a nats instance with a cluster id of 'test-cluster', running in localhost and listening to the port 4222 for client connection.", syntax = "@sink(type='nats', @map(type='xml'), destination='SP_NATS_OUTPUT_TEST', bootstrap.servers='nats://localhost:4222',client.id='nats_client',server.id='test-cluster')\ndefine stream outputStream (name string, age int, country string);"), @Example(description = "This example shows how to publish to a NATS subject with mandatory configurations. With the following configuration the sink identified with an auto generated client id will publish to a subject named as 'SP_NATS_OUTPUT_TEST' which resides in a nats instance with a cluster id of 'test-cluster', running in localhost and listening to the port 4222 for client connection.", syntax = "@sink(type='nats', @map(type='xml'), destination='SP_NATS_OUTPUT_TEST')\ndefine stream outputStream (name string, age int, country string);")})
/* loaded from: input_file:io/siddhi/extension/io/nats/sink/NATSSink.class */
public class NATSSink extends Sink {
    private static final Logger log = Logger.getLogger(NATSSink.class);
    private StreamingConnection streamingConnection;
    private OptionHolder optionHolder;
    private StreamDefinition streamDefinition;
    private Option destination;
    private String clusterId;
    private String clientId;
    private String natsUrl;
    private String siddhiAppName;
    private AtomicBoolean isConnectionClosed = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/io/nats/sink/NATSSink$NATSConnectionLostHandler.class */
    public class NATSConnectionLostHandler implements ConnectionLostHandler {
        NATSConnectionLostHandler() {
        }

        public void connectionLost(StreamingConnection streamingConnection, Exception exc) {
            NATSSink.log.error("Exception occurred in Siddhi App " + NATSSink.this.siddhiAppName + " when publishing messages to NATS endpoint " + NATSSink.this.natsUrl + " . " + exc.getMessage(), exc);
            NATSSink.this.isConnectionClosed = new AtomicBoolean(true);
        }
    }

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class};
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{NATSConstants.DESTINATION};
    }

    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppName = siddhiAppContext.getName();
        this.optionHolder = optionHolder;
        this.streamDefinition = streamDefinition;
        validateAndInitNatsProperties();
        return null;
    }

    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        String str = (String) obj;
        String value = this.destination.getValue();
        try {
            if (this.isConnectionClosed.get()) {
                this.streamingConnection.close();
                connect();
            }
            this.streamingConnection.publish(value, str.getBytes(StandardCharsets.UTF_8), new AsyncAckHandler(this.siddhiAppName, this.natsUrl, obj, this, dynamicOptions));
        } catch (IOException e) {
            log.error("Error sending message to destination: " + value);
            throw new NATSSinkAdaptorRuntimeException("Error sending message to destination:" + value, e);
        } catch (InterruptedException e2) {
            log.error("Error sending message to destination: " + value + ".The calling thread is interrupted before the call completes.");
            throw new NATSSinkAdaptorRuntimeException("Error sending message to destination:" + value + ".The calling thread is interrupted before the call completes.", e2);
        } catch (TimeoutException e3) {
            log.error("Error sending message to destination: " + value + ".Timeout occured while trying to ack.");
            throw new NATSSinkAdaptorRuntimeException("Error sending message to destination:" + value + ".Timeout occured while trying to ack.", e3);
        }
    }

    public void connect() throws ConnectionUnavailableException {
        try {
            this.streamingConnection = new StreamingConnectionFactory(new Options.Builder().natsUrl(this.natsUrl).clientId(this.clientId).clusterId(this.clusterId).connectionLostHandler(new NATSConnectionLostHandler()).build()).createConnection();
            this.isConnectionClosed.set(false);
        } catch (IOException e) {
            String str = "Error in Siddhi App " + this.siddhiAppName + " while connecting to NATS server endpoint " + this.natsUrl + " at destination: " + this.destination.getValue();
            log.error(str);
            throw new ConnectionUnavailableException(str, e);
        } catch (InterruptedException e2) {
            String str2 = "Error in Siddhi App " + this.siddhiAppName + " while connecting to NATS server endpoint " + this.natsUrl + " at destination: " + this.destination.getValue() + ". The calling thread is interrupted before the connection can be established.";
            log.error(str2);
            throw new ConnectionUnavailableException(str2, e2);
        }
    }

    public void disconnect() {
        try {
            if (this.streamingConnection != null) {
                this.streamingConnection.close();
            }
        } catch (IOException | InterruptedException | TimeoutException e) {
            log.error("Error disconnecting the Stan receiver in Siddhi App " + this.siddhiAppName + " when publishing messages to NATS endpoint " + this.natsUrl + " . " + e.getMessage(), e);
        }
    }

    public void destroy() {
    }

    private void validateAndInitNatsProperties() {
        this.destination = this.optionHolder.validateAndGetOption(NATSConstants.DESTINATION);
        this.clusterId = this.optionHolder.validateAndGetStaticValue(NATSConstants.CLUSTER_ID, NATSConstants.DEFAULT_CLUSTER_ID);
        this.clientId = this.optionHolder.validateAndGetStaticValue(NATSConstants.CLIENT_ID, NATSUtils.createClientId());
        this.natsUrl = this.optionHolder.validateAndGetStaticValue(NATSConstants.BOOTSTRAP_SERVERS, NATSConstants.DEFAULT_SERVER_URL);
        NATSUtils.validateNatsUrl(this.natsUrl, this.streamDefinition.getId());
    }
}
