package org.apache.stratos.messaging.broker.connect.mqtt;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.connect.TopicSubscriber;
import org.apache.stratos.messaging.broker.subscribe.MessageListener;
import org.apache.stratos.messaging.domain.Message;
import org.apache.stratos.messaging.domain.exception.MessagingException;
import org.apache.stratos.messaging.util.MessagingUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/* loaded from: input_file:org/apache/stratos/messaging/broker/connect/mqtt/MqttTopicSubscriber.class */
/**
 * {@link TopicSubscriber} implementation backed by the MQTT client that is
 * inherited from {@link MqttTopicConnector}.
 *
 * <p>Incoming MQTT messages are converted to {@link Message} instances and handed to the
 * {@link MessageListener} supplied at construction time. When the connection is lost the
 * subscriber keeps retrying (create, connect, subscribe) until it succeeds.
 */
public class MqttTopicSubscriber extends MqttTopicConnector implements TopicSubscriber {
    protected static final Log log = LogFactory.getLog(MqttTopicSubscriber.class);

    /** MQTT quality-of-service level requested for the subscription (2 = "exactly once"). */
    private static final int SUBSCRIPTION_QOS = 2;

    private final MessageListener messageListener;
    private final String topicName;

    /**
     * MQTT callback that forwards received messages to the enclosing subscriber's listener
     * and re-establishes the subscription when the connection drops.
     */
    public class MQTTSubscriberCallback implements MqttCallback {
        private MQTTSubscriberCallback() {
        }

        /**
         * Invoked when the MQTT connection is lost. Logs the failure and, unless the client
         * reports that it is connected again, blocks in {@link #reconnect()} until the
         * subscription has been restored.
         *
         * <p>The method is synchronized on this callback instance, so a given callback runs at
         * most one reconnection loop at a time.
         *
         * @param th the cause of the connection loss
         */
        @Override
        public synchronized void connectionLost(Throwable th) {
            if (log.isWarnEnabled()) {
                log.warn("MQTT Connection is lost, topic: " + MqttTopicSubscriber.this.topicName, th);
            }
            if (MqttTopicSubscriber.this.mqttClient.isConnected()) {
                if (log.isDebugEnabled()) {
                    log.debug("MQTT client is already re-connected");
                }
                return;
            }
            reconnect();
        }

        /**
         * Retries create/connect/subscribe until it succeeds, sleeping for the configured
         * subscriber failover interval (milliseconds) before every attempt.
         *
         * <p>An interrupt received while sleeping does not abort the retry loop, but it is
         * remembered and the thread's interrupt status is restored before this method returns
         * so that code further up the stack can still observe it.
         */
        private void reconnect() {
            boolean reconnected = false;
            boolean interrupted = false;
            try {
                while (!reconnected) {
                    try {
                        if (log.isInfoEnabled()) {
                            log.info("Will try to subscribe again in "
                                    + (MessagingUtil.getSubscriberFailoverInterval() / 1000) + " sec");
                        }
                        try {
                            Thread.sleep(MessagingUtil.getSubscriberFailoverInterval());
                        } catch (InterruptedException ignored) {
                            // Keep retrying, but do not lose the interrupt: it is re-asserted
                            // in the finally block once the loop is done.
                            interrupted = true;
                        }
                        if (log.isInfoEnabled()) {
                            log.info("Reconnection initiated for topic " + MqttTopicSubscriber.this.topicName);
                        }
                        MqttTopicSubscriber.this.create();
                        MqttTopicSubscriber.this.connect();
                        MqttTopicSubscriber.this.subscribe();
                        reconnected = true;
                    } catch (Exception e) {
                        // Any failure simply triggers another attempt after the next sleep.
                        if (log.isErrorEnabled()) {
                            log.error("Could not reconnect", e);
                        }
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            if (log.isInfoEnabled()) {
                log.info("Re-connected and subscribed to topic " + MqttTopicSubscriber.this.topicName);
            }
        }

        /**
         * Converts the MQTT payload to text and passes it, together with the topic it arrived
         * on, to the subscriber's {@link MessageListener}.
         *
         * @param str         name of the topic the message was published to
         * @param mqttMessage the received MQTT message
         * @throws Exception declared by the callback contract
         */
        @Override
        public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
            // NOTE(review): the payload is decoded with the platform default charset; confirm
            // that the publisher encodes with the same charset before pinning one here.
            String payload = new String(mqttMessage.getPayload());
            if (log.isDebugEnabled()) {
                log.debug(String.format("Message received: %s", payload));
            }
            MqttTopicSubscriber.this.messageListener.messageReceived(new Message(str, payload));
        }

        /**
         * Logs, at debug level, that delivery of a message has completed.
         *
         * @param iMqttDeliveryToken delivery token of the message; may be {@code null}
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            if (log.isDebugEnabled()) {
                String token = iMqttDeliveryToken != null ? iMqttDeliveryToken.toString() : "";
                log.debug(String.format("Message delivery is complete: %s", token));
            }
        }
    }

    /**
     * Creates a subscriber for the given topic and initializes the underlying connector via
     * {@code create()}.
     *
     * @param messageListener listener that receives every message arriving on the topic
     * @param str             name of the topic to subscribe to
     */
    public MqttTopicSubscriber(MessageListener messageListener, String str) {
        this.messageListener = messageListener;
        this.topicName = str;
        create();
    }

    /**
     * Registers a new {@link MQTTSubscriberCallback} on the MQTT client and subscribes to the
     * configured topic with QoS 2.
     *
     * @throws MessagingException if the MQTT client has not been initialized, or if setting
     *                            the callback or subscribing fails (the original failure is
     *                            attached as the cause)
     */
    @Override // org.apache.stratos.messaging.broker.connect.TopicSubscriber
    public void subscribe() {
        if (log.isDebugEnabled()) {
            log.debug("Subscribing to topic " + this.topicName);
        }
        // Checked outside the try block so this failure is logged once and is not re-wrapped
        // by the generic handler below.
        if (this.mqttClient == null) {
            String notInitialized = "Could not subscribe to topic, MQTT client has not been initialized";
            if (log.isErrorEnabled()) {
                log.error(notInitialized);
            }
            throw new MessagingException(notInitialized);
        }
        try {
            this.mqttClient.setCallback(new MQTTSubscriberCallback());
            this.mqttClient.subscribe(this.topicName, SUBSCRIPTION_QOS);
            if (log.isDebugEnabled()) {
                log.debug("Subscribed to topic " + this.topicName);
            }
        } catch (Exception e) {
            String str = "Error in subscribing to topic " + this.topicName;
            log.error(str, e);
            throw new MessagingException(str, e);
        }
    }
}
