/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.mqtt;

import com.microsoft.azure.sdk.iot.device.IotHubSSLContext;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.transport.TransportUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.security.InvalidParameterException;
import java.util.concurrent.ConcurrentSkipListMap;
import javax.net.SocketFactory;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public abstract class Mqtt
implements MqttCallback {
    private static MqttConnectionInfo info;
    static ConcurrentSkipListMap<String, byte[]> allReceivedMessages;
    private static Object MQTT_LOCK;
    protected static final char MESSAGE_PROPERTY_SEPARATOR = '&';
    private static final String MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED = "%24";
    private static final char MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_DECODED = '$';
    protected static final char MESSAGE_PROPERTY_KEY_VALUE_SEPARATOR = '=';
    private static final int PROPERTY_KEY_INDEX = 0;
    private static final int PROPERTY_VALUE_INDEX = 1;
    protected static final String ABSOLUTE_EXPIRY_TIME = "$.exp";
    protected static final String CORRELATION_ID = "$.cid";
    protected static final String MESSAGE_ID = "$.mid";
    protected static final String TO = "$.to";
    protected static final String USER_ID = "$.uid";
    protected static final String IOTHUB_ACK = "iothub-ack";

    abstract String parseTopic() throws IOException;

    abstract byte[] parsePayload(String var1) throws IOException;

    private void setMqttInfo(String serverURI, String clientId, String userName, String password, IotHubSSLContext iotHubSSLContext) throws IOException {
        if (info == null) {
            info = new MqttConnectionInfo(serverURI, clientId, userName, password, iotHubSSLContext);
            allReceivedMessages = new ConcurrentSkipListMap();
            MQTT_LOCK = new Object();
        }
    }

    public Mqtt() {
        if (MQTT_LOCK == null) {
            MQTT_LOCK = new Object();
        }
    }

    public Mqtt(String serverURI, String clientId, String userName, String userPassword, IotHubSSLContext iotHubSSLContext) throws IOException {
        if (serverURI == null || clientId == null || userName == null || userPassword == null || iotHubSSLContext == null) {
            throw new InvalidParameterException();
        }
        if (serverURI.length() == 0 || clientId.length() == 0 || userName.length() == 0 || userPassword.length() == 0) {
            throw new InvalidParameterException();
        }
        try {
            this.setMqttInfo(serverURI, clientId, userName, userPassword, iotHubSSLContext);
        }
        catch (IOException e) {
            info = null;
            allReceivedMessages = null;
            MQTT_LOCK = null;
            throw new IOException(e.getMessage());
        }
    }

    public void restartBaseMqtt() {
        MQTT_LOCK = null;
        allReceivedMessages = null;
        info = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void connect() throws IOException {
        Object object = MQTT_LOCK;
        synchronized (object) {
            try {
                if (info == null) {
                    throw new IOException("Mqtt client should be initialised atleast once before using it");
                }
                if (!Mqtt.info.mqttAsyncClient.isConnected()) {
                    IMqttToken connectToken = Mqtt.info.mqttAsyncClient.connect(info.connectionOptions);
                    connectToken.waitForCompletion();
                }
            }
            catch (MqttException e) {
                throw new IOException("Unable to connect to service" + e.getMessage());
            }
        }
    }

    protected void disconnect() throws IOException {
        try {
            if (Mqtt.info.mqttAsyncClient.isConnected()) {
                IMqttToken disconnectToken = Mqtt.info.mqttAsyncClient.disconnect();
                disconnectToken.waitForCompletion();
            }
            Mqtt.info.mqttAsyncClient = null;
        }
        catch (MqttException e) {
            throw new IOException("Unable to disconnectbecause " + e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void publish(String publishTopic, byte[] payload) throws IOException {
        Object object = MQTT_LOCK;
        synchronized (object) {
            try {
                if (info == null) {
                    System.out.println("Mqtt client should be initialised atleast once before using it");
                    throw new InvalidParameterException();
                }
                if (!Mqtt.info.mqttAsyncClient.isConnected()) {
                    throw new IOException("Cannot publish when mqtt client is disconnected");
                }
                if (publishTopic == null || publishTopic.length() == 0 || payload == null) {
                    throw new IOException("Cannot publish on null or empty publish topic");
                }
                while (Mqtt.info.mqttAsyncClient.getPendingDeliveryTokens().length >= 10) {
                    Thread.sleep(10L);
                    if (Mqtt.info.mqttAsyncClient.isConnected()) continue;
                    throw new IOException("Cannot publish when mqtt client is holding 10 tokens and  is disconnected");
                }
                MqttMessage mqttMessage = payload.length == 0 ? new MqttMessage() : new MqttMessage(payload);
                mqttMessage.setQos(1);
                IMqttDeliveryToken iMqttDeliveryToken = Mqtt.info.mqttAsyncClient.publish(publishTopic, mqttMessage);
            }
            catch (MqttException e) {
                throw new IOException("Unable to publish message on topic : " + publishTopic + " because " + e.getCause() + e.getMessage());
            }
            catch (InterruptedException e) {
                throw new IOException("Interrupted, Unable to publish message on topic : " + publishTopic);
            }
            catch (Exception e) {
                throw new IOException("Unable to publish message on topic : " + publishTopic + " " + e.getCause() + e.getMessage());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void subscribe(String topic) throws IOException {
        Object object = MQTT_LOCK;
        synchronized (object) {
            try {
                if (info == null) {
                    throw new IOException("Mqtt client should be initialised atleast once before using it");
                }
                if (topic == null) {
                    throw new InvalidParameterException("Topic cannot be null");
                }
                if (!Mqtt.info.mqttAsyncClient.isConnected()) {
                    throw new IOException("Cannot suscribe when mqtt client is disconnected");
                }
                IMqttToken subToken = Mqtt.info.mqttAsyncClient.subscribe(topic, 1);
                subToken.waitForCompletion(1000L);
            }
            catch (MqttException e) {
                throw new IOException("Unable to subscribe to topic :" + topic + " because " + e.getCause() + e.getMessage());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void unsubscribe(String topic) throws IOException {
        Object object = MQTT_LOCK;
        synchronized (object) {
            try {
                if (!Mqtt.info.mqttAsyncClient.isConnected()) {
                    throw new IOException("Cannot unsubscribe when mqtt client is disconnected");
                }
                IMqttToken subToken = Mqtt.info.mqttAsyncClient.unsubscribe(topic);
                subToken.waitForCompletion();
            }
            catch (MqttException e) {
                throw new IOException("Unable to unsubscribe to topic :" + topic + "because " + e.getCause() + e.getMessage());
            }
        }
    }

    protected boolean isConnected() {
        if (info == null || Mqtt.info.mqttAsyncClient == null) {
            throw new InvalidParameterException("Mqtt client should be initialised atleast once before using it");
        }
        return Mqtt.info.mqttAsyncClient.isConnected();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Message receive() throws IOException {
        Object object = MQTT_LOCK;
        synchronized (object) {
            if (info == null) {
                throw new InvalidParameterException("Mqtt client should be initialised at least once before using it");
            }
            String topic = this.parseTopic();
            if (topic != null) {
                byte[] data = this.parsePayload(topic);
                if (data != null) {
                    return this.constructMessage(data, topic);
                }
                throw new IOException("Data cannot be null when topic is non-null");
            }
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void connectionLost(Throwable throwable) {
        Object object = MQTT_LOCK;
        synchronized (object) {
            if (info != null && Mqtt.info.mqttAsyncClient != null) {
                int currentReconnectionAttempt = 0;
                while (!Mqtt.info.mqttAsyncClient.isConnected()) {
                    System.out.println("Lost connection to the server. Reconnecting " + currentReconnectionAttempt + " time.");
                    try {
                        ++currentReconnectionAttempt;
                        this.connect();
                    }
                    catch (Exception e) {
                        try {
                            Thread.sleep(TransportUtils.generateSleepInterval(currentReconnectionAttempt));
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                }
            } else {
                System.out.println("Initialise before using this..");
            }
        }
    }

    public void messageArrived(String topic, MqttMessage mqttMessage) {
        allReceivedMessages.put(topic, mqttMessage.getPayload());
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    private Message constructMessage(byte[] data, String topic) throws IllegalArgumentException {
        Message message = new Message(data);
        int propertiesStringStartingIndex = topic.indexOf(MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED);
        if (propertiesStringStartingIndex != -1) {
            String propertiesString = topic.substring(propertiesStringStartingIndex);
            this.assignPropertiesToMessage(message, propertiesString);
        }
        return message;
    }

    private void assignPropertiesToMessage(Message message, String propertiesString) throws IllegalArgumentException, NumberFormatException {
        for (String propertyString : propertiesString.split(String.valueOf('&'))) {
            if (propertyString.contains("=")) {
                String key = propertyString.split("=")[0];
                String value = propertyString.split("=")[1];
                try {
                    key = URLDecoder.decode(key, StandardCharsets.UTF_8.name());
                    value = URLDecoder.decode(value, StandardCharsets.UTF_8.name());
                }
                catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
                switch (key) {
                    case "$.to": {
                        break;
                    }
                    case "$.mid": {
                        message.setMessageId(value);
                        break;
                    }
                    case "iothub-ack": {
                        break;
                    }
                    case "$.cid": {
                        message.setCorrelationId(value);
                        break;
                    }
                    case "$.uid": {
                        break;
                    }
                    case "$.exp": {
                        break;
                    }
                    default: {
                        message.setProperty(key, value);
                        break;
                    }
                }
                continue;
            }
            throw new IllegalArgumentException("Unexpected property string provided. Expected '=' symbol between key and value of the property in string: " + propertyString);
        }
    }

    protected class MqttConnectionInfo {
        protected MqttAsyncClient mqttAsyncClient = null;
        private MqttConnectOptions connectionOptions = null;
        private static final int KEEP_ALIVE_INTERVAL = 20;
        private static final int MQTT_VERSION = 4;
        private static final boolean SET_CLEAN_SESSION = false;
        private static final int QOS = 1;
        private static final int MAX_WAIT_TIME = 1000;
        private static final int MAX_IN_FLIGHT_COUNT = 10;

        MqttConnectionInfo(String serverURI, String clientId, String userName, String password, IotHubSSLContext iotHubSSLContext) throws IOException {
            try {
                this.mqttAsyncClient = new MqttAsyncClient(serverURI, clientId, (MqttClientPersistence)new MemoryPersistence());
                this.mqttAsyncClient.setCallback((MqttCallback)Mqtt.this);
                this.connectionOptions = new MqttConnectOptions();
                this.updateConnectionOptions(userName, password, iotHubSSLContext);
            }
            catch (MqttException e) {
                this.mqttAsyncClient = null;
                this.connectionOptions = null;
                throw new IOException("Error initializing MQTT connection:" + e.getMessage());
            }
        }

        private void updateConnectionOptions(String userName, String userPassword, IotHubSSLContext iotHubSSLContext) {
            this.connectionOptions.setKeepAliveInterval(20);
            this.connectionOptions.setCleanSession(false);
            this.connectionOptions.setMqttVersion(4);
            this.connectionOptions.setUserName(userName);
            this.connectionOptions.setPassword(userPassword.toCharArray());
            this.connectionOptions.setSocketFactory((SocketFactory)iotHubSSLContext.getIotHubSSlContext().getSocketFactory());
        }
    }
}

