Module: rabbitmq
Module overview
This module provides functionality for messaging with a RabbitMQ broker (AMQP 0-9-1). RabbitMQ is one of the most popular open-source enterprise messaging systems and is modeled on the Advanced Message Queuing Protocol (AMQP) standard.
Samples
RabbitMQ Producer
The following program publishes messages to a RabbitMQ server.
import ballerina/io;
import ballerina/rabbitmq;

public function main() {
    // Creates a Ballerina RabbitMQ connection, which can be reused if necessary.
    rabbitmq:Connection connection = new({ host: "localhost", port: 5672 });

    // Creates multiple Ballerina RabbitMQ channels on the same connection.
    rabbitmq:Channel newChannel1 = new(connection);
    rabbitmq:Channel newChannel2 = new(connection);

    // Declares the queue, MyQueue1.
    var queueResult1 = newChannel1->queueDeclare({ queueName: "MyQueue1" });
    if (queueResult1 is rabbitmq:Error) {
        io:println("An error occurred while creating the MyQueue1 queue.");
    }

    // Declares the queue, MyQueue2.
    var queueResult2 = newChannel2->queueDeclare({ queueName: "MyQueue2" });
    if (queueResult2 is rabbitmq:Error) {
        io:println("An error occurred while creating the MyQueue2 queue.");
    }

    // Publishes messages to an exchange using a routing key.
    // Publishes the message using newChannel1 and the routing key named MyQueue1.
    worker w1 {
        var sendResult = newChannel1->basicPublish("Hello from Ballerina", "MyQueue1");
        if (sendResult is rabbitmq:Error) {
            io:println("An error occurred while sending the message to MyQueue1 using newChannel1.");
        }
    }

    // Publishes messages to the same routing key using a different channel.
    // Publishes the message using newChannel2 and the same routing key named MyQueue1.
    worker w2 {
        var sendResult = newChannel2->basicPublish("Hello from Ballerina", "MyQueue1");
        if (sendResult is rabbitmq:Error) {
            io:println("An error occurred while sending the message to MyQueue1 using newChannel2.");
        }
    }

    // Publishes messages to different routing keys using the same channel.
    // Publishes the message using newChannel1 to a different routing key named MyQueue2.
    worker w3 {
        var sendResult = newChannel1->basicPublish("Hello from Ballerina", "MyQueue2");
        if (sendResult is rabbitmq:Error) {
            io:println("An error occurred while sending the message to MyQueue2 using newChannel1.");
        }
    }

    // Waits for all three workers to finish.
    _ = wait {w1, w2, w3};
}
RabbitMQ Subscriber
The following program consumes messages from a RabbitMQ server.
import ballerina/log;
import ballerina/rabbitmq;

// Creates a Ballerina RabbitMQ connection, which can be reused if necessary.
rabbitmq:Connection connection = new({ host: "localhost", port: 5672 });

listener rabbitmq:Listener channelListener = new(connection);

// The consumer service listens to the "MyQueue" queue.
// The `ackMode` is by default rabbitmq:AUTO_ACK, where messages are acknowledged
// immediately after they are consumed.
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    }
}
// Attaches the service to the listener.
service rabbitmqConsumer on channelListener {

    // Gets triggered when a message is received by the queue.
    resource function onMessage(rabbitmq:Message message) {
        // Retrieves the text content of the message.
        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }
    }
}
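The subscriber above relies on the default `rabbitmq:AUTO_ACK` mode. The following is a minimal sketch of the same service using the `rabbitmq:CLIENT_ACK` constant, so that a message is acknowledged only after it has been processed. It assumes that the `ServiceConfig` annotation accepts an `ackMode` field and that `rabbitmq:Message` exposes a `basicAck` remote function, as in the Ballerina 1.x releases of this module. Verify both names against the API documentation of the version you use.

```ballerina
import ballerina/log;
import ballerina/rabbitmq;

rabbitmq:Connection connection = new({ host: "localhost", port: 5672 });

listener rabbitmq:Listener channelListener = new(connection);

// Assumption: `ackMode` is a field of the service configuration.
// With rabbitmq:CLIENT_ACK, the service must acknowledge each message itself.
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    },
    ackMode: rabbitmq:CLIENT_ACK
}
service rabbitmqClientAckConsumer on channelListener {

    resource function onMessage(rabbitmq:Message message) {
        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }

        // Assumption: `basicAck` is a remote function on rabbitmq:Message.
        // Positively acknowledges this single message.
        var ackResult = message->basicAck();
        if (ackResult is error) {
            log:printError("Error occurred while acknowledging the message.");
        }
    }
}
```

With client acknowledgement, a message that is never acknowledged stays unacknowledged on the broker and can be redelivered, for example after the channel closes. This is the usual reason to choose this mode over automatic acknowledgement.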
Note: The default thread pool size used in Ballerina is the number of available processors * 2. You can configure the thread pool size by using the `BALLERINA_MAX_POOL_SIZE` environment variable.
BasicProperties | Holds other properties of the message, such as routing headers. |
ConnectionConfiguration | Holds the parameters used to create a RabbitMQ `Connection`. |
ExchangeConfiguration | Holds the parameters used to declare an exchange. |
QueueConfiguration | Holds the parameters used to declare a queue. |
RabbitMQServiceConfig | Represents the list of parameters required to create a subscription. |
SecureSocket | Provides configurations for facilitating secure connections. |
Connection | Public Ballerina API - Interface to an AMQ |
Channel | Public Ballerina API - Ballerina interface to an AMQP |
Message | Public Ballerina API - Ballerina RabbitMQ Message. |
Listener | Public Ballerina API - Ballerina RabbitMQ Message Listener. Provides a listener to consume messages from RabbitMQ. |
DIRECT_EXCHANGE | Constant for the RabbitMQ Direct Exchange type. |
FANOUT_EXCHANGE | Constant for the RabbitMQ Fanout Exchange type. |
TOPIC_EXCHANGE | Constant for the RabbitMQ Topic Exchange type. |
AUTO_ACK | Constant for the RabbitMQ auto acknowledgement mode. |
CLIENT_ACK | Constant for the RabbitMQ client acknowledgement mode. |
RABBITMQ_ERROR |
ServiceConfig | Service descriptor data generated at compile time. |
AcknowledgementMode | Types of acknowledgement modes supported by the Ballerina RabbitMQ Connector. |
MessageContent | Holds the types of message content that can be published. |
Error |