package io.siddhi.extension.io.nats.source;

import io.nats.streaming.ConnectionLostHandler;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.nats.source.exception.NATSInputAdaptorRuntimeException;
import io.siddhi.extension.io.nats.util.NATSConstants;
import io.siddhi.extension.io.nats.util.NATSUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;

@Extension(name = "nats", namespace = "source", description = "NATS Source allows users to subscribe to a NATS broker and receive messages. It has the ability to receive all the message types supported by NATS.", parameters = {@Parameter(name = NATSConstants.DESTINATION, description = "Subject name which NATS Source should subscribe to.", type = {DataType.STRING}), @Parameter(name = NATSConstants.BOOTSTRAP_SERVERS, description = "The NATS based url of the NATS server.", type = {DataType.STRING}, optional = true, defaultValue = NATSConstants.DEFAULT_SERVER_URL), @Parameter(name = NATSConstants.CLIENT_ID, description = "The identifier of the client subscribing/connecting to the NATS broker.", type = {DataType.STRING}, optional = true, defaultValue = "None"), @Parameter(name = NATSConstants.CLUSTER_ID, description = "The identifier of the NATS server/cluster.", type = {DataType.STRING}, optional = true, defaultValue = NATSConstants.DEFAULT_CLUSTER_ID), @Parameter(name = NATSConstants.QUEUE_GROUP_NAME, description = "This can be used when there is a requirement to share the load of a NATS subject. Clients belongs to the same queue group share the subscription load.", type = {DataType.STRING}, optional = true, defaultValue = "None"), @Parameter(name = NATSConstants.DURABLE_NAME, description = "This can be used to subscribe to a subject from the last acknowledged message when a client or connection failure happens. The client can be uniquely identified using the tuple (client.id, durable.name).", type = {DataType.STRING}, optional = true, defaultValue = "None"), @Parameter(name = NATSConstants.SUBSCRIPTION_SEQUENCE, description = "This can be used to subscribe to a subject from a given number of message sequence. All the messages from the given point of sequence number will be passed to the client. If not provided then the either the persisted value or 0 will be used.", type = {DataType.STRING}, optional = true, defaultValue = "None")}, examples = {@Example(description = "This example shows how to subscribe to a NATS subject with all supporting configurations.With the following configuration the source identified as 'nats-client' will subscribes to a subject named as 'SP_NATS_INPUT_TEST' which resides in a nats instance with a cluster id of 'test-cluster', running in localhost and listening to the port 4222 for client connection. This subscription will receive all the messages from 100th in the subject.", syntax = "@source(type='nats', @map(type='text'), destination='SP_NATS_INPUT_TEST', bootstrap.servers='nats://localhost:4222',client.id='nats_client',server.id='test-cluster',queue.group.name = 'group_nats',durable.name = 'nats-durable',subscription.sequence = '100')\ndefine stream inputStream (name string, age int, country string);"), @Example(description = "This example shows how to subscribe to a NATS subject with mandatory configurations.With the following configuration the source identified with an auto generated client id will subscribes to a subject named as 'SP_NATS_INTPUT_TEST' which resides in a nats instance with a cluster id of 'test-cluster', running in localhost and listening to the port 4222 for client connection. This will receive all available messages in the subject", syntax = "@source(type='nats', @map(type='text'), destination='SP_NATS_INPUT_TEST', )\ndefine stream inputStream (name string, age int, country string);")})
/* loaded from: input_file:io/siddhi/extension/io/nats/source/NATSSource.class */
public class NATSSource extends Source<NATSSourceState> {
    private static final Logger log = Logger.getLogger(NATSSource.class);
    private SourceEventListener sourceEventListener;
    private OptionHolder optionHolder;
    private StreamingConnection streamingConnection;
    private String destination;
    private String clusterId;
    private String clientId;
    private String natsUrl;
    private String queueGroupName;
    private String durableName;
    private String sequenceNumber;
    private Subscription subscription;
    private NATSMessageProcessor natsMessageProcessor;
    private String siddhiAppName;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/io/nats/source/NATSSource$NATSConnectionLostHandler.class */
    public class NATSConnectionLostHandler implements ConnectionLostHandler {
        private Source<NATSSourceState>.ConnectionCallback connectionCallback;

        NATSConnectionLostHandler(Source<NATSSourceState>.ConnectionCallback connectionCallback) {
            this.connectionCallback = connectionCallback;
        }

        public void connectionLost(StreamingConnection streamingConnection, final Exception exc) {
            NATSSource.log.error("Exception occurred in Siddhi App" + NATSSource.this.siddhiAppName + " when consuming messages from NATS endpoint " + NATSSource.this.natsUrl + " . " + exc.getMessage(), exc);
            new Thread() { // from class: io.siddhi.extension.io.nats.source.NATSSource.NATSConnectionLostHandler.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    NATSConnectionLostHandler.this.connectionCallback.onError(new ConnectionUnavailableException(exc));
                }
            }.start();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/io/nats/source/NATSSource$NATSSourceState.class */
    public class NATSSourceState extends State {
        private AtomicInteger lastSentSequenceNo = new AtomicInteger(0);

        NATSSourceState() {
        }

        public boolean canDestroy() {
            return this.lastSentSequenceNo.intValue() == 0;
        }

        public Map<String, Object> snapshot() {
            HashMap hashMap = new HashMap();
            hashMap.put(NATSSource.this.siddhiAppName, Integer.valueOf(this.lastSentSequenceNo.get()));
            return hashMap;
        }

        public void restore(Map<String, Object> map) {
            Object obj = map.get(NATSSource.this.siddhiAppName);
            if (obj == null || NATSSource.this.sequenceNumber != null) {
                return;
            }
            this.lastSentSequenceNo.set(((Integer) obj).intValue());
        }
    }

    public StateFactory<NATSSourceState> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.optionHolder = optionHolder;
        this.siddhiAppName = siddhiAppContext.getName();
        initNATSProperties();
        return () -> {
            return new NATSSourceState();
        };
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class, Map.class};
    }

    public void connect(Source<NATSSourceState>.ConnectionCallback connectionCallback, NATSSourceState nATSSourceState) throws ConnectionUnavailableException {
        try {
            this.streamingConnection = new StreamingConnectionFactory(new Options.Builder().natsUrl(this.natsUrl).clientId(this.clientId).clusterId(this.clusterId).connectionLostHandler(new NATSConnectionLostHandler(connectionCallback)).build()).createConnection();
            subscribe(nATSSourceState);
        } catch (IOException e) {
            log.error("Error while connecting to NATS server at destination: " + this.destination);
            throw new ConnectionUnavailableException("Error while connecting to NATS server at destination: " + this.destination, e);
        } catch (InterruptedException e2) {
            log.error("Error while connecting to NATS server at destination: " + this.destination + ".The calling thread is interrupted before the connection can be established.");
            throw new ConnectionUnavailableException("Error while connecting to NATS server at destination: " + this.destination + " .The calling thread is interrupted before the connection can be established.", e2);
        }
    }

    public void disconnect() {
        try {
            if (this.streamingConnection != null) {
                this.streamingConnection.close();
            }
        } catch (IOException | InterruptedException | TimeoutException e) {
            log.error("Error disconnecting the Stan receiver", e);
        }
    }

    public void destroy() {
    }

    public void pause() {
        if (this.natsMessageProcessor != null) {
            this.natsMessageProcessor.pause();
            if (log.isDebugEnabled()) {
                log.debug("Nats source paused for destination: " + this.destination);
            }
        }
    }

    public void resume() {
        if (this.natsMessageProcessor != null) {
            this.natsMessageProcessor.resume();
            if (log.isDebugEnabled()) {
                log.debug("Nats source resumed for destination: " + this.destination);
            }
        }
    }

    private void subscribe(NATSSourceState nATSSourceState) {
        SubscriptionOptions.Builder builder = new SubscriptionOptions.Builder();
        if (this.sequenceNumber != null && nATSSourceState.lastSentSequenceNo.intValue() < Integer.parseInt(this.sequenceNumber)) {
            nATSSourceState.lastSentSequenceNo.set(Integer.parseInt(this.sequenceNumber));
        }
        builder.startAtSequence(nATSSourceState.lastSentSequenceNo.get());
        try {
            if (this.durableName != null) {
                builder.durableName(this.durableName);
            }
            this.natsMessageProcessor = new NATSMessageProcessor(this.sourceEventListener, nATSSourceState.lastSentSequenceNo);
            if (this.queueGroupName != null) {
                this.subscription = this.streamingConnection.subscribe(this.destination, this.queueGroupName, this.natsMessageProcessor, builder.build());
            } else {
                this.subscription = this.streamingConnection.subscribe(this.destination, this.natsMessageProcessor, builder.build());
            }
        } catch (IOException e) {
            log.error("Error occurred in initializing the NATS receiver for stream: " + this.sourceEventListener.getStreamDefinition().getId());
            throw new NATSInputAdaptorRuntimeException("Error occurred in initializing the NATS receiver for stream: " + this.sourceEventListener.getStreamDefinition().getId(), e);
        } catch (InterruptedException e2) {
            log.error("Error occurred in initializing the NATS receiver for stream: " + this.sourceEventListener.getStreamDefinition().getId() + ".The calling thread is interrupted before the connection completes.");
            throw new NATSInputAdaptorRuntimeException("Error occurred in initializing the NATS receiver for stream: " + this.sourceEventListener.getStreamDefinition().getId() + ".The calling thread is interrupted before the connection completes.", e2);
        } catch (TimeoutException e3) {
            log.error("Error occurred in initializing the NATS receiver for stream: " + this.sourceEventListener.getStreamDefinition().getId() + ".The server request cannot be completed within the subscription timeout.");
            throw new NATSInputAdaptorRuntimeException("Error occurred in initializing the NATS receiver for stream: " + this.sourceEventListener.getStreamDefinition().getId() + ".The server request cannot be completed within the subscription timeout.", e3);
        }
    }

    private void initNATSProperties() {
        this.destination = this.optionHolder.validateAndGetStaticValue(NATSConstants.DESTINATION);
        this.clusterId = this.optionHolder.validateAndGetStaticValue(NATSConstants.CLUSTER_ID, NATSConstants.DEFAULT_CLUSTER_ID);
        this.clientId = this.optionHolder.validateAndGetStaticValue(NATSConstants.CLIENT_ID, NATSUtils.createClientId());
        this.natsUrl = this.optionHolder.validateAndGetStaticValue(NATSConstants.BOOTSTRAP_SERVERS, NATSConstants.DEFAULT_SERVER_URL);
        if (this.optionHolder.isOptionExists(NATSConstants.DURABLE_NAME)) {
            this.durableName = this.optionHolder.validateAndGetStaticValue(NATSConstants.DURABLE_NAME);
        }
        if (this.optionHolder.isOptionExists(NATSConstants.QUEUE_GROUP_NAME)) {
            this.queueGroupName = this.optionHolder.validateAndGetStaticValue(NATSConstants.QUEUE_GROUP_NAME);
        }
        if (this.optionHolder.isOptionExists(NATSConstants.SUBSCRIPTION_SEQUENCE)) {
            this.sequenceNumber = this.optionHolder.validateAndGetStaticValue(NATSConstants.SUBSCRIPTION_SEQUENCE);
        }
        NATSUtils.validateNatsUrl(this.natsUrl, this.sourceEventListener.getStreamDefinition().getId());
    }

    public /* bridge */ /* synthetic */ void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        connect((Source<NATSSourceState>.ConnectionCallback) connectionCallback, (NATSSourceState) state);
    }
}
