package org.wso2.carbon.event.input.adapter.mqtt.internal.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.core.ServerStatus;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener;
import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterRuntimeException;

/* loaded from: input_file:org/wso2/carbon/event/input/adapter/mqtt/internal/util/MQTTAdapterListener.class */
public class MQTTAdapterListener implements MqttCallback, Runnable {
    private static final Log log = LogFactory.getLog(MQTTAdapterListener.class);
    private MqttClient mqttClient;
    private MqttConnectOptions connectionOptions;
    private boolean cleanSession;
    private int keepAlive;
    private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration;
    private String mqttClientId;
    private String topic;
    private int tenantId;
    private boolean connectionSucceeded = false;
    private InputEventAdapterListener eventAdapterListener;

    public MQTTAdapterListener(MQTTBrokerConnectionConfiguration mQTTBrokerConnectionConfiguration, String str, String str2, InputEventAdapterListener inputEventAdapterListener, int i) {
        this.eventAdapterListener = null;
        this.mqttClientId = (str2 == null || str2.trim().isEmpty()) ? MqttClient.generateClientId() : str2;
        this.mqttBrokerConnectionConfiguration = mQTTBrokerConnectionConfiguration;
        this.cleanSession = mQTTBrokerConnectionConfiguration.isCleanSession();
        this.keepAlive = mQTTBrokerConnectionConfiguration.getKeepAlive();
        this.topic = str;
        this.eventAdapterListener = inputEventAdapterListener;
        this.tenantId = i;
        MqttDefaultFilePersistence mqttDefaultFilePersistence = new MqttDefaultFilePersistence(System.getProperty("java.io.tmpdir"));
        try {
            this.connectionOptions = new MqttConnectOptions();
            this.connectionOptions.setCleanSession(this.cleanSession);
            this.connectionOptions.setKeepAliveInterval(this.keepAlive);
            if (this.mqttBrokerConnectionConfiguration.getBrokerPassword() != null) {
                this.connectionOptions.setPassword(this.mqttBrokerConnectionConfiguration.getBrokerPassword().toCharArray());
            }
            if (this.mqttBrokerConnectionConfiguration.getBrokerUsername() != null) {
                this.connectionOptions.setUserName(this.mqttBrokerConnectionConfiguration.getBrokerUsername());
            }
            this.mqttClient = new MqttClient(this.mqttBrokerConnectionConfiguration.getBrokerUrl(), this.mqttClientId, mqttDefaultFilePersistence);
            this.mqttClient.setCallback(this);
        } catch (MqttException e) {
            log.error("Exception occurred while subscribing to MQTT broker at " + mQTTBrokerConnectionConfiguration.getBrokerUrl());
            throw new InputEventAdapterRuntimeException(e);
        } catch (Throwable th) {
            log.error("Exception occurred while subscribing to MQTT broker at " + mQTTBrokerConnectionConfiguration.getBrokerUrl());
            throw new InputEventAdapterRuntimeException(th);
        }
    }

    public void startListener() throws MqttException {
        this.mqttClient.connect(this.connectionOptions);
        this.mqttClient.subscribe(this.topic);
    }

    public void stopListener(String str) {
        if (this.connectionSucceeded) {
            try {
                if (!ServerStatus.getCurrentStatus().equals("SHUTTING_DOWN") || this.cleanSession) {
                    this.mqttClient.unsubscribe(this.topic);
                }
                this.mqttClient.disconnect(3000L);
            } catch (MqttException e) {
                log.error("Can not unsubscribe from the destination " + this.topic + " with the event adapter " + str, e);
            }
        }
        this.connectionSucceeded = true;
    }

    public void connectionLost(Throwable th) {
        log.warn("MQTT connection not reachable " + th);
        this.connectionSucceeded = false;
        new Thread(this).start();
    }

    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
        try {
            try {
                String mqttMessage2 = mqttMessage.toString();
                if (log.isDebugEnabled()) {
                    log.debug(mqttMessage2);
                }
                PrivilegedCarbonContext.startTenantFlow();
                PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(this.tenantId);
                if (log.isDebugEnabled()) {
                    log.debug("Event received in MQTT Event Adapter - " + mqttMessage2);
                }
                this.eventAdapterListener.onEvent(mqttMessage2);
            } catch (InputEventAdapterRuntimeException e) {
                throw new InputEventAdapterRuntimeException(e);
            }
        } finally {
            PrivilegedCarbonContext.endTenantFlow();
        }
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.connectionSucceeded) {
            try {
                MQTTEventAdapterConstants.initialReconnectDuration *= 2;
                Thread.sleep(MQTTEventAdapterConstants.initialReconnectDuration);
                startListener();
                this.connectionSucceeded = true;
                log.info("MQTT Connection successful");
            } catch (MqttException e) {
                log.error("MQTT Exception occurred when starting listener", e);
            } catch (InterruptedException e2) {
                log.error("Interruption occurred while waiting for reconnection", e2);
            }
        }
    }

    public void createConnection() {
        new Thread(this).start();
    }
}
