RabbitMQProducer.java

/**
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 * <p>
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.synapse.message.store.impl.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.message.MessageProducer;

import org.apache.synapse.message.store.impl.commons.MessageConverter;
import org.apache.synapse.message.store.impl.commons.StorableMessage;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.concurrent.TimeoutException;

/**
 * The message producer responsible to store message into RabbitMQ queue
 */
public class RabbitMQProducer implements MessageProducer {

    private static final Log log = LogFactory.getLog(RabbitMQProducer.class.getName());

    private static final int DEFAULT_PRIORITY = 0;
    private Connection connection;
    private RabbitMQStore store;
    private String routingKey;
    private String exchangeName;
    private boolean isInitialized = false;
    private String idString; // ID of the MessageProducer
    private boolean publisherConfirmsEnabled;

    /**
     * The RabbitMQ producer
     *
     * @param store the {@link RabbitMQStore} object
     */
    public RabbitMQProducer(RabbitMQStore store) {
        if (store == null) {
            log.error("Cannot initialize producer: " + getId());
            return;
        }
        this.store = store;
        isInitialized = true;
    }

    /**
     * Store the given message into the queue and return whether the operation success or not
     *
     * @param synCtx Message to be saved.
     * @return {@code true} if storing of the message is successful, {@code false} otherwise.
     */
    @Override
    public boolean storeMessage(MessageContext synCtx) {
        if (synCtx == null) {
            return false;
        }
        if (connection == null) {
            log.error(getId() + " cannot proceed. RabbitMQ Connection is null. Ignored MessageId: " +
                    synCtx.getMessageID());
            return false;
        }
        boolean result = false;
        try (Channel channel = connection.createChannel()) {
            if (publisherConfirmsEnabled) {
                channel.confirmSelect();
            }
            StorableMessage storableMessage = MessageConverter.toStorableMessage(synCtx);
            final byte[] message = serializeMessage(storableMessage);
            final AMQP.BasicProperties basicProperties = getBasicProperties(synCtx, storableMessage);
            publishMessage(channel, exchangeName, routingKey, basicProperties, message);
            if (publisherConfirmsEnabled) {
                result = channel.waitForConfirms();
            } else {
                result = true;
            }
            if (log.isDebugEnabled()) {
                log.debug(getId() + ". Stored MessageId: " + synCtx.getMessageID());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (TimeoutException | IOException e) {
            String errorMsg = getId() + ". Ignored MessageId: " + synCtx.getMessageID() + ". " +
                    "Could not store message to store [" + store.getName() + "]. " +
                    "Error:" + e.getLocalizedMessage();
            log.error(errorMsg, e);
        }
        store.enqueued();
        return result;
    }

    /**
     * Serialize the message to store in the queue
     *
     * @param storableMessage the {@link StorableMessage} object
     * @return serialize message as byte array
     * @throws IOException
     */
    private byte[] serializeMessage(StorableMessage storableMessage) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ObjectOutput objectOutput = new ObjectOutputStream(outputStream);
        objectOutput.writeObject(storableMessage);
        return outputStream.toByteArray();
    }

    /**
     * Build AMQP basic properties from the message context
     *
     * @param synCtx          the {@link MessageContext} object
     * @param storableMessage the {@link StorableMessage} object
     * @return AMQP basic properties
     */
    private AMQP.BasicProperties getBasicProperties(MessageContext synCtx, StorableMessage storableMessage) {
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
        builder.messageId(synCtx.getMessageID());
        builder.deliveryMode(MessageProperties.MINIMAL_PERSISTENT_BASIC.getDeliveryMode());
        builder.priority(storableMessage.getPriority(DEFAULT_PRIORITY));
        return builder.build();
    }

    /**
     * Perform basic publish
     *
     * @param channel         the channel
     * @param exchangeName    the exchange to publish the message to
     * @param routingKey      the routing key
     * @param basicProperties other properties for the message
     * @param messageBody     the message body
     * @throws IOException
     */
    private void publishMessage(Channel channel, String exchangeName, String routingKey,
                                AMQP.BasicProperties basicProperties, byte[] messageBody) throws IOException {
        if (StringUtils.isNotEmpty(exchangeName)) {
            channel.basicPublish(exchangeName, routingKey, basicProperties, messageBody);
        } else {
            channel.basicPublish("", routingKey, basicProperties, messageBody);
        }
    }

    /**
     * Used to close the channel opened in this object instance.
     * This should be called after the end of each call on storeMessage method
     * But instead of this, try with resources will close the channel
     */
    @Override
    public boolean cleanup() {
        return true;
    }

    /**
     * Get ID of this RabbitMQ producer
     *
     * @return the ID
     */
    @Override
    public String getId() {
        return idString;
    }

    /**
     * Set ID of this RabbitMQ producer
     *
     * @param id ID
     */
    @Override
    public void setId(int id) {
        idString = "[" + store.getName() + "-P-" + id + "]";
    }

    /**
     * Set the routing key bind with the exchange
     *
     * @param routingKey the message routing key
     */
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    /**
     * Set the exchange name to publish the message
     *
     * @param exchangeName the exchange to publish the message to
     */
    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    /**
     * Set the {@link Connection} object
     *
     * @param connection a {@link Connection} object
     */
    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    /**
     * Set the publisher confirm enabled or not
     *
     * @param publisherConfirmsEnabled publsher confirm enabled or not
     */
    public void setPublisherConfirmsEnabled(boolean publisherConfirmsEnabled) {
        this.publisherConfirmsEnabled = publisherConfirmsEnabled;
    }

    /**
     * Verify to whether producer was initialized
     *
     * @return is initialized
     */
    public boolean isInitialized() {
        return isInitialized;
    }

}