[Documentation Index]

WSO2 MB - Samples: JMS Topic Sample

This guide demonstrates how topics can be created and used in the Message Broker using the JMS API.

Contents

JMS Topic Sample

The following code is used to create a topic, subscribe to it, and publish messages. To run this code sample, you need to have the dependencies located at $CARBON_HOME/client-lib on the class path. To try out the following code fragments, run the Topic Subscriber first and then run the Topic Publisher. You should see the published message in the Topic Subscriber console. You can see the created topics in the management console as well.

Topic Publisher: This code is used to publish messages to a given topic.

                package com.org.wso2.mb.jms.sample;
                /**
                 * Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
                 *
                 * Licensed under the Apache License, Version 2.0 (the "License");
                 * you may not use this file except in compliance with the License.
                 * You may obtain a copy of the License at
                 *
                 * http://www.apache.org/licenses/LICENSE-2.0
                 *
                 * Unless required by applicable law or agreed to in writing, software
                 * distributed under the License is distributed on an "AS IS" BASIS,
                 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
                 * See the License for the specific language governing permissions and
                 * limitations under the License.
                 */

                import javax.jms.JMSException;
                import javax.jms.QueueSession;
                import javax.jms.TextMessage;
                import javax.jms.Topic;
                import javax.jms.TopicConnection;
                import javax.jms.TopicConnectionFactory;
                import javax.jms.TopicSession;
                import javax.naming.Context;
                import javax.naming.InitialContext;
                import javax.naming.NamingException;
                import java.util.Properties;


                /**
                 * Sample JMS client that publishes a single text message to a topic.
                 *
                 * <p>The connection factory is obtained through JNDI using an in-memory
                 * {@link Properties} object (no external properties file is read), and the
                 * broker location and credentials come from the fields of this class.
                 */
                public class TopicPublisher {
                    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
                    private static final String CF_NAME_PREFIX = "connectionfactory.";
                    private static final String CF_NAME = "qpidConnectionfactory";
                    String userName = "admin";
                    String password = "admin";

                    private static final String CARBON_CLIENT_ID = "carbon";
                    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
                    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
                    private static final String CARBON_DEFAULT_PORT = "5672";
                    String topicName = "MYTopic";


                    /**
                     * Entry point: creates a publisher and sends one message.
                     *
                     * @param args command-line arguments (unused)
                     * @throws NamingException if the JNDI context cannot be created or the lookup fails
                     * @throws JMSException    if connecting to the broker or publishing fails
                     */
                    public static void main(String[] args) throws NamingException, JMSException {
                        TopicPublisher topicPublisher = new TopicPublisher();
                        topicPublisher.publishMessage();
                    }

                    /**
                     * Connects to the broker, publishes the text message {@code "Test Message"} to
                     * the topic named by {@code topicName}, and releases all resources.
                     *
                     * <p>The session, connection and JNDI context are closed in {@code finally}
                     * blocks so they are released even when an intermediate JMS call throws.
                     *
                     * @throws NamingException if the JNDI context cannot be created, the lookup
                     *                         fails, or the context cannot be closed
                     * @throws JMSException    if any JMS operation fails
                     */
                    public void publishMessage() throws NamingException, JMSException {
                        // Build the URL once and reuse it for both the JNDI entry and the log line.
                        String connectionUrl = getTCPConnectionURL(userName, password);

                        Properties properties = new Properties();
                        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
                        properties.put(CF_NAME_PREFIX + CF_NAME, connectionUrl);

                        // NOTE: the printed URL embeds the user name and password. This is acceptable
                        // for a sample only; do not log credentials in production code.
                        System.out.println("getTCPConnectionURL(userName,password) = " + connectionUrl);

                        InitialContext ctx = new InitialContext(properties);
                        try {
                            // Lookup connection factory
                            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
                            TopicConnection topicConnection = connFactory.createTopicConnection();
                            try {
                                topicConnection.start();
                                // Non-transacted session; AUTO_ACKNOWLEDGE is inherited from javax.jms.Session.
                                TopicSession topicSession =
                                        topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                                try {
                                    Topic topic = topicSession.createTopic(topicName);

                                    // create the message to send
                                    TextMessage textMessage = topicSession.createTextMessage("Test Message");

                                    // Send message
                                    javax.jms.TopicPublisher publisher = topicSession.createPublisher(topic);
                                    publisher.publish(textMessage);
                                } finally {
                                    topicSession.close();
                                }
                            } finally {
                                topicConnection.close();
                            }
                        } finally {
                            ctx.close();
                        }
                    }

                    /**
                     * Builds the AMQP connection URL for the configured broker, in the form
                     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
                     *
                     * @param username user name placed in the URL
                     * @param password password placed in the URL (in clear text)
                     * @return the connection URL string
                     */
                    public String getTCPConnectionURL(String username, String password) {
                        return "amqp://" + username + ":" + password
                                + "@" + CARBON_CLIENT_ID
                                + "/" + CARBON_VIRTUAL_HOST_NAME
                                + "?brokerlist='tcp://" + CARBON_DEFAULT_HOSTNAME + ":" + CARBON_DEFAULT_PORT + "'";
                    }

                }


            

Topic Subscriber: This code is used to subscribe to a topic. topicSubscriber.receive() blocks until a message is received; after that, the main thread exits.

                package com.org.wso2.mb.jms.sample;
                /**
                 * Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
                 *
                 * Licensed under the Apache License, Version 2.0 (the "License");
                 * you may not use this file except in compliance with the License.
                 * You may obtain a copy of the License at
                 *
                 * http://www.apache.org/licenses/LICENSE-2.0
                 *
                 * Unless required by applicable law or agreed to in writing, software
                 * distributed under the License is distributed on an "AS IS" BASIS,
                 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
                 * See the License for the specific language governing permissions and
                 * limitations under the License.
                 */

                import javax.jms.JMSException;
                import javax.jms.Message;
                import javax.jms.QueueSession;
                import javax.jms.TextMessage;
                import javax.jms.Topic;
                import javax.jms.TopicConnection;
                import javax.jms.TopicConnectionFactory;
                import javax.jms.TopicSession;
                import javax.naming.Context;
                import javax.naming.InitialContext;
                import javax.naming.NamingException;
                import java.util.Properties;


                /**
                 * Sample JMS client that subscribes to a topic and prints the first message
                 * it receives.
                 *
                 * <p>The connection factory is obtained through JNDI using an in-memory
                 * {@link Properties} object (no external properties file is read), and the
                 * broker location and credentials come from the fields of this class.
                 */
                public class TopicSubscriber {
                    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
                    private static final String CF_NAME_PREFIX = "connectionfactory.";
                    private static final String CF_NAME = "qpidConnectionfactory";
                    String userName = "admin";
                    String password = "admin";

                    private static final String CARBON_CLIENT_ID = "carbon";
                    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
                    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
                    private static final String CARBON_DEFAULT_PORT = "5672";
                    String topicName = "MYTopic";


                    /**
                     * Entry point: creates a subscriber and waits for one message.
                     *
                     * @param args command-line arguments (unused)
                     * @throws NamingException if the JNDI context cannot be created or the lookup fails
                     * @throws JMSException    if connecting to the broker or receiving fails
                     */
                    public static void main(String[] args) throws NamingException, JMSException {
                        TopicSubscriber topicSubscriber = new TopicSubscriber();
                        topicSubscriber.subscribe();
                    }

                    /**
                     * Connects to the broker, subscribes to the topic named by {@code topicName},
                     * waits in {@code receive()} for one message, prints its text if it is a
                     * {@link TextMessage}, and releases all resources.
                     *
                     * <p>The session, connection and JNDI context are closed in {@code finally}
                     * blocks so they are released even when an intermediate JMS call throws.
                     *
                     * @throws NamingException if the JNDI context cannot be created, the lookup
                     *                         fails, or the context cannot be closed
                     * @throws JMSException    if any JMS operation fails
                     */
                    public void subscribe() throws NamingException, JMSException {
                        // Build the URL once and reuse it for both the JNDI entry and the log line.
                        String connectionUrl = getTCPConnectionURL(userName, password);

                        Properties properties = new Properties();
                        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
                        properties.put(CF_NAME_PREFIX + CF_NAME, connectionUrl);

                        // NOTE: the printed URL embeds the user name and password. This is acceptable
                        // for a sample only; do not log credentials in production code.
                        System.out.println("getTCPConnectionURL(userName,password) = " + connectionUrl);

                        InitialContext ctx = new InitialContext(properties);
                        try {
                            // Lookup connection factory
                            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
                            TopicConnection topicConnection = connFactory.createTopicConnection();
                            try {
                                // The connection must be started before messages are delivered to consumers.
                                topicConnection.start();
                                // Non-transacted session; AUTO_ACKNOWLEDGE is inherited from javax.jms.Session.
                                TopicSession topicSession =
                                        topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                                try {
                                    // Subscribe to the topic and wait for a single message
                                    Topic topic = topicSession.createTopic(topicName);
                                    javax.jms.TopicSubscriber subscriber = topicSession.createSubscriber(topic);
                                    Message message = subscriber.receive();
                                    // instanceof is false for null, so a null result is silently skipped.
                                    if (message instanceof TextMessage) {
                                        TextMessage textMessage = (TextMessage) message;
                                        System.out.println("textMessage.getText() = " + textMessage.getText());
                                    }
                                } finally {
                                    topicSession.close();
                                }
                            } finally {
                                topicConnection.close();
                            }
                        } finally {
                            ctx.close();
                        }
                    }

                    /**
                     * Builds the AMQP connection URL for the configured broker, in the form
                     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
                     *
                     * @param username user name placed in the URL
                     * @param password password placed in the URL (in clear text)
                     * @return the connection URL string
                     */
                    public String getTCPConnectionURL(String username, String password) {
                        return "amqp://" + username + ":" + password
                                + "@" + CARBON_CLIENT_ID
                                + "/" + CARBON_VIRTUAL_HOST_NAME
                                + "?brokerlist='tcp://" + CARBON_DEFAULT_HOSTNAME + ":" + CARBON_DEFAULT_PORT + "'";
                    }

                }


            

Also note that a MessageListener can be set on the topicSubscriber to receive messages asynchronously.