Module : rabbitmq

Module overview

RabbitMQ is one of the most popular open-source enterprise messaging systems modelled on the Advanced Message Queuing Protocol (AMQP) standard. This guide covers the Ballerina RabbitMQ module and its public API. The module provides the functionality needed to exchange messages with a RabbitMQ broker over AMQP 0-9-1. The guide assumes that you are using the most recent version of Ballerina and builds on the basics of the language.

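Key sections include connecting to RabbitMQ, using exchanges and queues, publishing messages, consuming messages using consumer services, and client acknowledgements.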

Connecting to RabbitMQ

The core APIs are Connection, Channel, and Message representing an AMQP 0-9-1 Connection, a Channel, and a Ballerina RabbitMQ Message respectively. The following code connects to a RabbitMQ node using the given parameters (host name, port number, etc.):

   rabbitmq:Connection newConnection = new({ host: "localhost",
                                             port: 5672,
                                             username: "guest",
                                             password: "guest" });

The Connection created above can then be used to open a Channel.

   rabbitmq:Channel newChannel = new(newConnection);

The Channel can now be used to send and receive messages as described in the subsequent sections.

Enabling TLS

It is possible to encrypt the communication between the Ballerina client and the broker by using TLS-enabled connections. Client and server authentication (peer verification) is also supported. To enable the TLS support in the RabbitMQ broker, the node has to be configured to know the location of the Certificate Authority bundle, the server's certificate file, and the server's key. A TLS listener should also be configured to know what port to listen to for TLS-enabled client connections.

Connecting to a TLS-enabled RabbitMQ node using the Ballerina client can be done by passing a SecureSocket record with the appropriate values to the ConnectionConfiguration record when initializing the connection.

   rabbitmq:Connection connection = new({ host: "localhost",
                                          port: 5671,
                                          secureSocket: { trustStore: { path: "/path/to/trustStore",
                                                                        password: "rabbitstore" },
                                                          keyStore: { path: "/path/to/client_key.p12",
                                                                      password: "MySecretPassword" },
                                                          verifyHostname: true }});

Disconnecting from RabbitMQ

To disconnect, simply close the open channels and the connections:

   newChannel.close();
   newConnection.close();

Note: Closing the Channel explicitly is good practice. However, it isn’t strictly necessary in this case because the Channel is closed automatically when the underlying Connection is closed.

Using exchanges and queues

Client applications work with exchanges and queues, which are the high-level building blocks of the AMQP protocol. These must be declared before they can be used. The following code declares an exchange and a server-named queue and then binds them together.

   var exchangeResult = newChannel->exchangeDeclare({ exchangeName: "MyExchange",
                                                      exchangeType: rabbitmq:DIRECT_EXCHANGE,
                                                      durable: true,
                                                      autoDelete: true });
   if (exchangeResult is error) {
        io:println("An error occurred while declaring the exchange");
   }
   
   var queueResult = newChannel->queueDeclare();
   if (queueResult is string) {
        var bindResult = newChannel->queueBind(queueResult, "MyExchange", "routing-key");
        if (bindResult is error) {
            io:println("An error occurred while binding the queue to the exchange.");
        }
   } else {
        io:println("An error occurred while creating the queue.");
   }

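This sample code declares a durable, auto-delete exchange of the type rabbitmq:DIRECT_EXCHANGE named MyExchange, and a queue with a server-generated name. The queueBind() call then binds the queue to the exchange with the given routing key.

A queue with a well-known name can be declared and bound in the same way: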

   var exchangeResult = newChannel->exchangeDeclare({ exchangeName: "MyExchange",
                                                      exchangeType: rabbitmq:DIRECT_EXCHANGE,
                                                      durable: true,
                                                      autoDelete: true });
   if (exchangeResult is error) {
        io:println("An error occurred while declaring the exchange");
   }
   
   var queueResult = newChannel->queueDeclare({ queueName: "MyQueue", 
                                                durable: true,
                                                exclusive: false,
                                                autoDelete: false });
   if (queueResult is error) {
        io:println("An error occurred while creating the MyQueue queue.");
   }

   var bindResult = newChannel->queueBind("MyQueue", "MyExchange", "routing-key");
   if (bindResult is error) {
        io:println("An error occurred while binding the queue to the exchange.");
   }

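This sample code declares a durable, auto-delete exchange of the type rabbitmq:DIRECT_EXCHANGE and a durable, non-exclusive, non-auto-delete queue with the well-known name MyQueue, and then binds the queue to the exchange with the given routing key.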

Deleting entities and purging queues

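   // Delete the queue unconditionally.
   newChannel->queueDelete("MyQueue");
   // Delete the queue only if it is empty (assuming the AMQP argument order: ifUnused, ifEmpty).
   newChannel->queueDelete("MyQueue", false, true);
   // Delete the queue only if it is unused, that is, it has no consumers.
   newChannel->queueDelete("MyQueue", true, false);
   // Delete the exchange.
   newChannel->exchangeDelete("MyExchange");
   // Purge the queue, removing all of its messages.
   newChannel->queuePurge("MyQueue");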

Publishing messages

To publish a message to an exchange, use the basicPublish() function as follows:

   var sendResult = newChannel->basicPublish("Hello from Ballerina", "MyQueue");
   if (sendResult is error) {
        io:println("An error occurred while sending the message to MyQueue using newChannel.");
   }

Other properties of the message, such as routing headers, can be set by passing a BasicProperties record with the appropriate values.
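As a minimal sketch of publishing with message properties, the example below passes a BasicProperties record to basicPublish(). Several details are assumptions rather than facts taken from this guide: the field names contentType and correlationId, the parameter order (content, routing key, exchange name, properties), the use of an empty exchange name to select the default exchange, and the sample values themselves. Check them against the API documentation of the Ballerina version in use.

```ballerina
import ballerina/io;
import ballerina/rabbitmq;

public function main() {
    // Connection parameters are placeholders for a local broker.
    rabbitmq:Connection newConnection = new({ host: "localhost", port: 5672 });
    rabbitmq:Channel newChannel = new(newConnection);

    // Assumed field names; the values are illustrative only.
    rabbitmq:BasicProperties props = {
        contentType: "text/plain",
        correlationId: "example-correlation-id"
    };

    // Assumed parameter order: content, routing key, exchange name, properties.
    // The empty exchange name is assumed to select the AMQP default exchange,
    // which routes by queue name.
    var sendResult = newChannel->basicPublish("Hello from Ballerina", "MyQueue", "", props);
    if (sendResult is error) {
        io:println("An error occurred while sending the message to MyQueue.");
    }
}
```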

Consuming messages using consumer services

The most efficient way to receive messages is to set up a subscription using a Ballerina RabbitMQ Listener and any number of consumer services. The messages will then be delivered automatically as they arrive rather than having to be explicitly requested.

Multiple consumer services can be bound to one Ballerina RabbitMQ Listener. The queue that a service listens to is configured in the ServiceConfig annotation of the service.

listener rabbitmq:Listener channelListener = new(newConnection);

@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    }
}
service rabbitmqConsumer on channelListener {
    resource function onMessage(rabbitmq:Message message) {
        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }
    }
}

The received Message object can be used to retrieve the message content and to send manual client acknowledgements.

Client acknowledgements

Message consumption supports two acknowledgement modes: auto acknowledgements and client acknowledgements. Client acknowledgements are further divided into positive and negative acknowledgements. The default acknowledgement mode is auto-ack, in which messages are acknowledged immediately after they are consumed.

WARNING: To ensure the reliability of receiving messages, use the client-ack mode.

The negatively-acknowledged (rejected) messages can be re-queued.
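The sketch below shows how a consumer service might use the client-ack mode. It assumes that the ServiceConfig annotation accepts an ackMode field, and that the Message client exposes basicAck() and basicNack(multiple, requeue) remote functions. None of these signatures are shown in this guide, so verify them against the API documentation. The service name ackingConsumer and the connection parameters are placeholders.

```ballerina
import ballerina/log;
import ballerina/rabbitmq;

rabbitmq:Connection newConnection = new({ host: "localhost", port: 5672 });
listener rabbitmq:Listener channelListener = new(newConnection);

@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    },
    // Assumed field name for selecting the acknowledgement mode.
    ackMode: rabbitmq:CLIENT_ACK
}
service ackingConsumer on channelListener {
    resource function onMessage(rabbitmq:Message message) {
        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
            // Positive acknowledgement: the broker can discard the message.
            var ackResult = message->basicAck();
            if (ackResult is error) {
                log:printError("Error occurred while acknowledging the message.");
            }
        } else {
            // Negative acknowledgement. Assumed arguments: multiple = false,
            // requeue = true, so the broker puts the message back on the queue.
            var nackResult = message->basicNack(false, true);
            if (nackResult is error) {
                log:printError("Error occurred while rejecting the message.");
            }
        }
    }
}
```

Re-queueing a message that can never be processed leads to repeated redelivery, so a real service would usually re-queue only on transient failures.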

Note: The default thread pool size used in Ballerina is twice the number of available processors. You can configure the thread pool size by using the BALLERINA_MAX_POOL_SIZE environment variable.

Records

BasicProperties

Holds other properties of the message, such as routing headers.

ConnectionConfiguration

Holds the parameters used to create a RabbitMQ Connection.

Detail

Record type to hold the details of an error.

ExchangeConfiguration

Holds the parameters used to declare an exchange.

QueueConfiguration

Holds the parameters used to declare a queue.

RabbitMQServiceConfig

Represents the list of parameters required to create a subscription.

SecureSocket

Provides configurations for facilitating secure connections.

Objects

Connection

Represents a single network connection to the RabbitMQ broker.

Clients

Channel

Ballerina interface to provide AMQP Channel related functionality.

Message

Provides the functionality to manipulate the messages received by the consumer services.

Listeners

Listener

Ballerina RabbitMQ Message Listener. Provides a listener to consume messages from the RabbitMQ server.

Constants

DIRECT_EXCHANGE

Constant for the RabbitMQ Direct Exchange type.

FANOUT_EXCHANGE

Constant for the RabbitMQ Fan-out Exchange type.

TOPIC_EXCHANGE

Constant for the RabbitMQ Topic Exchange type.

AUTO_ACK

Constant for the RabbitMQ auto acknowledgement mode.

CLIENT_ACK

Constant for the RabbitMQ client acknowledgement mode.

RABBITMQ_ERROR

Represents the reason for the RabbitMQ module related errors.

Annotations

ServiceConfig

Service descriptor data generated at compile time.

Types

AcknowledgementMode

Types of acknowledgement modes supported by the Ballerina RabbitMQ Connector.

MessageContent

Holds the types of message content that can be published.

Errors

Error

Represents the RabbitMQ module related errors.