ballerina/jms package

Package overview

The ballerina/jms package provides an API for connecting to an external JMS provider such as Ballerina Message Broker or ActiveMQ.

The package provides consumer and producer endpoint types for queues and topics. The following endpoint types are supported by this package.

  • QueueReceiver
  • TopicSubscriber
  • DurableTopicSubscriber
  • SimpleQueueReceiver
  • SimpleTopicSubscriber
  • SimpleDurableTopicSubscriber
  • QueueSender
  • TopicPublisher
  • SimpleQueueSender
  • SimpleTopicPublisher

Endpoints prefixed with Simple automatically create a JMS connection and a JMS session when the endpoint is initialized. For the other endpoints, the developer must explicitly provide a properly initialized JMS session.

Samples

JMS Simple Queue Consumer.

Following is a simple listener program that consumes messages from a Ballerina Message Broker queue named MyQueue.

import ballerina/jms;
import ballerina/log;

// Create a simple queue receiver.
endpoint jms:SimpleQueueReceiver consumer {
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'",
    acknowledgementMode: "AUTO_ACKNOWLEDGE",
    queueName: "MyQueue"
};

// Bind the created consumer to the listener service.
service<jms:Consumer> jmsListener bind consumer {

    // The `onMessage` resource gets invoked when a message is received.
    onMessage(endpoint consumer, jms:Message message) {
        match (message.getTextMessageContent()) {
            string messageText => log:printInfo("Message : " + messageText);
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}

JMS Simple Queue Producer.

Following is a simple queue sender program that sends messages to a Ballerina Message Broker queue named MyQueue.

import ballerina/jms;
import ballerina/log;

// Create a simple queue sender.
endpoint jms:SimpleQueueSender queueSender {
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'",
    acknowledgementMode: "AUTO_ACKNOWLEDGE",
    queueName: "MyQueue"
};

function main(string... args) {
    // Create a text message.
    match (queueSender.createTextMessage("Hello from Ballerina")) {
        error e => {
            log:printError("Error occurred while creating message", err = e);
        }

        jms:Message msg => {
            // Send the Ballerina message to the JMS provider.
            queueSender->send(msg) but {
                error e => log:printError("Error occurred while sending message", err = e)
            };
        }
    }
}

JMS queue message receiver

Following is a listener program that explicitly initializes a JMS session to be used by the consumer.

import ballerina/jms;
import ballerina/log;

// Initialize a JMS connection with the provider.
jms:Connection conn = new({
        initialContextFactory: "bmbInitialContextFactory",
        providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'"
    });

// Initialize a JMS session on top of the created connection.
jms:Session jmsSession = new(conn, {
        // An optional property that defaults to `AUTO_ACKNOWLEDGE`.
        acknowledgementMode: "AUTO_ACKNOWLEDGE"
    });

// Initialize a queue receiver using the created session.
endpoint jms:QueueReceiver consumer {
    session: jmsSession,
    queueName: "MyQueue"
};

// Bind the created consumer to the listener service.
service<jms:Consumer> jmsListener bind consumer {

    // The `onMessage` resource gets invoked when a message is received.
    onMessage(endpoint consumer, jms:Message message) {
        // Retrieve the text message.
        match (message.getTextMessageContent()) {
            string messageText => log:printInfo("Message : " + messageText);
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}

JMS queue message producer

Following is a queue sender program that explicitly initializes a JMS session to be used by the producer.

import ballerina/jms;
import ballerina/log;

// Initialize a JMS connection with the provider.
jms:Connection jmsConnection = new({
        initialContextFactory: "bmbInitialContextFactory",
        providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'"
    });

// Initialize a JMS session on top of the created connection.
jms:Session jmsSession = new(jmsConnection, {
        acknowledgementMode: "AUTO_ACKNOWLEDGE"
    });

// Initialize a queue sender using the created session.
endpoint jms:QueueSender queueSender {
    session: jmsSession,
    queueName: "MyQueue"
};

function main(string... args) {
    // Create a Text message.
    match (jmsSession.createTextMessage("Hello from Ballerina")) {
        error e => {
            log:printError("Error occurred while creating message", err = e);
        }

        jms:Message msg => {
            // Send the Ballerina message to the JMS provider.
            queueSender->send(msg) but { error e => log:printError("Error occurred while sending message", err = e) };
        }
    }
}
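
JMS simple topic subscriber (illustrative sketch)

The samples above cover queues only. As a minimal sketch of the topic side, the listener below mirrors the simple queue consumer but uses the SimpleTopicSubscriber endpoint with the topicPattern field listed under SimpleTopicSubscriberEndpointConfiguration. The topic name BallerinaTopic and the service name topicListener are placeholders, and the code assumes the same Ballerina release and broker settings as the other samples.

```ballerina
import ballerina/jms;
import ballerina/log;

// Create a simple topic subscriber. The JMS connection and session
// are created implicitly when the endpoint is initialized.
endpoint jms:SimpleTopicSubscriber subscriber {
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'",
    acknowledgementMode: "AUTO_ACKNOWLEDGE",
    // Placeholder topic name.
    topicPattern: "BallerinaTopic"
};

// Bind the subscriber to the listener service.
service<jms:Consumer> topicListener bind subscriber {

    // Invoked for each message published to the topic.
    onMessage(endpoint subscriber, jms:Message message) {
        match (message.getTextMessageContent()) {
            string messageText => log:printInfo("Message : " + messageText);
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}
```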

public type ConnectionConfiguration

Field Name Data Type Default Value Description
initialContextFactory string wso2mbInitialContextFactory
providerUrl string amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'
connectionFactoryName string ConnectionFactory
properties map

public type ConsumerEndpointConfiguration

Field Name Data Type Default Value Description
session Session
identifier string

public type DurableTopicSubscriberEndpointConfiguration

Field Name Data Type Default Value Description
session Session
topicPattern string
messageSelector string
identifier string
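
The fields above can be combined roughly as follows. This is a sketch rather than a verified program: it assumes that identifier names the durable subscription, and the values BallerinaTopic and sub1 are hypothetical. The connection and session setup is copied from the queue receiver sample.

```ballerina
import ballerina/jms;
import ballerina/log;

// Initialize a JMS connection with the provider.
jms:Connection conn = new({
        initialContextFactory: "bmbInitialContextFactory",
        providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'"
    });

// Initialize a JMS session on top of the created connection.
jms:Session jmsSession = new(conn, {
        acknowledgementMode: "AUTO_ACKNOWLEDGE"
    });

// Initialize a durable topic subscriber using the created session.
endpoint jms:DurableTopicSubscriber subscriber {
    session: jmsSession,
    // Hypothetical topic name.
    topicPattern: "BallerinaTopic",
    // Hypothetical subscription identifier (assumed to name the durable subscription).
    identifier: "sub1"
};

// Bind the durable subscriber to the listener service.
service<jms:Consumer> durableListener bind subscriber {

    onMessage(endpoint subscriber, jms:Message message) {
        match (message.getTextMessageContent()) {
            string messageText => log:printInfo("Message : " + messageText);
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}
```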

public type QueueReceiverEndpointConfiguration

Field Name Data Type Default Value Description
session Session
queueName string
messageSelector string
identifier string

public type QueueSenderEndpointConfiguration

Field Name Data Type Default Value Description
session Session
queueName string

public type SessionConfiguration

Field Name Data Type Default Value Description
acknowledgementMode string AUTO_ACKNOWLEDGE
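
When a session is not auto-acknowledging, the consumer has to acknowledge each message itself. The sketch below assumes that acknowledgementMode accepts the standard JMS mode name CLIENT_ACKNOWLEDGE (only AUTO_ACKNOWLEDGE appears in this reference) and uses the acknowledge action listed under QueueReceiverActions. The queue name is the one used in the samples.

```ballerina
import ballerina/jms;
import ballerina/log;

// Initialize a JMS connection with the provider.
jms:Connection conn = new({
        initialContextFactory: "bmbInitialContextFactory",
        providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'"
    });

// Assumption: "CLIENT_ACKNOWLEDGE" is an accepted value for this field.
jms:Session jmsSession = new(conn, {
        acknowledgementMode: "CLIENT_ACKNOWLEDGE"
    });

// Initialize a queue receiver using the created session.
endpoint jms:QueueReceiver consumer {
    session: jmsSession,
    queueName: "MyQueue"
};

service<jms:Consumer> jmsListener bind consumer {

    onMessage(endpoint consumer, jms:Message message) {
        match (message.getTextMessageContent()) {
            string messageText => {
                log:printInfo("Message : " + messageText);
                // Acknowledge only after the message has been read successfully.
                consumer->acknowledge(message) but {
                    error e => log:printError("Error occurred while acknowledging message", err = e)
                };
            }
            error e => log:printError("Error occurred while reading message", err = e);
        }
    }
}
```

Acknowledging inside the success branch means a message whose content could not be read is left unacknowledged, which is usually the point of choosing client acknowledgement.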

public type SimpleDurableTopicSubscriberEndpointConfiguration

Field Name Data Type Default Value Description
initialContextFactory string bmbInitialContextFactory
providerUrl string amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'
connectionFactoryName string ConnectionFactory
acknowledgementMode string AUTO_ACKNOWLEDGE
identifier string
properties map
messageSelector string
topicPattern string

public type SimpleQueueReceiverEndpointConfiguration

Field Name Data Type Default Value Description
initialContextFactory string bmbInitialContextFactory
providerUrl string amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'
connectionFactoryName string ConnectionFactory
acknowledgementMode string AUTO_ACKNOWLEDGE
messageSelector string
properties map
queueName string

public type SimpleQueueSenderEndpointConfiguration

Field Name Data Type Default Value Description
initialContextFactory string bmbInitialContextFactory
providerUrl string amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'
connectionFactoryName string ConnectionFactory
acknowledgementMode string AUTO_ACKNOWLEDGE
properties map
queueName string

public type SimpleTopicPublisherEndpointConfiguration

Field Name Data Type Default Value Description
initialContextFactory string bmbInitialContextFactory
providerUrl string amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'
connectionFactoryName string ConnectionFactory
acknowledgementMode string AUTO_ACKNOWLEDGE
properties map
topicPattern string

public type SimpleTopicSubscriberEndpointConfiguration

Field Name Data Type Default Value Description
initialContextFactory string bmbInitialContextFactory
providerUrl string amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'
connectionFactoryName string ConnectionFactory
acknowledgementMode string AUTO_ACKNOWLEDGE
messageSelector string
properties map
topicPattern string

public type TopicPublisherEndpointConfiguration

Field Name Data Type Default Value Description
session Session
topicPattern string

public type TopicSubscriberEndpointConfiguration

Field Name Data Type Default Value Description
session Session
topicPattern string
messageSelector string
identifier string

public type Connection object

Field Name Data Type Default Value Description
config ConnectionConfiguration
  • <Connection> createConnection()

  • <Connection> start()

  • <Connection> stop()

public type Consumer object

  • <Consumer> getEndpoint() returns (ConsumerTemplate)

    Return Type Description
    ConsumerTemplate

public type ConsumerActions object

  • <ConsumerActions> acknowledge(Message message) returns (error)

    Parameter Name Data Type Default Value Description
    message Message
    Return Type Description
    error

public type ConsumerTemplate object

Field Name Data Type Default Value Description
consumerActions ConsumerActions
config ConsumerEndpointConfiguration
  • <ConsumerTemplate> init(ConsumerEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config ConsumerEndpointConfiguration
  • <ConsumerTemplate> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <ConsumerTemplate> start()

  • <ConsumerTemplate> stop()

  • <ConsumerTemplate> getCallerActions() returns (ConsumerActions)

    Return Type Description
    ConsumerActions

public type DurableTopicSubscriber object

Field Name Data Type Default Value Description
consumerActions DurableTopicSubscriberActions
config DurableTopicSubscriberEndpointConfiguration
  • <DurableTopicSubscriber> init(DurableTopicSubscriberEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config DurableTopicSubscriberEndpointConfiguration
  • <DurableTopicSubscriber> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <DurableTopicSubscriber> start()

  • <DurableTopicSubscriber> getCallerActions() returns (DurableTopicSubscriberActions)

    Return Type Description
    DurableTopicSubscriberActions
  • <DurableTopicSubscriber> stop()

public type DurableTopicSubscriberActions object

  • <DurableTopicSubscriberActions> acknowledge(Message message) returns (error)

    Parameter Name Data Type Default Value Description
    message Message
    Return Type Description
    error
  • <DurableTopicSubscriberActions> receive() returns (Message | error)

    Return Type Description
    Message | error

public type Message object

  • <Message> getTextMessageContent() returns (string | error)

    Gets the text content of the JMS message. Returns the message content as a string.

    Return Type Description
    string | error
  • <Message> setStringProperty(string key, string value) returns (error)

    Sets a JMS transport string property on the message.

    Parameter Name Data Type Default Value Description
    key string

    The string property name

    value string

    The string property value

    Return Type Description
    error
  • <Message> getStringProperty(string key) returns (string | error)

    Gets a JMS transport string property from the message

    Parameter Name Data Type Default Value Description
    key string

    The string property name. Returns the string property value.

    Return Type Description
    string | error
  • <Message> setIntProperty(string key, int value) returns (error)

    Sets a JMS transport integer property on the message.

    Parameter Name Data Type Default Value Description
    key string

    The integer property name

    value int

    The integer property value

    Return Type Description
    error
  • <Message> getIntProperty(string key) returns (int | error)

    Gets a JMS transport integer property from the message

    Parameter Name Data Type Default Value Description
    key string

    The integer property name. Returns the integer property value.

    Return Type Description
    int | error
  • <Message> setBooleanProperty(string key, boolean value) returns (error)

    Sets a JMS transport boolean property on the message.

    Parameter Name Data Type Default Value Description
    key string

    The boolean property name

    value boolean

    The boolean property value

    Return Type Description
    error
  • <Message> getBooleanProperty(string key) returns (boolean | error)

    Gets a JMS transport boolean property from the message

    Parameter Name Data Type Default Value Description
    key string

    The boolean property name. Returns the boolean property value.

    Return Type Description
    boolean | error
  • <Message> setFloatProperty(string key, float value) returns (error)

    Sets a JMS transport float property on the message.

    Parameter Name Data Type Default Value Description
    key string

    The float property name

    value float

    The float property value

    Return Type Description
    error
  • <Message> getFloatProperty(string key) returns (float | error)

    Gets a JMS transport float property from the message

    Parameter Name Data Type Default Value Description
    key string

    The float property name. Returns the float property value.

    Return Type Description
    float | error
  • <Message> getMessageID() returns (string | error)

    Gets the JMS transport header MessageID from the message. Returns the header value.

    Return Type Description
    string | error
  • <Message> getTimestamp() returns (int | error)

    Gets the JMS transport header Timestamp from the message. Returns the header value.

    Return Type Description
    int | error
  • <Message> setDeliveryMode(int mode) returns (error)

    Sets the DeliveryMode JMS transport header on the message.

    Parameter Name Data Type Default Value Description
    mode int

    The header value

    Return Type Description
    error
  • <Message> getDeliveryMode() returns (int | error)

    Gets the JMS transport header DeliveryMode from the message. Returns the header value.

    Return Type Description
    int | error
  • <Message> setExpiration(int value) returns (error)

    Sets the Expiration JMS transport header on the message.

    Parameter Name Data Type Default Value Description
    value int

    The header value

    Return Type Description
    error
  • <Message> getExpiration() returns (int | error)

    Gets the JMS transport header Expiration from the message. Returns the header value.

    Return Type Description
    int | error
  • <Message> setType(string messageType) returns (error)

    Sets the Type JMS transport header on the message.

    Parameter Name Data Type Default Value Description
    messageType string

    The message type header value. Returns an error if a JMS provider-level internal error occurs.

    Return Type Description
    error
  • <Message> getType() returns (string | error)

    Gets the JMS transport header Type from the message. Returns the message type header value.

    Return Type Description
    string | error
  • <Message> clearProperties()

    Clears the JMS properties of the message. Returns an error if a JMS provider-level internal error occurs.

  • <Message> clearBody() returns (error)

    Clears the body of the JMS message. Returns an error if a JMS provider-level internal error occurs.

    Return Type Description
    error
  • <Message> setPriority(int value) returns (error)

    Sets the Priority JMS transport header on the message.

    Parameter Name Data Type Default Value Description
    value int

    The header value

    Return Type Description
    error
  • <Message> getPriority() returns (int | error)

    Gets the JMS transport header Priority from the message. Returns the header value.

    Return Type Description
    int | error
  • <Message> getRedelivered() returns (boolean | error)

    Gets the JMS transport header Redelivered from the message. Returns the header value.

    Return Type Description
    boolean | error
  • <Message> setCorrelationID(string value) returns (error)

    Sets the CorrelationID JMS transport header on the message.

    Parameter Name Data Type Default Value Description
    value string

    The header value

    Return Type Description
    error
  • <Message> getCorrelationID() returns (string | error)

    Gets the JMS transport header CorrelationID from the message. Returns the header value.

    Return Type Description
    string | error
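
As a rough illustration of the setters listed for Message, the sketch below attaches a string property and a correlation ID to a text message before sending it. It assumes the setters return an optional error, as the `but { error e => ... }` handling of send in the samples suggests. The property name region, its value, and the correlation ID are hypothetical.

```ballerina
import ballerina/jms;
import ballerina/log;

// Create a simple queue sender, as in the simple queue producer sample.
endpoint jms:SimpleQueueSender queueSender {
    initialContextFactory: "bmbInitialContextFactory",
    providerUrl: "amqp://admin:admin@ballerina/default?brokerlist='tcp://localhost:5672'",
    acknowledgementMode: "AUTO_ACKNOWLEDGE",
    queueName: "MyQueue"
};

function main(string... args) {
    match (queueSender.createTextMessage("Hello from Ballerina")) {
        error e => {
            log:printError("Error occurred while creating message", err = e);
        }

        jms:Message msg => {
            // Hypothetical application-level property.
            match (msg.setStringProperty("region", "EU")) {
                error e => log:printError("Error occurred while setting property", err = e);
                () => log:printInfo("String property set");
            }

            // Hypothetical correlation ID header value.
            match (msg.setCorrelationID("Msg:1")) {
                error e => log:printError("Error occurred while setting header", err = e);
                () => log:printInfo("CorrelationID header set");
            }

            // Send the message with its property and header to the JMS provider.
            queueSender->send(msg) but {
                error e => log:printError("Error occurred while sending message", err = e)
            };
        }
    }
}
```

On the consuming side, the matching getters (getStringProperty, getCorrelationID) return a value or an error and can be handled with the same match pattern used for getTextMessageContent in the samples.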

public type QueueReceiver object

Field Name Data Type Default Value Description
consumerActions QueueReceiverActions
config QueueReceiverEndpointConfiguration
  • <QueueReceiver> init(QueueReceiverEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config QueueReceiverEndpointConfiguration
  • <QueueReceiver> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <QueueReceiver> start()

  • <QueueReceiver> getCallerActions() returns (QueueReceiverActions)

    Return Type Description
    QueueReceiverActions
  • <QueueReceiver> stop()

public type QueueReceiverActions object

  • <QueueReceiverActions> acknowledge(Message message) returns (error)

    Parameter Name Data Type Default Value Description
    message Message
    Return Type Description
    error
  • <QueueReceiverActions> receive() returns (Message | error)

    Return Type Description
    Message | error

public type QueueSender object

Field Name Data Type Default Value Description
producerActions QueueSenderActions
config QueueSenderEndpointConfiguration
  • <QueueSender> init(QueueSenderEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config QueueSenderEndpointConfiguration
  • <QueueSender> initQueueSender(Session session)

    Parameter Name Data Type Default Value Description
    session Session
  • <QueueSender> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <QueueSender> start()

  • <QueueSender> getCallerActions() returns (QueueSenderActions)

    Return Type Description
    QueueSenderActions
  • <QueueSender> stop()

public type QueueSenderActions object

  • <QueueSenderActions> send(Message m) returns (error)

    Parameter Name Data Type Default Value Description
    m Message
    Return Type Description
    error

public type Session object

Field Name Data Type Default Value Description
config SessionConfiguration
  • <Session> initEndpoint(Connection connection)

    Parameter Name Data Type Default Value Description
    connection Connection
  • <Session> createTextMessage(string content) returns (Message | error)

    Parameter Name Data Type Default Value Description
    content string
    Return Type Description
    Message | error
  • <Session> unsubscribe(string subscriptionId) returns (error)

    Parameter Name Data Type Default Value Description
    subscriptionId string
    Return Type Description
    error

public type SimpleDurableTopicSubscriber object

Field Name Data Type Default Value Description
config SimpleDurableTopicSubscriberEndpointConfiguration
  • <SimpleDurableTopicSubscriber> init(SimpleDurableTopicSubscriberEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config SimpleDurableTopicSubscriberEndpointConfiguration
  • <SimpleDurableTopicSubscriber> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <SimpleDurableTopicSubscriber> start()

  • <SimpleDurableTopicSubscriber> getCallerActions() returns (DurableTopicSubscriberActions)

    Return Type Description
    DurableTopicSubscriberActions
  • <SimpleDurableTopicSubscriber> stop()

  • <SimpleDurableTopicSubscriber> createTextMessage(string message) returns (Message | error)

    Parameter Name Data Type Default Value Description
    message string
    Return Type Description
    Message | error

public type SimpleQueueReceiver object

Field Name Data Type Default Value Description
config SimpleQueueReceiverEndpointConfiguration
  • <SimpleQueueReceiver> init(SimpleQueueReceiverEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config SimpleQueueReceiverEndpointConfiguration
  • <SimpleQueueReceiver> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <SimpleQueueReceiver> start()

  • <SimpleQueueReceiver> getCallerActions() returns (QueueReceiverActions)

    Return Type Description
    QueueReceiverActions
  • <SimpleQueueReceiver> stop()

  • <SimpleQueueReceiver> createTextMessage(string message) returns (Message | error)

    Parameter Name Data Type Default Value Description
    message string
    Return Type Description
    Message | error

public type SimpleQueueSender object

Field Name Data Type Default Value Description
config SimpleQueueSenderEndpointConfiguration
  • <SimpleQueueSender> init(SimpleQueueSenderEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config SimpleQueueSenderEndpointConfiguration
  • <SimpleQueueSender> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <SimpleQueueSender> start()

  • <SimpleQueueSender> getCallerActions() returns (QueueSenderActions)

    Return Type Description
    QueueSenderActions
  • <SimpleQueueSender> stop()

  • <SimpleQueueSender> createTextMessage(string message) returns (Message | error)

    Parameter Name Data Type Default Value Description
    message string
    Return Type Description
    Message | error

public type SimpleTopicPublisher object

Field Name Data Type Default Value Description
config SimpleTopicPublisherEndpointConfiguration
  • <SimpleTopicPublisher> init(SimpleTopicPublisherEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config SimpleTopicPublisherEndpointConfiguration
  • <SimpleTopicPublisher> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <SimpleTopicPublisher> start()

  • <SimpleTopicPublisher> getCallerActions() returns (TopicPublisherActions)

    Return Type Description
    TopicPublisherActions
  • <SimpleTopicPublisher> stop()

  • <SimpleTopicPublisher> createTextMessage(string message) returns (Message | error)

    Parameter Name Data Type Default Value Description
    message string
    Return Type Description
    Message | error

public type SimpleTopicSubscriber object

Field Name Data Type Default Value Description
config SimpleTopicSubscriberEndpointConfiguration
  • <SimpleTopicSubscriber> init(SimpleTopicSubscriberEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config SimpleTopicSubscriberEndpointConfiguration
  • <SimpleTopicSubscriber> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <SimpleTopicSubscriber> start()

  • <SimpleTopicSubscriber> getCallerActions() returns (TopicSubscriberActions)

    Return Type Description
    TopicSubscriberActions
  • <SimpleTopicSubscriber> stop()

  • <SimpleTopicSubscriber> createTextMessage(string message) returns (Message | error)

    Parameter Name Data Type Default Value Description
    message string
    Return Type Description
    Message | error

public type TopicPublisher object

Field Name Data Type Default Value Description
producerActions TopicPublisherActions
config TopicPublisherEndpointConfiguration
  • <TopicPublisher> init(TopicPublisherEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config TopicPublisherEndpointConfiguration
  • <TopicPublisher> initTopicPublisher(Session session)

    Parameter Name Data Type Default Value Description
    session Session
  • <TopicPublisher> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <TopicPublisher> start()

  • <TopicPublisher> getCallerActions() returns (TopicPublisherActions)

    Return Type Description
    TopicPublisherActions
  • <TopicPublisher> stop()

public type TopicPublisherActions object

  • <TopicPublisherActions> send(Message m) returns (error)

    Parameter Name Data Type Default Value Description
    m Message
    Return Type Description
    error

public type TopicSubscriber object

Field Name Data Type Default Value Description
consumerActions TopicSubscriberActions
config TopicSubscriberEndpointConfiguration
  • <TopicSubscriber> init(TopicSubscriberEndpointConfiguration config)

    Parameter Name Data Type Default Value Description
    config TopicSubscriberEndpointConfiguration
  • <TopicSubscriber> register(typedesc serviceType)

    Parameter Name Data Type Default Value Description
    serviceType typedesc
  • <TopicSubscriber> start()

  • <TopicSubscriber> getCallerActions() returns (TopicSubscriberActions)

    Return Type Description
    TopicSubscriberActions
  • <TopicSubscriber> stop()

public type TopicSubscriberActions object

  • <TopicSubscriberActions> acknowledge(Message message) returns (error)

    Parameter Name Data Type Default Value Description
    message Message
    Return Type Description
    error
  • <TopicSubscriberActions> receive() returns (Message | error)

    Return Type Description
    Message | error