Module : nats

Module Overview

This module provides the capability to connect with NATS and NATS Streaming servers and performs the below functionalities.

Basic Usage

Setting up the connection

First step is setting up the connection with the NATS Basic/Streaming server. The following ways can be used to connect to a NATS Basic/Streaming server.

  1. Connect to a server using the URL
nats:Connection connection = new("nats://localhost:4222");
  1. Connect to one or more servers with a custom configuration
nats:Connection connection = new("nats://serverone:4222, nats://servertwo:4222",  config);

Publishing messages

Publishing messages is handled differently in the NATS Basic server and Streaming server. The 'ballerina/nats' module provides different APIs to publish messages to each server.

Publishing messages to the NATS basic server

Once connected, publishing is accomplished via one of the below two methods.

  1. Publish with the subject and the message content.
nats:Producer producer = new(connection);
nats:Error? result = producer->publish(subject, "hello world");
  1. Publish as a request that expects a reply.
nats:Producer producer = new(connection);
nats:Message|nats:Error reqReply = producer->request(subject, "hello world", 5000);
  1. Publish messages with a replyTo subject
nats:Producer producer = new(connection);
nats:Error? result = producer->publish(subject, <@untainted>message, 
                         replyToSubject);
  1. Publish messages with a replyTo callback service
nats:Producer producer = new(connection);
nats:Error? result = producer->publish(subject, <@untainted>message, 
                         replyToService);
service replyToService =
@nats:SubscriptionConfig {
    subject: "replySubject"
}
service {

    resource function onMessage(nats:Message msg, string data) {
    }

    resource function onError(nats:Message msg, nats:Error err) {
    }
};

Publishing messages to a NATS streaming server

Once connected to a streaming server, publishing messages is accomplished using the following method.

nats:StreamingProducer producer = new(connection);
string|error result = producer->publish(subject, "hello world");

Publish api supports the byte[], boolean, string, int, float, decimal, xml, json, record {} message types.

Listening to incoming messages

The Ballerina NATS module provides the following mechanisms to listen to messages. Similar to message publishing, listening to messages is also handled differently in the NATS basic and streaming servers.

Listening to messages from a NATS server

// Initializes the NATS listener.
listener nats:Listener subscription = new(connection);

// Binds the consumer to listen to the messages published to the 'demo' subject.
@nats:SubscriptionConfig {
    subject: "demo"
}
service demo on subscription {

    resource function onMessage(nats:Message msg, string data) {
    }

    resource function onError(nats:Message msg, nats:Error err) {
    }
}

Listening to messages from a Streaming server

// Initializes the NATS Streaming listener.
listener nats:StreamingListener subscription = new(conn, "test-cluster", "c1");

// Binds the consumer to listen to the messages published to the 'demo' subject.
@nats:StreamingSubscriptionConfig {
    subject: "demo"
}
service demo on subscription {

    resource function onMessage(nats:StreamingMessage msg, string data) {
    }

    resource function onError(nats:StreamingMessage msg, nats:Error err) {
    }

}

Advanced Usage

Using the TLS protocol

The Ballerina NATS module allows the use of the tls:// protocol in its URLs. This setting expects a secure socket to be set in the connection configuration as shown below.

nats:ConnectionConfig config = {
    secureSocket : {
        trustStore : {
            path: "nats-basic/keyStore.p12",
            password: "xxxxx"
        }
    }
};

// Initializes a connection.
nats:Connection connection = new("tls://localhost:4222", config = config);

Note: The default thread pool size used in Ballerina is the number of processors available * 2. You can configure the thread pool size by using the BALLERINA_MAX_POOL_SIZE environment variable.

For information on the operations, which you can perform with this module, see the below Functions.

For examples on the usage of the connector, see the following.

Records

ConnectionConfig Configurations related to creating a NATS streaming subscription.
Detail Holds the details of an error.
PendingLimits The configurations to set limits on the maximum number of messages or maximum size of messages this consumer will hold before it starts to drop new messages waiting for the resource functions to drain the queue.
SecureSocket Configurations related to facilitating a secure communication with a remote HTTP endpoint.
StreamingConfig Configuration related to establishing a streaming connection.
StreamingSubscriptionConfigData The configurations for the NATS streaming subscription.
SubscriptionConfigData The configurations for the NATS basic subscription.
T1 Anonymous record

Objects

Connection Represents a single network connection to the NATS server.
Message Represents the message, which a NATS server sends to its subscribed services.

Clients

Producer The producer provides the capability to publish messages to the NATS server.
StreamingMessage Represents the message a NATS Streaming Server sends to its subscribed services.
StreamingProducer The streaming producer provides the capability to publish messages to the NATS streaming server.

Listeners

Listener Represents the NATS server connection to which a subscription service should be bound in order to receive messages of the corresponding subscription.
StreamingListener Represents the NATS streaming server connection to which a subscription service should be bound in order to receive messages of the corresponding subscription.

Constants

DEFAULT_URL Default URL for NATS connections.
NATS_ERROR Represents the reason for the NATS module related errors.
NEW_ONLY Specifies that message delivery should start with the messages, which are published after the subscription is created.
LAST_RECEIVED Specifies that message delivery should start with the last (most recent) message stored for this subject.
FIRST Specifies that message delivery should begin at the oldest available message for this subject.
TIME_DELTA_START The key for the TimeDeltaStart type.
SEQUENCE_NUMBER The key for the SequenceNumber type.

Annotations

StreamingSubscriptionConfig The annotation, which is used to configure the streaming subscription.
SubscriptionConfig The annotation, which is used to configure the basic subscription.

Types

Content Data types supported when publishing and consuming messages.
StartPosition Specifies the position to start receiving messages.

Errors

Error Represents NATS module related errors.