package org.apache.stratos.messaging.broker.connect.amqp;

import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.TopicSession;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.connect.RetryTimer;
import org.apache.stratos.messaging.broker.connect.TopicSubscriber;
import org.apache.stratos.messaging.broker.subscribe.MessageListener;
import org.apache.stratos.messaging.domain.exception.MessagingException;

/* loaded from: input_file:org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicSubscriber.class */
public class AmqpTopicSubscriber extends AmqpTopicConnector implements TopicSubscriber {
    protected static final Log log = LogFactory.getLog(AmqpTopicSubscriber.class);
    private final MessageListener messageListener;
    private final String topicName;

    public AmqpTopicSubscriber(MessageListener messageListener, String str) {
        this.messageListener = messageListener;
        this.topicName = str;
        create();
    }

    @Override // org.apache.stratos.messaging.broker.connect.TopicSubscriber
    public void subscribe() {
        try {
            TopicSession newSession = newSession();
            Topic lookupTopic = lookupTopic(this.topicName);
            if (lookupTopic == null) {
                lookupTopic = newSession.createTopic(this.topicName);
            }
            newSession.createSubscriber(lookupTopic).setMessageListener(new javax.jms.MessageListener() { // from class: org.apache.stratos.messaging.broker.connect.amqp.AmqpTopicSubscriber.1
                public void onMessage(Message message) {
                    String physicalName;
                    String str;
                    try {
                        if (message instanceof ActiveMQTextMessage) {
                            ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message;
                            physicalName = activeMQTextMessage.getDestination().getPhysicalName();
                            str = activeMQTextMessage.getText();
                        } else {
                            if (!(message instanceof ActiveMQBytesMessage)) {
                                throw new RuntimeException(String.format("Could not receive message, unknown JMS message type: %s", message.getClass().getName()));
                            }
                            ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
                            physicalName = activeMQBytesMessage.getDestination().getPhysicalName();
                            str = new String(activeMQBytesMessage.getContent().data);
                        }
                        AmqpTopicSubscriber.this.messageListener.messageReceived(new org.apache.stratos.messaging.domain.Message(physicalName, str));
                    } catch (Exception e) {
                        AmqpTopicSubscriber.log.error("An error occurred when receiving message", e);
                    }
                }
            });
        } catch (Exception e) {
            String str = "Could not subscribe to topic: " + this.topicName;
            log.error(str, e);
            throw new MessagingException(str, e);
        }
    }

    @Override // org.apache.stratos.messaging.broker.connect.amqp.AmqpTopicConnector
    protected void reconnect() {
        RetryTimer retryTimer = new RetryTimer();
        boolean z = false;
        while (!z) {
            try {
                long nextInterval = retryTimer.getNextInterval();
                log.info(String.format("Topic subscriber will try to reconnect in %d seconds: [topic-name] %s", Long.valueOf(nextInterval / 1000), this.topicName));
                Thread.sleep(nextInterval);
            } catch (InterruptedException e) {
            }
            try {
                disconnect();
                create();
                connect();
                subscribe();
                z = true;
                log.info(String.format("Topic subscriber reconnected: [topic-name] %s", this.topicName));
            } catch (Exception e2) {
                log.warn("Could not reconnect to message broker", e2);
            }
        }
    }
}
