JmsConsumer.java

/**
 *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 *  WSO2 Inc. licenses this file to you under the Apache License,
 *  Version 2.0 (the "License"); you may not use this file except
 *  in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.synapse.message.store.impl.jms;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseException;
import org.apache.synapse.message.MessageConsumer;
import org.apache.synapse.message.StoreForwardException;
import org.apache.synapse.message.processor.MessageProcessorConstants;
import org.apache.synapse.message.store.Constants;
import org.apache.synapse.message.store.impl.commons.MessageConverter;
import org.apache.synapse.message.store.impl.commons.StorableMessage;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

public class JmsConsumer implements MessageConsumer {

    private static final Log logger = LogFactory.getLog(JmsConsumer.class.getName());

    private Connection connection;

    private Session session;

    private javax.jms.MessageConsumer consumer;

    private JmsStore store;

    private String idString;

    private boolean isInitialized;

    /** Holds the last message read from the message store. */
    private CachedMessage cachedMessage;

    /** Did last receive() call cause an error? */
    private boolean isReceiveError;

    /**
     * Boolean to store if the message processor is alive
     */
    private boolean isAlive;

    /**
     * Constructor for JMS consumer
     *
     * @param store JMSStore associated to this JMS consumer
     */
    public JmsConsumer(JmsStore store) {
        if (store == null) {
            logger.error("Cannot initialize.");
            return;
        }
        this.store = store;
        cachedMessage = new CachedMessage();
        isReceiveError = false;
        isInitialized = true;
        isAlive = true;
    }

    public MessageContext receive() {

        if (isAlive()) {
            boolean connectionSuccess = checkAndTryConnect();

            if (!connectionSuccess) {
                throw new SynapseException(idString + "Error while connecting to JMS provider. "
                        + MessageProcessorConstants.STORE_CONNECTION_ERROR);
            }

            try {
                Message message = consumer.receive(1000);
                if (message == null) {
                    return null;
                }
                if (!(message instanceof ObjectMessage)) {
                    logger.warn("JMS Consumer " + getId() + " did not receive a javax.jms.ObjectMessage");
                    //we just discard this message as we only store Object messages via JMS Message store
                    message.acknowledge();
                    return null;
                }
                ObjectMessage msg = (ObjectMessage) message;
                String messageId = msg.getStringProperty(Constants.OriginalMessageID);
                if (!(msg.getObject() instanceof StorableMessage)) {
                    logger.warn("JMS Consumer " + getId() + " did not receive a valid message.");
                    message.acknowledge();
                    return null;
                }

                //create a ,essage context back from the stored message
                StorableMessage storableMessage = (StorableMessage) msg.getObject();
                org.apache.axis2.context.MessageContext axis2Mc = store.newAxis2Mc();
                MessageContext synapseMc = store.newSynapseMc(axis2Mc);
                synapseMc = MessageConverter.toMessageContext(storableMessage, axis2Mc, synapseMc);

                //cache the message
                updateCache(message, synapseMc, messageId, false);

                if (logger.isDebugEnabled()) {
                    logger.debug(getId() + " Received MessageId:" + messageId + " priority:" + message.getJMSPriority());
                }

                return synapseMc;

            } catch (JMSException e) {
                logger.error("Cannot fetch messages from Store " + store.getName());
                updateCache(null, null, "", true);
                cleanup();
            /* try connecting and receiving again. Try to connect will happen configured number of times
            and give up with a SynapseException */
                return receive();
            }
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Trying to receive messages from a consumer that is not alive. Id: " + getId()
                        + ", store: " + store.getName());
            }
            return null;
        }
    }

    public boolean ack() {
        boolean result = cachedMessage.ack();
        if (result) {
            store.dequeued();
        }
        return result;
    }

    public boolean cleanup() throws SynapseException {
        isAlive = false;
        if (logger.isDebugEnabled()) {
            logger.debug(getId() + " cleaning up...");
        }
        try {
            store.cleanup(connection, session);
            return true;
        } catch (JMSException e) {
            throw new SynapseException("Error while connecting to store to close created connections. JMS provider "
                    + "might not be accessible " + store.getName() + " "
                    + MessageProcessorConstants.STORE_CONNECTION_ERROR, e);
        } finally {
            connection = null;
            session = null;
            consumer = null;
        }
    }

    public boolean isAlive() {

        if (isAlive) {
            try {
                session.getAcknowledgeMode(); /** No straight forward way to check session availability */
            } catch (JMSException e) {
                return false;
            }
        } else {
            return false;
        }
        return true;
    }

    public void setAlive(boolean isAlive) {
    }

    public Connection getConnection() {
        return connection;
    }

    public JmsConsumer setConnection(Connection connection) {
        this.connection = connection;
        return this;
    }

    public Session getSession() {
        return session;
    }

    public JmsConsumer setSession(Session session) {
        this.session = session;
        return this;
    }

    public javax.jms.MessageConsumer getConsumer() {
        return consumer;
    }

    public JmsConsumer setConsumer(javax.jms.MessageConsumer consumer) {
        this.consumer = consumer;
        return this;
    }

    public boolean isInitialized() {
        return isInitialized;
    }

    public void setId(int id) {
        idString = "[" + store.getName() + "-C-" + id + "]";
    }

    public void setStringId(String idString) {
        this.idString = idString;
    }

    public String getId() {
        return getIdAsString();
    }

    private String getIdAsString() {
        if (idString == null) {
            return "[unknown-consumer]";
        }
        return idString;
    }

    /**
     * Check if connection, session and consumer is created successfully, if not try to connect
     *
     * @return true if connection to JMS provider is successfully made
     */
    private boolean checkAndTryConnect() {

        boolean connectionSuccess = false;

        if (consumer != null && session != null && connection != null) {
            connectionSuccess = true;
        } else {
            try {
                reconnect();
                connectionSuccess = true;
            } catch (StoreForwardException | SynapseException e) {
                logger.error("Error while connecting to JMS store and initializing consumer", e);
            }
        }
        return connectionSuccess;
    }

    private void writeToFileSystem() {
    }

    private void updateCache(Message message, MessageContext synCtx, String messageId, boolean receiveError) {
        isReceiveError = receiveError;
        cachedMessage.setMessage(message);
        cachedMessage.setMc(synCtx);
        cachedMessage.setId(messageId);
    }

    private void reconnect() throws StoreForwardException {
        logger.info("Trying to reconnect to JMS store " + store.getName());
        JmsConsumer consumer = (JmsConsumer) store.getConsumer();
        logger.info("Successfully connected to JMS store " + store.getName());
        if (consumer.getConsumer() == null) {
            if (logger.isDebugEnabled()) {
                logger.debug(getId() + " could not reconnect to the broker.");
            }
        }
        connection = consumer.getConnection();
        session = consumer.getSession();
        this.consumer = consumer.getConsumer();
        if (logger.isDebugEnabled()) {
            logger.debug(getId() + " ===> " + consumer.getId());
        }
    }

    private final class CachedMessage {
        private Message message = null;
        private MessageContext mc = null;
        private String id = "";

        public CachedMessage setMessage(Message message) {
            this.message = message;
            return this;
        }

        /**
         * Acknowledge the message
         *
         * @return true if ack is processed successfully. If there was some issue,
         * we call recover on session will return false
         */
        public boolean ack() {
            try {
                if (message != null) {
                    message.acknowledge();
                }
            } catch (javax.jms.IllegalStateException e) {
                logger.warn("JMS Session is in an illegal state. Recovering session.");

                try {
                    getSession().recover();
                    logger.warn("JMS Session recovered.");
                } catch (JMSException e1) {
                    logger.error("Error occurred while recovering session: "
                            + e.getLocalizedMessage(), e);
                    return false;
                }
                return false;
            } catch (JMSException e) {
                logger.error(getId() + " cannot ack last read message. Error:"
                        + e.getLocalizedMessage(), e);
                return false;
            }
            return true;
        }

        public Message getMessage() {
            return message;
        }

        public CachedMessage setMc(MessageContext mc) {
            this.mc = mc;
            return this;
        }

        public CachedMessage setId(String id) {
            this.id = id;
            return this;
        }

        public String getId() {
            return id;
        }
    }
}