ballerina/jms package
Package overview
The ballerina/jms package provides an API to connect to an external JMS provider such as Ballerina Message Broker or ActiveMQ.
The package provides consumer and producer endpoint types for queues and topics. The following endpoint types are supported by this package.
- QueueReceiver
- TopicSubscriber
- DurableTopicSubscriber
- SimpleQueueReceiver
- SimpleTopicSubscriber
- SimpleDurableTopicSubscriber
- QueueSender
- TopicPublisher
- SimpleQueueSender
- SimpleTopicPublisher
The endpoints prefixed with Simple automatically create a JMS connection and a JMS session when the endpoint is initialized. For the other endpoints, the developer has to explicitly provide a properly initialized JMS session.
Samples
JMS Simple Queue Consumer.
The following is a simple listener program that consumes messages from a Ballerina Message Broker queue named MyQueue.
import ballerina/jms;
import ballerina/log;
// Create a simple queue receiver.
endpoint jms:SimpleQueueReceiver consumer {
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'",
    acknowledgementMode: "AUTO_ACKNOWLEDGE",
    queueName: "MyQueue"
};
// Bind the created consumer to the listener service.
service<jms:Consumer> jmsListener bind consumer {
    // The `onMessage` resource gets invoked when a message is received.
    onMessage(endpoint consumer, jms:Message message) {
        match (message.getTextMessageContent()) {
            string messageText => log:printInfo("Message : " + messageText);
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}
JMS Simple Queue Producer.
The following is a simple queue sender program that sends messages to a Ballerina Message Broker queue named MyQueue.
import ballerina/jms;
import ballerina/log;
// Create a simple queue sender.
endpoint jms:SimpleQueueSender queueSender {
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'",
    acknowledgementMode: "AUTO_ACKNOWLEDGE",
    queueName: "MyQueue"
};
function main(string... args) {
    // Create a text message.
    match (queueSender.createTextMessage("Hello from Ballerina")) {
        error e => {
            log:printError("Error occurred while creating message", err = e);
        }
        jms:Message msg => {
            // Send the Ballerina message to the JMS provider.
            queueSender->send(msg) but {
                error e => log:printError("Error occurred while sending message", err = e)
            };
        }
    }
}
JMS queue message receiver
The following is a listener program that explicitly initializes a JMS session to be used in the consumer.
import ballerina/jms;
import ballerina/log;
// Initialize a JMS connection with the provider.
jms:Connection conn = new({
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'"
});
// Initialize a JMS session on top of the created connection.
jms:Session jmsSession = new(conn, {
    // An optional property that defaults to `AUTO_ACKNOWLEDGE`.
    acknowledgementMode: "AUTO_ACKNOWLEDGE"
});
// Initialize a queue receiver using the created session.
endpoint jms:QueueReceiver consumer {
    session: jmsSession,
    queueName: "MyQueue"
};
// Bind the created consumer to the listener service.
service<jms:Consumer> jmsListener bind consumer {
    // The `onMessage` resource gets invoked when a message is received.
    onMessage(endpoint consumer, jms:Message message) {
        // Retrieve the text message.
        match (message.getTextMessageContent()) {
            string messageText => log:printInfo("Message : " + messageText);
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}
JMS queue message producer
The following is a queue sender program that explicitly initializes a JMS session to be used in the producer.
import ballerina/jms;
import ballerina/log;
// Initialize a JMS connection with the provider.
jms:Connection jmsConnection = new({
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'"
});
// Initialize a JMS session on top of the created connection.
jms:Session jmsSession = new(jmsConnection, {
    acknowledgementMode: "AUTO_ACKNOWLEDGE"
});
// Initialize a queue sender using the created session.
endpoint jms:QueueSender queueSender {
    session: jmsSession,
    queueName: "MyQueue"
};
function main(string... args) {
    // Create a text message.
    match (jmsSession.createTextMessage("Hello from Ballerina")) {
        error e => {
            log:printError("Error occurred while creating message", err = e);
        }
        jms:Message msg => {
            // Send the Ballerina message to the JMS provider.
            queueSender->send(msg) but {
                error e => log:printError("Error occurred while sending message", err = e)
            };
        }
    }
}
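JMS simple topic subscriber
The queue samples above carry over to topics with little change. The following is a minimal sketch of a topic listener built on the SimpleTopicSubscriber endpoint. It assumes a topic named BallerinaTopic exists on the broker and reuses the connection settings from the queue samples; the endpoint and service names are illustrative.

```ballerina
import ballerina/jms;
import ballerina/log;

// Create a simple topic subscriber. The topic name is an assumption
// made for this sketch.
endpoint jms:SimpleTopicSubscriber subscriber {
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'",
    acknowledgementMode: "AUTO_ACKNOWLEDGE",
    topicPattern: "BallerinaTopic"
};

// Bind the subscriber to a listener service.
service<jms:Consumer> topicListener bind subscriber {
    // Invoked for each message published to the topic while subscribed.
    onMessage(endpoint subscriber, jms:Message message) {
        match (message.getTextMessageContent()) {
            string messageText => log:printInfo("Message : " + messageText);
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}
```

Unlike a queue receiver, a non-durable topic subscriber only sees messages published while it is connected.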
Records Summary
Objects Summary
public type ConnectionConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
initialContextFactory | string | wso2mbInitialContextFactory | |
providerUrl | string | amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672' | |
connectionFactoryName | string | ConnectionFactory | |
properties | map | | |
public type ConsumerEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
session | Session | | |
identifier | string | | |
public type DurableTopicSubscriberEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
session | Session | | |
topicPattern | string | | |
messageSelector | string | | |
identifier | string | | |
public type QueueReceiverEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
session | Session | | |
queueName | string | | |
messageSelector | string | | |
identifier | string | | |
public type QueueSenderEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
session | Session | | |
queueName | string | | |
public type SessionConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
acknowledgementMode | string | AUTO_ACKNOWLEDGE | |
public type SimpleDurableTopicSubscriberEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
initialContextFactory | string | bmbInitialContextFactory | |
providerUrl | string | amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672' | |
connectionFactoryName | string | ConnectionFactory | |
acknowledgementMode | string | AUTO_ACKNOWLEDGE | |
identifier | string | | |
properties | map | | |
messageSelector | string | | |
topicPattern | string | | |
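As a rough illustration of how these fields fit together, the sketch below configures a SimpleDurableTopicSubscriber. The identifier value sub1 and the topic name BallerinaTopic are placeholders chosen for this example, not values defined by the package.

```ballerina
import ballerina/jms;
import ballerina/log;

// Create a durable topic subscriber. `identifier` names the durable
// subscription; "sub1" and "BallerinaTopic" are placeholder values.
endpoint jms:SimpleDurableTopicSubscriber subscriber {
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'",
    acknowledgementMode: "AUTO_ACKNOWLEDGE",
    identifier: "sub1",
    topicPattern: "BallerinaTopic"
};

// Bind the subscriber to a listener service.
service<jms:Consumer> durableListener bind subscriber {
    onMessage(endpoint subscriber, jms:Message message) {
        match (message.getTextMessageContent()) {
            string messageText => log:printInfo("Message : " + messageText);
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}
```

In JMS, a durable subscription is meant to retain messages published while the subscriber is offline, so the same identifier should be reused across restarts.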
public type SimpleQueueReceiverEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
initialContextFactory | string | bmbInitialContextFactory | |
providerUrl | string | amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672' | |
connectionFactoryName | string | ConnectionFactory | |
acknowledgementMode | string | AUTO_ACKNOWLEDGE | |
messageSelector | string | | |
properties | map | | |
queueName | string | | |
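The messageSelector field is not described further here. The sketch below assumes it takes a standard JMS selector expression, and the property name orderType is hypothetical. Whether selectors are honored depends on the JMS provider in use.

```ballerina
import ballerina/jms;
import ballerina/log;

// Create a queue receiver that asks the provider to deliver only
// messages whose (hypothetical) `orderType` string property is 'express'.
endpoint jms:SimpleQueueReceiver consumer {
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'",
    acknowledgementMode: "AUTO_ACKNOWLEDGE",
    messageSelector: "orderType = 'express'",
    queueName: "MyQueue"
};

service<jms:Consumer> selectiveListener bind consumer {
    onMessage(endpoint consumer, jms:Message message) {
        match (message.getTextMessageContent()) {
            string messageText => log:printInfo("Express order : " + messageText);
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}
```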
public type SimpleQueueSenderEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
initialContextFactory | string | bmbInitialContextFactory | |
providerUrl | string | amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672' | |
connectionFactoryName | string | ConnectionFactory | |
acknowledgementMode | string | AUTO_ACKNOWLEDGE | |
properties | map | | |
queueName | string | | |
public type SimpleTopicPublisherEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
initialContextFactory | string | bmbInitialContextFactory | |
providerUrl | string | amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672' | |
connectionFactoryName | string | ConnectionFactory | |
acknowledgementMode | string | AUTO_ACKNOWLEDGE | |
properties | map | | |
topicPattern | string | | |
public type SimpleTopicSubscriberEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
initialContextFactory | string | bmbInitialContextFactory | |
providerUrl | string | amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672' | |
connectionFactoryName | string | ConnectionFactory | |
acknowledgementMode | string | AUTO_ACKNOWLEDGE | |
messageSelector | string | | |
properties | map | | |
topicPattern | string | | |
public type TopicPublisherEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
session | Session | | |
topicPattern | string | | |
public type TopicSubscriberEndpointConfiguration
Field Name | Data Type | Default Value | Description |
---|---|---|---|
session | Session | | |
topicPattern | string | | |
messageSelector | string | | |
identifier | string | | |
public type Connection object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
config | ConnectionConfiguration | | |
- <Connection> createConnection()
- <Connection> start()
- <Connection> stop()
public type Consumer object
- <Consumer> getEndpoint() returns (ConsumerTemplate)
  Return Type | Description |
  ---|---|
  ConsumerTemplate | |
public type ConsumerActions object
public type ConsumerTemplate object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
consumerActions | ConsumerActions | | |
config | ConsumerEndpointConfiguration | | |
- <ConsumerTemplate> init(ConsumerEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | ConsumerEndpointConfiguration | | |
- <ConsumerTemplate> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <ConsumerTemplate> start()
- <ConsumerTemplate> stop()
- <ConsumerTemplate> getCallerActions() returns (ConsumerActions)
  Return Type | Description |
  ---|---|
  ConsumerActions | |
public type DurableTopicSubscriber object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
consumerActions | DurableTopicSubscriberActions | | |
config | DurableTopicSubscriberEndpointConfiguration | | |
- <DurableTopicSubscriber> init(DurableTopicSubscriberEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | DurableTopicSubscriberEndpointConfiguration | | |
- <DurableTopicSubscriber> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <DurableTopicSubscriber> start()
- <DurableTopicSubscriber> getCallerActions() returns (DurableTopicSubscriberActions)
  Return Type | Description |
  ---|---|
  DurableTopicSubscriberActions | |
- <DurableTopicSubscriber> stop()
public type DurableTopicSubscriberActions object
- <DurableTopicSubscriberActions> acknowledge(Message message) returns (error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  message | Message | | |

  Return Type | Description |
  ---|---|
  error | |
- <DurableTopicSubscriberActions> receive() returns (Message | error)
  Return Type | Description |
  ---|---|
  Message \| error | |
public type Message object
- <Message> getTextMessageContent() returns (string | error)
  Gets the text content of the JMS message.
  Return Type | Description |
  ---|---|
  string \| error | The message content as a string |
- <Message> setStringProperty(string key, string value) returns (error)
  Sets a JMS transport string property on the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  key | string | | The string property name |
  value | string | | The string property value |

  Return Type | Description |
  ---|---|
  error | |
- <Message> getStringProperty(string key) returns (string | error)
  Gets a JMS transport string property from the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  key | string | | The string property name |

  Return Type | Description |
  ---|---|
  string \| error | The string property value |
- <Message> setIntProperty(string key, int value) returns (error)
  Sets a JMS transport integer property on the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  key | string | | The integer property name |
  value | int | | The integer property value |

  Return Type | Description |
  ---|---|
  error | |
- <Message> getIntProperty(string key) returns (int | error)
  Gets a JMS transport integer property from the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  key | string | | The integer property name |

  Return Type | Description |
  ---|---|
  int \| error | The integer property value |
- <Message> setBooleanProperty(string key, boolean value) returns (error)
  Sets a JMS transport boolean property on the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  key | string | | The boolean property name |
  value | boolean | | The boolean property value |

  Return Type | Description |
  ---|---|
  error | |
- <Message> getBooleanProperty(string key) returns (boolean | error)
  Gets a JMS transport boolean property from the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  key | string | | The boolean property name |

  Return Type | Description |
  ---|---|
  boolean \| error | The boolean property value |
- <Message> setFloatProperty(string key, float value) returns (error)
  Sets a JMS transport float property on the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  key | string | | The float property name |
  value | float | | The float property value |

  Return Type | Description |
  ---|---|
  error | |
- <Message> getFloatProperty(string key) returns (float | error)
  Gets a JMS transport float property from the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  key | string | | The float property name |

  Return Type | Description |
  ---|---|
  float \| error | The float property value |
- <Message> getMessageID() returns (string | error)
  Gets the JMS transport header MessageID from the message.
  Return Type | Description |
  ---|---|
  string \| error | The header value |
- <Message> getTimestamp() returns (int | error)
  Gets the JMS transport header Timestamp from the message.
  Return Type | Description |
  ---|---|
  int \| error | The header value |
- <Message> setDeliveryMode(int mode) returns (error)
  Sets the DeliveryMode JMS transport header on the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  mode | int | | The header value |

  Return Type | Description |
  ---|---|
  error | |
- <Message> getDeliveryMode() returns (int | error)
  Gets the JMS transport header DeliveryMode from the message.
  Return Type | Description |
  ---|---|
  int \| error | The header value |
- <Message> setExpiration(int value) returns (error)
  Sets the Expiration JMS transport header on the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  value | int | | The header value |

  Return Type | Description |
  ---|---|
  error | |
- <Message> getExpiration() returns (int | error)
  Gets the JMS transport header Expiration from the message.
  Return Type | Description |
  ---|---|
  int \| error | The header value |
- <Message> setType(string messageType) returns (error)
  Sets the Type JMS transport header on the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  messageType | string | | The message type header value |

  Return Type | Description |
  ---|---|
  error | An error if any JMS provider-level internal error occurs |
- <Message> getType() returns (string | error)
  Gets the JMS transport header Type from the message.
  Return Type | Description |
  ---|---|
  string \| error | The message type header value |
- <Message> clearProperties()
  Clears the JMS properties of the message. Returns an error if any JMS provider-level internal error occurs.
- <Message> clearBody() returns (error)
  Clears the body of the JMS message.
  Return Type | Description |
  ---|---|
  error | An error if any JMS provider-level internal error occurs |
- <Message> setPriority(int value) returns (error)
  Sets the Priority JMS transport header on the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  value | int | | The header value |

  Return Type | Description |
  ---|---|
  error | |
- <Message> getPriority() returns (int | error)
  Gets the JMS transport header Priority from the message.
  Return Type | Description |
  ---|---|
  int \| error | The header value |
- <Message> getRedelivered() returns (boolean | error)
  Gets the JMS transport header Redelivered from the message.
  Return Type | Description |
  ---|---|
  boolean \| error | The header value |
- <Message> setCorrelationID(string value) returns (error)
  Sets the CorrelationID JMS transport header on the message.
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  value | string | | The header value |

  Return Type | Description |
  ---|---|
  error | |
- <Message> getCorrelationID() returns (string | error)
  Gets the JMS transport header CorrelationID from the message.
  Return Type | Description |
  ---|---|
  string \| error | The header value |
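To show how the property and header setters might be combined, here is a minimal sketch that decorates a text message before sending it. It assumes the setters return an optional error that can be handled with the same `but` pattern the samples use for send. The property name orderType and the correlation ID order-1001 are made-up values.

```ballerina
import ballerina/jms;
import ballerina/log;

jms:Connection conn = new({
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'"
});

jms:Session jmsSession = new(conn, {
    acknowledgementMode: "AUTO_ACKNOWLEDGE"
});

endpoint jms:QueueSender queueSender {
    session: jmsSession,
    queueName: "MyQueue"
};

function main(string... args) {
    match (jmsSession.createTextMessage("Order received")) {
        error e => {
            log:printError("Error occurred while creating message", err = e);
        }
        jms:Message msg => {
            // Attach a string property (hypothetical name and value).
            // Assumption: the setter returns an optional error.
            msg.setStringProperty("orderType", "express") but {
                error e => log:printError("Error occurred while setting property", err = e)
            };
            // Set the CorrelationID header (hypothetical value).
            msg.setCorrelationID("order-1001") but {
                error e => log:printError("Error occurred while setting correlation ID", err = e)
            };
            // Send the decorated message to the JMS provider.
            queueSender->send(msg) but {
                error e => log:printError("Error occurred while sending message", err = e)
            };
        }
    }
}
```

On the consumer side, getStringProperty("orderType") and getCorrelationID() would be the matching reads.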
public type QueueReceiver object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
consumerActions | QueueReceiverActions | | |
config | QueueReceiverEndpointConfiguration | | |
- <QueueReceiver> init(QueueReceiverEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | QueueReceiverEndpointConfiguration | | |
- <QueueReceiver> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <QueueReceiver> start()
- <QueueReceiver> getCallerActions() returns (QueueReceiverActions)
  Return Type | Description |
  ---|---|
  QueueReceiverActions | |
- <QueueReceiver> stop()
public type QueueReceiverActions object
- <QueueReceiverActions> acknowledge(Message message) returns (error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  message | Message | | |

  Return Type | Description |
  ---|---|
  error | |
- <QueueReceiverActions> receive() returns (Message | error)
  Return Type | Description |
  ---|---|
  Message \| error | |
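The acknowledge action is most relevant when the session does not acknowledge messages automatically. The sketch below assumes the session accepts the standard JMS mode name CLIENT_ACKNOWLEDGE as its acknowledgementMode, and acknowledges each message only after its content has been read successfully.

```ballerina
import ballerina/jms;
import ballerina/log;

jms:Connection conn = new({
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'"
});

// Assumption: "CLIENT_ACKNOWLEDGE" is accepted as an acknowledgement mode.
jms:Session jmsSession = new(conn, {
    acknowledgementMode: "CLIENT_ACKNOWLEDGE"
});

endpoint jms:QueueReceiver consumer {
    session: jmsSession,
    queueName: "MyQueue"
};

service<jms:Consumer> jmsListener bind consumer {
    onMessage(endpoint consumer, jms:Message message) {
        match (message.getTextMessageContent()) {
            string messageText => {
                log:printInfo("Message : " + messageText);
                // Acknowledge only after the message was processed.
                consumer->acknowledge(message) but {
                    error e => log:printError("Error occurred while acknowledging message", err = e)
                };
            }
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}
```

In JMS terms, leaving a message unacknowledged in this mode makes it eligible for redelivery. Message.getRedelivered() can be used to detect that case.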
public type QueueSender object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
producerActions | QueueSenderActions | | |
config | QueueSenderEndpointConfiguration | | |
- <QueueSender> init(QueueSenderEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | QueueSenderEndpointConfiguration | | |
- <QueueSender> initQueueSender(Session session)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  session | Session | | |
- <QueueSender> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <QueueSender> start()
- <QueueSender> getCallerActions() returns (QueueSenderActions)
  Return Type | Description |
  ---|---|
  QueueSenderActions | |
- <QueueSender> stop()
public type QueueSenderActions object
public type Session object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
config | SessionConfiguration | | |
- <Session> initEndpoint(Connection connection)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  connection | Connection | | |
- <Session> createTextMessage(string content) returns (Message | error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  content | string | | |

  Return Type | Description |
  ---|---|
  Message \| error | |
- <Session> unsubscribe(string subscriptionId) returns (error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  subscriptionId | string | | |

  Return Type | Description |
  ---|---|
  error | |
public type SimpleDurableTopicSubscriber object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
config | SimpleDurableTopicSubscriberEndpointConfiguration | | |
- <SimpleDurableTopicSubscriber> init(SimpleDurableTopicSubscriberEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | SimpleDurableTopicSubscriberEndpointConfiguration | | |
- <SimpleDurableTopicSubscriber> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <SimpleDurableTopicSubscriber> start()
- <SimpleDurableTopicSubscriber> getCallerActions() returns (DurableTopicSubscriberActions)
  Return Type | Description |
  ---|---|
  DurableTopicSubscriberActions | |
- <SimpleDurableTopicSubscriber> stop()
- <SimpleDurableTopicSubscriber> createTextMessage(string message) returns (Message | error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  message | string | | |

  Return Type | Description |
  ---|---|
  Message \| error | |
public type SimpleQueueReceiver object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
config | SimpleQueueReceiverEndpointConfiguration | | |
- <SimpleQueueReceiver> init(SimpleQueueReceiverEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | SimpleQueueReceiverEndpointConfiguration | | |
- <SimpleQueueReceiver> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <SimpleQueueReceiver> start()
- <SimpleQueueReceiver> getCallerActions() returns (QueueReceiverActions)
  Return Type | Description |
  ---|---|
  QueueReceiverActions | |
- <SimpleQueueReceiver> stop()
- <SimpleQueueReceiver> createTextMessage(string message) returns (Message | error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  message | string | | |

  Return Type | Description |
  ---|---|
  Message \| error | |
public type SimpleQueueSender object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
config | SimpleQueueSenderEndpointConfiguration | | |
- <SimpleQueueSender> init(SimpleQueueSenderEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | SimpleQueueSenderEndpointConfiguration | | |
- <SimpleQueueSender> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <SimpleQueueSender> start()
- <SimpleQueueSender> getCallerActions() returns (QueueSenderActions)
  Return Type | Description |
  ---|---|
  QueueSenderActions | |
- <SimpleQueueSender> stop()
- <SimpleQueueSender> createTextMessage(string message) returns (Message | error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  message | string | | |

  Return Type | Description |
  ---|---|
  Message \| error | |
public type SimpleTopicPublisher object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
config | SimpleTopicPublisherEndpointConfiguration | | |
- <SimpleTopicPublisher> init(SimpleTopicPublisherEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | SimpleTopicPublisherEndpointConfiguration | | |
- <SimpleTopicPublisher> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <SimpleTopicPublisher> start()
- <SimpleTopicPublisher> getCallerActions() returns (TopicPublisherActions)
  Return Type | Description |
  ---|---|
  TopicPublisherActions | |
- <SimpleTopicPublisher> stop()
- <SimpleTopicPublisher> createTextMessage(string message) returns (Message | error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  message | string | | |

  Return Type | Description |
  ---|---|
  Message \| error | |
public type SimpleTopicSubscriber object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
config | SimpleTopicSubscriberEndpointConfiguration | | |
- <SimpleTopicSubscriber> init(SimpleTopicSubscriberEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | SimpleTopicSubscriberEndpointConfiguration | | |
- <SimpleTopicSubscriber> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <SimpleTopicSubscriber> start()
- <SimpleTopicSubscriber> getCallerActions() returns (TopicSubscriberActions)
  Return Type | Description |
  ---|---|
  TopicSubscriberActions | |
- <SimpleTopicSubscriber> stop()
- <SimpleTopicSubscriber> createTextMessage(string message) returns (Message | error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  message | string | | |

  Return Type | Description |
  ---|---|
  Message \| error | |
public type TopicPublisher object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
producerActions | TopicPublisherActions | | |
config | TopicPublisherEndpointConfiguration | | |
- <TopicPublisher> init(TopicPublisherEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | TopicPublisherEndpointConfiguration | | |
- <TopicPublisher> initTopicPublisher(Session session)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  session | Session | | |
- <TopicPublisher> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <TopicPublisher> start()
- <TopicPublisher> getCallerActions() returns (TopicPublisherActions)
  Return Type | Description |
  ---|---|
  TopicPublisherActions | |
- <TopicPublisher> stop()
public type TopicPublisherActions object
public type TopicSubscriber object
Field Name | Data Type | Default Value | Description |
---|---|---|---|
consumerActions | TopicSubscriberActions | | |
config | TopicSubscriberEndpointConfiguration | | |
- <TopicSubscriber> init(TopicSubscriberEndpointConfiguration config)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  config | TopicSubscriberEndpointConfiguration | | |
- <TopicSubscriber> register(typedesc serviceType)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  serviceType | typedesc | | |
- <TopicSubscriber> start()
- <TopicSubscriber> getCallerActions() returns (TopicSubscriberActions)
  Return Type | Description |
  ---|---|
  TopicSubscriberActions | |
- <TopicSubscriber> stop()
public type TopicSubscriberActions object
- <TopicSubscriberActions> acknowledge(Message message) returns (error)
  Parameter Name | Data Type | Default Value | Description |
  ---|---|---|---|
  message | Message | | |

  Return Type | Description |
  ---|---|
  error | |
- <TopicSubscriberActions> receive() returns (Message | error)
  Return Type | Description |
  ---|---|
  Message \| error | |