ballerina/rabbitmq module
Module overview
The `ballerina/rabbitmq` module provides the capability to connect to a RabbitMQ server and supports the following messaging patterns:
- Point to point communication (Queues)
- Pub/Sub (Topics)
Samples
RabbitMQ Producer
The following program publishes a message to a RabbitMQ server:

```ballerina
import ballerina/rabbitmq;
import ballerina/log;

public function main() {
    // Open a channel to a broker running on the local machine.
    rabbitmq:Channel chann = new({ host: "localhost", port: 5672 });

    // Publish to the default exchange (""), which routes by queue name.
    var sendResult = chann->basicPublish("Hello from ballerina", "testingDemo", exchange = "");
    if (sendResult is error) {
        log:printError("An error occurred while sending the message");
    } else {
        log:printInfo("The message was sent successfully");
    }
}
```
RabbitMQ Subscriber
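The following program consumes messages from a RabbitMQ server:

```ballerina
import ballerina/rabbitmq;
import ballerina/log;

// Listener connected to a broker running on the local machine.
listener rabbitmq:ChannelListener chann = new({ host: "localhost", port: 5672 });

// The queue is given through the queueConfig field of RabbitMQServiceConfig.
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "testingDemo"
    }
}
service testSimpleConsumer on chann {
    // Invoked for each message delivered from the queue.
    resource function onMessage(string message) {
        log:printInfo("The message received: " + message);
    }
}
```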
Type Definitions
Type | Values | Description |
---|---|---|
ExchangeType | topic \| headers \| fanout \| direct | Types of exchanges supported by the Ballerina RabbitMQ Connector. |
Annotations
Name | Attachment Points | Data Type | Description |
---|---|---|---|
ServiceConfig | service | RabbitMQServiceConfig | Service descriptor data generated at compile time. |
Records Summary
Record | Description |
---|---|
ConnectionConfiguration | Holds the parameters used to create a RabbitMQ `Connection`. |
ExchangeConfiguration | Holds the parameters used to declare an exchange. |
QueueConfiguration | Holds the parameters used to declare a queue. |
RabbitMQServiceConfig | Represents the list of parameters required to create a subscription. |
Objects Summary
Object | Description |
---|---|
ChannelListener | Public Ballerina API - Ballerina RabbitMQ message listener. Provides a listener to consume messages from RabbitMQ. |
Connection | Public Ballerina API - Interface to an AMQ `Connection`. |
Endpoints Summary
Endpoint | Description |
---|---|
Channel | Public Ballerina API - Ballerina interface to an AMQP `Channel`. |
Constants
Name | Data Type | Value | Description |
---|---|---|---|
DIRECT_EXCHANGE | | direct | Constant for the RabbitMQ Direct Exchange type. |
FANOUT_EXCHANGE | | fanout | Constant for the RabbitMQ Fanout Exchange type. |
TOPIC_EXCHANGE | | topic | Constant for the RabbitMQ Topic Exchange type. |
RABBITMQ_ERROR_CODE | string | {ballerina/RabbitMQ}RabbitMQError | RabbitMQ error code. |
public type ConnectionConfiguration record
Holds the parameters used to create a RabbitMQ `Connection`.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
host | string | | The host used for establishing the connection. |
port | int | 5672 | The port used for establishing the connection. |
username | string? | () | The username used for establishing the connection. |
password | string? | () | The password used for establishing the connection. |
connectionTimeout | int? | () | Connection TCP establishment timeout in milliseconds; zero for infinite. |
handshakeTimeout | int? | () | The AMQP 0-9-1 protocol handshake timeout, in milliseconds. |
shutdownTimeout | int? | () | Shutdown timeout in milliseconds; zero for infinite; default 10000. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks) will be lost. |
heartbeat | int? | () | The initially requested heartbeat timeout, in seconds; zero for none. |
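
The fields above can be combined into a single configuration value. The following is a minimal sketch, not a definitive setup: the credentials (`guest`/`guest`, RabbitMQ's stock defaults), the timeout values, and the variable names are assumptions chosen for illustration, and the channel is created the same way as in the producer sample.

```ballerina
import ballerina/log;
import ballerina/rabbitmq;

public function main() {
    // All values below are illustrative assumptions; adjust them for your broker.
    rabbitmq:ConnectionConfiguration connConfig = {
        host: "localhost",
        port: 5672,
        username: "guest",
        password: "guest",
        // TCP connection establishment timeout, in milliseconds.
        connectionTimeout: 5000,
        // Requested heartbeat timeout, in seconds.
        heartbeat: 30
    };

    // Create a channel from the configuration, as in the producer sample.
    rabbitmq:Channel chann = new(connConfig);
    log:printInfo("Channel created with a custom connection configuration");
}
```

Fields left out of the record (for example `handshakeTimeout` and `shutdownTimeout`) keep the `()` default listed in the table.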
public type ExchangeConfiguration record
Holds the parameters used to declare an exchange.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
exchangeName | string | | The name of the exchange. |
exchangeType | direct\|fanout\|topic\|headers | DIRECT_EXCHANGE | The type of the exchange. |
durable | boolean | false | True if declaring a durable exchange (the exchange will survive a server restart). |
public type QueueConfiguration record
Holds the parameters used to declare a queue.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
queueName | string | | The name of the queue; autogenerated if not specified. |
durable | boolean | false | True if declaring a durable queue (the queue will survive a server restart). |
exclusive | boolean | false | True if we are declaring an exclusive queue (restricted to this connection). |
autoDelete | boolean | true | True if we are declaring an autodelete queue (server will delete it when no longer in use). |
public type RabbitMQServiceConfig record
Represents the list of parameters required to create a subscription.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
queueConfig | rabbitmq:QueueConfiguration | | Specifies configuration details about the queue to be subscribed to. |
public type ChannelListener object
Public Ballerina API - Ballerina RabbitMQ message listener. Provides a listener to consume messages from RabbitMQ.

- `<ChannelListener> __init(rabbitmq:ConnectionConfiguration|rabbitmq:Connection connectionOrConnectionConfig)`

  Initializes a Ballerina `ChannelListener` object with the given `Connection` object or connection parameters. Creates a `Connection` object if only the connection configuration is given.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  connectionOrConnectionConfig | rabbitmq:ConnectionConfiguration\|rabbitmq:Connection | | Holds a Ballerina RabbitMQ `Connection` object or the connection parameters. |

- `<ChannelListener> __start() returns (error<>|null)`

  Starts the endpoint. The `ChannelListener` ignores this function.

  Return Type | Description |
  ---|---|
  error<>\|null | Nil or an error upon failure to start. |

- `<ChannelListener> __stop() returns (error<>|null)`

  Stops consuming messages through the `ChannelListener` endpoint.

  Return Type | Description |
  ---|---|
  error<>\|null | Nil or an error upon failure to close the `ChannelListener`. |

- `<ChannelListener> __attach(service serviceType, string? name) returns (error<>|null)`

  Binds the `ChannelListener` to a service.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | service | | Type descriptor of the service to bind to. |
  name | string? | () | Name of the service. |

  Return Type | Description |
  ---|---|
  error<>\|null | Nil or an error upon failure to register the listener. |
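
Because `__init` accepts either a `ConnectionConfiguration` or an existing `Connection`, a listener can presumably reuse a connection created elsewhere in the module. The sketch below illustrates this under stated assumptions: the names `sharedConnection`, `sharedListener`, and `durableConsumer` are hypothetical, and the queue settings are example values built from the `QueueConfiguration` fields documented above.

```ballerina
import ballerina/log;
import ballerina/rabbitmq;

// A single connection to a local broker (the address is an assumption).
rabbitmq:Connection sharedConnection = new({ host: "localhost", port: 5672 });

// Pass the existing Connection instead of connection parameters.
listener rabbitmq:ChannelListener sharedListener = new(sharedConnection);

// Subscribe to a durable queue that is not deleted automatically.
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "testingDemo",
        durable: true,
        autoDelete: false
    }
}
service durableConsumer on sharedListener {
    resource function onMessage(string message) {
        log:printInfo("The message received: " + message);
    }
}
```

The `__start`, `__stop`, and `__attach` functions are not called directly here; the Ballerina runtime invokes them as part of the listener lifecycle.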
public type Connection object
Public Ballerina API - Interface to an AMQ `Connection`.

- `<Connection> __init(rabbitmq:ConnectionConfiguration connectionConfiguration)`

  Initializes a Ballerina RabbitMQ `Connection` object.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  connectionConfiguration | rabbitmq:ConnectionConfiguration | | Holds connection parameters required to initialize the `Connection`. |

- `<Connection> close(int? timeout) returns (error<>|null)`

  Closes the RabbitMQ `Connection` and all its `Channel`s. It waits with a provided timeout for all the close operations to complete. When the timeout is reached, the socket is forced to close.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  timeout | int? | () | Timeout (in milliseconds) for completing all the close-related operations; use -1 for infinity. |

  Return Type | Description |
  ---|---|
  error<>\|null | An `error` if an I/O problem is encountered. |

- `<Connection> isClosed() returns (boolean)`

  Checks whether `close` was already called.

  Return Type | Description |
  ---|---|
  boolean | The value `true` if the `Connection` is already closed and `false` otherwise. |
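
The lifecycle described above can be sketched as follows. This is an illustration rather than a verified program: the broker address and the 5000 ms timeout are assumptions, and `timeout` is passed by name on the assumption that it is a defaultable parameter (its default is listed as `()`).

```ballerina
import ballerina/log;
import ballerina/rabbitmq;

public function main() {
    // Connect to a broker assumed to be running locally.
    rabbitmq:Connection conn = new({ host: "localhost", port: 5672 });

    if (!conn.isClosed()) {
        log:printInfo("The connection is open");
    }

    // Allow up to 5 seconds for close-related operations before the socket is forced to close.
    var closeResult = conn.close(timeout = 5000);
    if (closeResult is error) {
        log:printError("An error occurred while closing the connection");
    } else if (conn.isClosed()) {
        log:printInfo("The connection was closed");
    }
}
```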
Endpoint Channel
Public Ballerina API - Ballerina interface to an AMQP `Channel`. Provides AMQ `Channel`-related functionalities.
- `<Channel> queueDeclare(rabbitmq:QueueConfiguration? queueConfig) returns (string|error<>|null)`

  Actively declares a server-named, exclusive, autodelete, non-durable queue, or a queue with the given configuration.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  queueConfig | rabbitmq:QueueConfiguration? | () | Holds the parameters required to declare a queue. |

  Return Type | Description |
  ---|---|
  string\|error<>\|null | Returns the name of the queue if autogenerated, or nil if the queue was successfully declared with the given parameters. An error is returned if an I/O error is encountered. |
- `<Channel> exchangeDeclare(rabbitmq:ExchangeConfiguration config) returns (error<>|null)`

  Actively declares a non-autodelete, non-durable exchange with no extra arguments. If the arguments are specified, the exchange is declared accordingly.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | rabbitmq:ExchangeConfiguration | | Holds parameters required to declare an exchange. |

  Return Type | Description |
  ---|---|
  error<>\|null | Returns an error if an I/O error is encountered or nil if successful. |
- `<Channel> queueBind(string queueName, string exchangeName, string bindingKey) returns (error<>|null)`

  Binds a queue to an exchange with the given binding key.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  queueName | string | | Name of the queue. |
  exchangeName | string | | Name of the exchange. |
  bindingKey | string | | Binding key used to bind the queue to the exchange. |

  Return Type | Description |
  ---|---|
  error<>\|null | Returns an error if an I/O error is encountered or nil if successful. |
- `<Channel> basicPublish(string message, string routingKey, string exchange) returns (error<>|null)`

  Publishes a message. Publishing to a non-existent exchange will result in a channel-level protocol exception, which closes the channel.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  message | string | | The message body. |
  routingKey | string | | The routing key. |
  exchange | string | | The name of the exchange the message is published to. |

  Return Type | Description |
  ---|---|
  error<>\|null | Returns an error if an I/O error is encountered or nil if successful. |
- `<Channel> queueDelete(string queueName) returns (error<>|null)`

  Deletes a queue, without regard for whether it is in use or has messages on it. If the parameters `ifUnused` or `ifEmpty` are given, the queue is checked before deleting.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  queueName | string | | Name of the queue to be deleted. |

  Return Type | Description |
  ---|---|
  error<>\|null | Returns an error if an I/O error is encountered or nil if successful. |
- `<Channel> exchangeDelete(string exchange) returns (error<>|null)`

  Deletes an exchange.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  exchange | string | | The name of the exchange. |

  Return Type | Description |
  ---|---|
  error<>\|null | Returns an error if an I/O error is encountered or nil otherwise. |
- `<Channel> queuePurge(string queueName) returns (error<>|null)`

  Purges the contents of the given queue.

  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  queueName | string | | The name of the queue. |

  Return Type | Description |
  ---|---|
  error<>\|null | Returns an error if an I/O error is encountered or nil if successful. |
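
The `Channel` functions above are typically used together: declare an exchange and a queue, bind them, publish, and clean up afterwards. The following sketch shows one possible sequence. The names `demoExchange`, `demoQueue`, and `demoKey` are hypothetical, and `queueConfig` and `exchange` are passed by name on the assumption that they are defaultable parameters (the producer sample passes `exchange` this way).

```ballerina
import ballerina/log;
import ballerina/rabbitmq;

public function main() {
    // Broker address is an assumption for illustration.
    rabbitmq:Channel chann = new({ host: "localhost", port: 5672 });

    // Declare a non-durable direct exchange (hypothetical name).
    var exchangeResult = chann->exchangeDeclare({
        exchangeName: "demoExchange",
        exchangeType: rabbitmq:DIRECT_EXCHANGE
    });
    if (exchangeResult is error) {
        log:printError("An error occurred while declaring the exchange");
        return;
    }

    // Declare a named queue; the result is nil when the name is supplied.
    var queueResult = chann->queueDeclare(queueConfig = { queueName: "demoQueue" });
    if (queueResult is error) {
        log:printError("An error occurred while declaring the queue");
        return;
    }

    // Route messages published with "demoKey" from the exchange to the queue.
    var bindResult = chann->queueBind("demoQueue", "demoExchange", "demoKey");
    if (bindResult is error) {
        log:printError("An error occurred while binding the queue");
        return;
    }

    var sendResult = chann->basicPublish("Hello from ballerina", "demoKey", exchange = "demoExchange");
    if (sendResult is error) {
        log:printError("An error occurred while sending the message");
    }

    // Clean up: drop any undelivered messages, then remove the queue and the exchange.
    var purgeResult = chann->queuePurge("demoQueue");
    if (purgeResult is error) {
        log:printError("An error occurred while purging the queue");
    }
    var queueDeleteResult = chann->queueDelete("demoQueue");
    if (queueDeleteResult is error) {
        log:printError("An error occurred while deleting the queue");
    }
    var exchangeDeleteResult = chann->exchangeDelete("demoExchange");
    if (exchangeDeleteResult is error) {
        log:printError("An error occurred while deleting the exchange");
    }
}
```

Declaring the exchange before publishing matters here, since publishing to a non-existent exchange closes the channel. The cleanup steps discard the published message, so omit them when a consumer is expected to receive it.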