The goal of this guide is to provide guidelines to be followed in order to get familiar with WSO2 Message Broker and the procedure of using the features provided by the product.
This gives a brief introduction of how message broker can be used in publishing and receiving messages.
WSO2 Message Broker is basically consist of two major features.They are:
This feature of the WSO2 Message Broker provides the facility for users to route messages to the required users. If we are explaining deeply, there is a concept called 'Topic' and message routing is done on the base of that topic. When a particular user want to publish a message to a particular sector , he creates a topic with a name related to the messages that he is going to pulish. As an example, if a particular user want to publish messages related with sports news, he can create a topic with the name 'SportsNews' and publish the messages to that topic.
When another particular user is interested on any topic in the topic tree, he can subscribe to that topic and receive messages which are published to that topic by the publisher. As in the above example, when a particular user is interested on sports, he can subscribed to the topic 'SportsNews' and get messages published to that topic.
In WSO2 Message Broker , Pub/Sub feature is one of the two major features. Inorder to use this feature , it is needed to create a Topic and subscribe to it.
Login to the server
Click on the 'Add' menu item under the 'Topics' menu to create a topic. To create a topic , the only thing needed to be provided is the name of the topic.
When you add a topic using the 'add' button , you will be directed to the 'Topic Browser' page and you will see the topic tree.
Once you click on a topic in the topic tree , it will display all the available operations on a topic. Once you click on the 'Help' link on that page you will find the information on all the operations available on the topic
If you click on details link , you will find following page.
Once you click on the topic , you will get the following page.
Once you click on 'Subscribe' link on the above page, you will be directed to Add subscriptions page.
You can create a subscription to the topic by provide the information on subscription
User does not need to specify the topic here , since its automatically sets up.
This is the mode of the subscription and there are three modes.
The default mode for the subscription is "Topic Only". With this mode , user creates the subscription only to the topic. In this mode subscribers only receive events which are published only to the that topic.
Next mode of subscription is "Topic and Immediate child". In this mode subscribers of the topic receives events published not only the specified topic but also to the immediate child of that topic.
Last mode of subscription is "Topic and Children". In this mode subscribers of the specified topic will receive events published to the specified topic and all its children
This is the URL which the subscriber should provide to receive events published. When events are published to the topic, they are sent to the specified URL here.
Here user can specify the expiration time of the subscription. This is not a required parameter and if user leave it alone, subscription will never be expired.
Note : You can create a simple axis2service and use it's URL as the EventSinkURL . Inorder to create an axis2service ,
Now you can create a subscription by providing the Event Sink URL : https://localhost:9443/services/EventSinkService/getOMElement
Click on the button 'Subscribe' and it will create the subscription and list it in the subscription table of that topic in topic details page.
At the end of the subscribing process , you can test whether the topic and the subscriptions created are working fine. In order to do that what you have to do is type a XML message in the provided text box and under the 'Publish' section of Topic Details page and click on 'Publish button'
Then check the command line and you will be able to see the XML Message that you types in the provided space.
WSO2 MB is now supporting clustering. That means high availability and failover support is there. You can setup several Message Broker nodes and configure them up to work as a cluster so that if one node is down message routing and handling will be taken over by other nodes in the cluster. At the same time overhead of routing messages is distributed among the Message Broker cluster nodes so that overall performance of Message Brokering goes up. Thus, having a lot of publishers and subscribers will not be a problem anymore as there is no significant performance degrade with the number of publishers, subscribers and exchanges. Scalability is beyond you with WSO2 MB with combined resources you have. Fault tolerance brings you great benefits in deployment apart from the performance gain.
There are several cluster deployment models supported by WSO2 Message Broker. Read on them at Deployment guide and choose the suitable deployment pattern for your use-case and enable clustering as described there.
import javax.jms.*; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class ConsumeClient { public void consumeMessage() { Properties initialContextProperties = new Properties(); initialContextProperties.put("java.naming.factory.initial", "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"); String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'"; initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString); initialContextProperties.put("queue.myQueue", "myQueue"); try { InitialContext initialContext = new InitialContext(initialContextProperties); QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory"); QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); Destination destination = (Destination) initialContext.lookup("myQueue"); MessageConsumer messageConsumer = queueSession.createConsumer(destination); TextMessage textMessage = (TextMessage) messageConsumer.receive(); System.out.println("Got message ==> " + textMessage.getText()); try { Thread.sleep(9000); } catch (Exception e) { System.out.println(e); } messageConsumer.close(); queueSession.close(); queueConnection.stop(); queueConnection.close(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { ConsumeClient sendConsumeClient = new ConsumeClient(); sendConsumeClient.consumeMessage(); } }
import javax.jms.*; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class SendClient { public static void main(String[] args) { SendClient sendClient = new SendClient(); sendClient.sendMessage(); } public void sendMessage() { Properties initialContextProperties = new Properties(); initialContextProperties.put("java.naming.factory.initial", "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"); String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'"; initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString); initialContextProperties.put("queue.myQueue", "myQueue"); try { InitialContext initialContext = new InitialContext(initialContextProperties); QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory"); QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); TextMessage textMessage = queueSession.createTextMessage(); textMessage.setText("Test message"); System.out.println("Sending Message : " + textMessage.getText().length()); // Send message Queue queue = (Queue) initialContext.lookup("myQueue"); QueueSender queueSender = queueSession.createSender(queue); queueSender.send(textMessage); // Housekeeping queueSender.close(); queueSession.close(); queueConnection.stop(); queueConnection.close(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } }
Following is a typical deployment diagram for a Message Broker cluster setup.