[Documentation Index]

WSO2 MB - Samples :JMS Queue Sample

This guide demonstrates how persistent queues can be created and used in Message Broker using JMS API.

Contents

JMS Queue Sample

Following JMS client is used to send messages to a known created queue in WSO2 Message Broker. Queue Receiver can receive messages and message is printed in console.

First log into WSO2 Message Broker Management console and create a queue named 'testQueue'. Click on the 'Add' menu item under the 'Queues' menu to create a queue. To create a queue , the only thing needed to be provided is the name of the queue.

note: To run this code sample, you need to have dependencies located at $CARBON_HOME/client-lib in class path. You need to run QueueReceiver class prior to QueueSender class when testing this sample.

Using following QueueSender JMS client messages can be sent to 'testQueue'.

                package com.org.wso2.mb.jms.sample;
                /**
                 * Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
                 *
                 * Licensed under the Apache License, Version 2.0 (the "License");
                 * you may not use this file except in compliance with the License.
                 * You may obtain a copy of the License at
                 *
                 * http://www.apache.org/licenses/LICENSE-2.0
                 *
                 * Unless required by applicable law or agreed to in writing, software
                 * distributed under the License is distributed on an "AS IS" BASIS,
                 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
                 * See the License for the specific language governing permissions and
                 * limitations under the License.
                 */

                import javax.jms.JMSException;
                import javax.jms.Queue;
                import javax.jms.QueueConnection;
                import javax.jms.QueueConnectionFactory;
                import javax.jms.QueueSession;
                import javax.jms.TextMessage;
                import javax.naming.Context;
                import javax.naming.InitialContext;
                import javax.naming.NamingException;
                import java.util.Properties;

                public class QueueSender {
                    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
                    private static final String CF_NAME_PREFIX = "connectionfactory.";
                    private static final String QUEUE_NAME_PREFIX = "queue.";
                    private static final String CF_NAME = "qpidConnectionfactory";
                    String userName = "admin";
                    String password = "admin";

                    private static String CARBON_CLIENT_ID = "carbon";
                    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
                    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
                    private static String CARBON_DEFAULT_PORT = "5672";
                    String queueName = "testQueue";


                    public static void main(String[] args) throws NamingException, JMSException {
                        QueueSender queueSender = new QueueSender();
                        queueSender.sendMessages();
                    }

                    public void sendMessages() throws NamingException, JMSException {
                        Properties properties = new Properties();
                        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
                        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
                        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);

                        System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));

                        InitialContext ctx = new InitialContext(properties);
                        // Lookup connection factory
                        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
                        QueueConnection queueConnection = connFactory.createQueueConnection();
                        queueConnection.start();
                        QueueSession queueSession =
                                queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

                        // Send message
                        Queue queue = (Queue)ctx.lookup(queueName);

                        // create the message to send
                        TextMessage textMessage = queueSession.createTextMessage("Test Message Content");

                        javax.jms.QueueSender queueSender = queueSession.createSender(queue);
                        queueSender.send(textMessage);

                        queueSender.close();
                        queueSession.close();
                        queueConnection.close();

                    }

                    public String getTCPConnectionURL(String username, String password) {
                        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
                        return new StringBuffer()
                                .append("amqp://").append(username).append(":").append(password)
                                .append("@").append(CARBON_CLIENT_ID)
                                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                                .toString();
                    }


                }

            

You can view created queue and its increased message count using Message Broker management console.

Now to receive the messages from the 'testQueue' following QueueReceiver class can be used. Message received gets printed on the console.

                package com.org.wso2.mb.jms.sample;
                /**
                 * Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
                 *
                 * Licensed under the Apache License, Version 2.0 (the "License");
                 * you may not use this file except in compliance with the License.
                 * You may obtain a copy of the License at
                 *
                 * http://www.apache.org/licenses/LICENSE-2.0
                 *
                 * Unless required by applicable law or agreed to in writing, software
                 * distributed under the License is distributed on an "AS IS" BASIS,
                 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
                 * See the License for the specific language governing permissions and
                 * limitations under the License.
                 */

                import javax.jms.*;
                import javax.naming.Context;
                import javax.naming.InitialContext;
                import javax.naming.NamingException;
                import java.util.Properties;

                public class QueueReceiver {

                    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
                    private static final String CF_NAME_PREFIX = "connectionfactory.";
                    private static final String CF_NAME = "qpidConnectionfactory";
                    String userName = "admin";
                    String password = "admin";

                    private static String CARBON_CLIENT_ID = "carbon";
                    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
                    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
                    private static String CARBON_DEFAULT_PORT = "5672";
                    String queueName = "testQueue";


                    public static void main(String[] args) throws NamingException, JMSException {
                        QueueReceiver queueReceiver = new QueueReceiver();
                        queueReceiver.receiveMessages();
                    }

                    public void receiveMessages() throws NamingException, JMSException {
                        Properties properties = new Properties();
                        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
                        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
                        properties.put("queue."+ queueName,queueName);

                        System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));

                        InitialContext ctx = new InitialContext(properties);
                        // Lookup connection factory
                        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
                        QueueConnection queueConnection = connFactory.createQueueConnection();
                        queueConnection.start();
                        QueueSession queueSession =
                                queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

                        //Receive message
                        Queue queue =  (Queue) ctx.lookup(queueName);
                        MessageConsumer queueReceiver = queueSession.createConsumer(queue);
                        TextMessage message = (TextMessage) queueReceiver.receive();
                        System.out.println("Got message ==>" + message.getText());

                        queueReceiver.close();
                        queueSession.close();
                        queueConnection.stop();
                        queueConnection.close();

                    }

                    public String getTCPConnectionURL(String username, String password) {
                        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
                        return new StringBuffer()
                                .append("amqp://").append(username).append(":").append(password)
                                .append("@").append(CARBON_CLIENT_ID)
                                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                                .toString();
                    }


                }

            

Now you should be able to see message count gets decreased for 'testQueue' using Management Console.