package org.apache.stratos.messaging.broker.connect.mqtt;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.connect.TopicConnector;
import org.apache.stratos.messaging.domain.exception.MessagingException;
import org.apache.stratos.messaging.util.MessagingConstants;
import org.apache.stratos.messaging.util.MessagingUtil;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/* loaded from: input_file:org/apache/stratos/messaging/broker/connect/mqtt/MqttTopicConnector.class */
public abstract class MqttTopicConnector implements TopicConnector {
    private static final Log log = LogFactory.getLog(MqttTopicConnector.class);
    protected MqttClient mqttClient;

    @Override // org.apache.stratos.messaging.broker.connect.TopicConnector
    public void connect() {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Connecting to message broker");
            }
            if (this.mqttClient == null) {
                if (log.isDebugEnabled()) {
                    log.debug("MQTT client initialization has failed previously, trying again");
                }
                create();
            }
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setCleanSession(true);
            this.mqttClient.connect(mqttConnectOptions);
        } catch (Exception e) {
            log.error("Could not connect to message broker", e);
            throw new MessagingException("Could not connect to message broker", e);
        }
    }

    @Override // org.apache.stratos.messaging.broker.connect.TopicConnector
    public void create() {
        try {
            String property = System.getProperty("mqtturl");
            if (StringUtils.isBlank(property)) {
                property = MessagingConstants.MQTT_PROPERTIES.getProperty("mqtturl", MessagingConstants.MQTT_URL_DEFAULT);
            }
            MemoryPersistence memoryPersistence = new MemoryPersistence();
            String randomString = MessagingUtil.getRandomString(23);
            this.mqttClient = new MqttClient(property, randomString, memoryPersistence);
            if (log.isDebugEnabled()) {
                log.debug("MQTT client created: [client-id] " + randomString);
            }
        } catch (Exception e) {
            log.error("Could not create MQTT client", e);
            throw new MessagingException("Could not create MQTT client", e);
        }
    }

    @Override // org.apache.stratos.messaging.broker.connect.TopicConnector
    public void disconnect() {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Disconnecting from message broker");
            }
            if (this.mqttClient == null) {
                if (log.isWarnEnabled()) {
                    log.warn("Could not disconnect from message broker, MQTT client has not been initialized");
                }
            } else {
                synchronized (this.mqttClient) {
                    if (this.mqttClient.isConnected()) {
                        this.mqttClient.disconnect();
                    }
                    closeConnection();
                }
            }
        } catch (Exception e) {
            log.error("Error in disconnecting from Message Broker", e);
        }
    }

    private void closeConnection() {
        try {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Closing connection to message broker");
                }
                if (this.mqttClient != null) {
                    this.mqttClient.close();
                    this.mqttClient = null;
                } else {
                    if (log.isWarnEnabled()) {
                        log.warn("Could not close connection, MQTT client has not been initialized");
                    }
                    this.mqttClient = null;
                }
            } catch (Exception e) {
                log.error("Could not close MQTT client", e);
                this.mqttClient = null;
            }
        } catch (Throwable th) {
            this.mqttClient = null;
            throw th;
        }
    }

    @Override // org.apache.stratos.messaging.broker.connect.TopicConnector
    public String getServerURI() {
        if (this.mqttClient == null) {
            return null;
        }
        return this.mqttClient.getServerURI();
    }
}
