package org.apache.stratos.messaging.broker.subscribe;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.connect.RetryTimer;
import org.apache.stratos.messaging.broker.connect.TopicSubscriber;
import org.apache.stratos.messaging.broker.connect.TopicSubscriberFactory;
import org.apache.stratos.messaging.util.MessagingUtil;
import org.eclipse.paho.client.mqttv3.MqttException;

/* loaded from: input_file:org/apache/stratos/messaging/broker/subscribe/EventSubscriber.class */
public class EventSubscriber implements Runnable {
    private static final Log log = LogFactory.getLog(EventSubscriber.class);
    private final TopicSubscriber topicSubscriber;
    private final String topicName;
    private boolean subscribed;

    public EventSubscriber(String str, MessageListener messageListener) {
        this.topicName = str;
        String messagingProtocol = MessagingUtil.getMessagingProtocol();
        this.topicSubscriber = TopicSubscriberFactory.createTopicSubscriber(messagingProtocol, messageListener, str);
        if (log.isDebugEnabled()) {
            log.debug(String.format("Topic subscriber created: [protocol] %s [topic] %s", messagingProtocol, str));
        }
    }

    private void doSubscribe() throws MqttException {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Subscribing to topic: [topic] %s [server] %s", this.topicName, this.topicSubscriber.getServerURI()));
        }
        this.topicSubscriber.connect();
        this.topicSubscriber.subscribe();
        this.subscribed = true;
    }

    @Override // java.lang.Runnable
    public void run() {
        RetryTimer retryTimer = new RetryTimer();
        while (!this.subscribed) {
            try {
                doSubscribe();
            } catch (Exception e) {
                this.subscribed = false;
                if (log.isErrorEnabled()) {
                    log.error("Error while subscribing to topic: " + this.topicName, e);
                }
                long nextInterval = retryTimer.getNextInterval();
                if (log.isInfoEnabled()) {
                    log.info("Will try to subscribe again in " + (nextInterval / 1000) + " sec");
                }
                try {
                    Thread.sleep(nextInterval);
                } catch (InterruptedException e2) {
                }
            }
        }
    }

    public void terminate() {
        if (this.topicSubscriber != null) {
            this.topicSubscriber.disconnect();
        }
    }

    public boolean isSubscribed() {
        return this.subscribed;
    }
}
