ballerina/rabbitmq module

Module overview

'ballerina/rabbitmq' provides the capability to connect with a RabbitMQ server and perform the following

  • Point to point communication (Queues)
  • Pub/Sub (Topics)

Samples

RabbitMQ Producer

Following program will produce a message to a RabbitMQ server

import ballerina/rabbitmq;
import ballerina/log;

public function main() {
     rabbitmq:Channel chann = new({ host: "localhost", port: 5672 });
     var sendResult = chann->basicPublish("Hello from ballerina", "testingDemo", exchange = "");
     if (sendResult is error) {
          log:printError("An error occurred while sending the message");
     } else {
          log:printInfo("The message was sent successfully");
     }
}

RabbitMQ Subscriber

Following program will consume a message from a RabbitMQ server

import ballerina/rabbitmq;
import ballerina/log;

listener rabbitmq:ChannelListener chann = new({ host: "localhost", port: 5672 });

@rabbitmq:ServiceConfig {
        queueName: "testingDemo"
}
service testSimpleConsumer on chann {
    resource function onMessage(string message) {
            log:printInfo("The message received: " + message);
    }
}

Type Definitions

Type Values Description
ExchangeType topic | headers | fanout | direct

Types of exchanges supported by the Ballerina RabbitMQ Connector.

Annotations

Name Attachement Points Data Type Description
ServiceConfig service RabbitMQServiceConfig

Service descriptor data generated at compile time.

Records Summary

Record Description
ConnectionConfiguration Holds the parameters used to create a RabbitMQ `Connection`.
ExchangeConfiguration Holds the parameters used to declare an exchange.
QueueConfiguration Holds the parameters used to declare a queue.
RabbitMQServiceConfig Represents the list of parameters required to create a subscription.

Objects Summary

Object Description
ChannelListener

Public Ballerina API - Ballerina RabbitMQ Message Listener. To provide a listener to consume messages from RabbitMQ.

Connection

Public Ballerina API - Interface to an AMQ Connection.

Endpoints Summary

Endpoint Description
Channel

Public Ballerina API - Ballerina interface to an AMQP Channel. To provide AMQ Channel related functionalities.

Constants

Name Data Type Value Description
DIRECT_EXCHANGE direct

Constant for the RabbitMQ Direct Exchange type.

FANOUT_EXCHANGE fanout

Constant for the RabbitMQ Fanout Exchange type.

TOPIC_EXCHANGE topic

Constant for the RabbitMQ Topic Exchange type.

RABBITMQ_ERROR_CODE string {ballerina/RabbitMQ}RabbitMQError

RabbitMQ Error code.

public type ConnectionConfiguration record

Holds the parameters used to create a RabbitMQ `Connection`.

Field Name Data Type Default Value Description
host string

The host used for establishing the connection.

port int 5672

The port used for establishing the connection.

username string? ()

The username used for establishing the connection.

password string? ()

The password used for establishing the connection.

connectionTimeout int? ()

Connection TCP establishment timeout in milliseconds; zero for infinite.

handshakeTimeout int? ()

The AMQP 0-9-1 protocol handshake timeout, in milliseconds.

shutdownTimeout int? ()

Shutdown timeout in milliseconds; zero for infinite; default 10000. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks) will be lost.

heartbeat int? ()

The initially requested heartbeat timeout, in seconds; zero for none.

public type ExchangeConfiguration record

Holds the parameters used to declare an exchange.

Field Name Data Type Default Value Description
exchangeName string

The name of the exchange.

exchangeType direct|fanout|topic|headers DIRECT_EXCHANGE

The type of the exchange.

durable boolean false

True if declaring a durable exchange (the exchange will survive a server restart).

public type QueueConfiguration record

Holds the parameters used to declare a queue.

Field Name Data Type Default Value Description
queueName string

The name of the queue, if not specified then autogenerated.

durable boolean false

True if declaring a durable queue (the queue will survive a server restart).

exclusive boolean false

True if we are declaring an exclusive queue (restricted to this connection).

autoDelete boolean true

True if we are declaring an autodelete queue (server will delete it when no longer in use).

public type RabbitMQServiceConfig record

Represents the list of parameters required to create a subscription.

Field Name Data Type Default Value Description
queueConfig rabbitmq:QueueConfiguration

Specifies configuration details about the queue to be subscribed to.

public type ChannelListener object

Public Ballerina API - Ballerina RabbitMQ Message Listener. To provide a listener to consume messages from RabbitMQ.

  • <ChannelListener> __init(rabbitmq:ConnectionConfiguration|rabbitmq:Connection connectionOrConnectionConfig)

    Initializes a Ballerina ChannelListener object with the given Connection object or connection parameters. Creates a Connection object if only the connection configuration is given.

    Parameter Name Data Type Default Value Description
    connectionOrConnectionConfig rabbitmq:ConnectionConfiguration|rabbitmq:Connection

    Holds a Ballerina RabbitMQ Connection object or the connection parameters.

  • <ChannelListener> __start() returns (error<>|null)

    Starts the endpoint. Function is ignored by the ChannelListener.

    Return Type Description
    error<>|null

    Nil or error upon failure to start.

  • <ChannelListener> __stop() returns (error<>|null)

    Stops consuming messages through ChannelListener endpoint.

    Return Type Description
    error<>|null

    Nil or error upon failure to close ChannelListener.

  • <ChannelListener> __attach(service serviceType, string? name) returns (error<>|null)

    Binds the ChannelListener to a service.

    Parameter Name Data Type Default Value Description
    serviceType service

    Type descriptor of the service to bind to.

    name string? ()

    Name of the service.

    Return Type Description
    error<>|null

    () or error upon failure to register listener.

public type Connection object

Public Ballerina API - Interface to an AMQ Connection.

  • <Connection> __init(rabbitmq:ConnectionConfiguration connectionConfiguration)

    Initializes a Ballerina RabbitMQ Connection object.

    Parameter Name Data Type Default Value Description
    connectionConfiguration rabbitmq:ConnectionConfiguration

    Holds connection parameters required to initialize the Connection.

  • <Connection> close(int? timeout) returns (error<>|null)

    Closes the RabbitMQ Connection and all it's Channels. It waits with a provided timeout for all the close operations to complete. When timeout is reached the socket is forced to close.

    Parameter Name Data Type Default Value Description
    timeout int? ()

    Timeout (in milliseconds) for completing all the close-related operations, use -1 for infinity.

    Return Type Description
    error<>|null

    An error if an I/O problem is encountered.

  • <Connection> isClosed() returns (boolean)

    Checks whether close was already called.

    Return Type Description
    boolean

    The value true if the Connection is already closed and false otherwise.

Endpoint Channel

Public Ballerina API - Ballerina interface to an AMQP Channel. To provide AMQ Channel related functionalities.

  • <Channel> queueDeclare(rabbitmq:QueueConfiguration? queueConfig) returns (string|error<>|null)

    Actively declare a server-named exclusive, autodelete, non-durable queue or queue with the given configurations.

    Parameter Name Data Type Default Value Description
    queueConfig rabbitmq:QueueConfiguration? ()

    Holds the paramters required to declare a queue.

    Return Type Description
    string|error<>|null

    Returns the name of the queue if autogenerated or nil if the queue was successfully generated with the given parameters. An error is returned if an I/O error is encountered.

  • <Channel> exchangeDeclare(rabbitmq:ExchangeConfiguration config) returns (error<>|null)

    Actively declare a non-autodelete, non-durable exchange with no extra arguments, If the arguments are specifed, then the exchange is declared accordingly.

    Parameter Name Data Type Default Value Description
    config rabbitmq:ExchangeConfiguration

    Holds parameters required to declare an exchange.

    Return Type Description
    error<>|null

    Returns an error if an I/O error is encountered or nil if successful.

  • <Channel> queueBind(string queueName, string exchangeName, string bindingKey) returns (error<>|null)

    Binds a queue to an exchange with the given binding key.

    Parameter Name Data Type Default Value Description
    queueName string

    Name of the queue.

    exchangeName string

    Name of the exchange.

    bindingKey string

    Binding key used to bind the queue to the exchange.

    Return Type Description
    error<>|null

    Returns an error if an I/O error is encountered or nil if successful.

  • <Channel> basicPublish(string message, string routingKey, string exchange) returns (error<>|null)

    Publishes a message. Publishing to a non-existent exchange will result in a channel-level protocol exception, which closes the channel.

    Parameter Name Data Type Default Value Description
    message string

    The message body.

    routingKey string

    The routing key.

    exchange string

    The name of the exchange the message is published to.

    Return Type Description
    error<>|null

    Returns an error if an I/O error is encountered or nil if successful.

  • <Channel> queueDelete(string queueName) returns (error<>|null)

    Deletes a queue, without regard for whether it is in use or has messages on it, If the paramters ifUnused or ifEmpty is given, the queue is checked before deleting.

    Parameter Name Data Type Default Value Description
    queueName string

    Name of the queue to be deleted.

    Return Type Description
    error<>|null

    Returns error if an I/O error is encountered or nil if successful.

  • <Channel> exchangeDelete(string exchange) returns (error<>|null)

    Deletes an exchange.

    Parameter Name Data Type Default Value Description
    exchange string

    The name of the exchange.

    Return Type Description
    error<>|null

    An I/O error if an error is encountered or nil otherwise.

  • <Channel> queuePurge(string queueName) returns (error<>|null)

    Purges the contents of the given queue.

    Parameter Name Data Type Default Value Description
    queueName string

    The name of the queue.

    Return Type Description
    error<>|null

    An error if an I/O error is encountered or nil if successful.