/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.mqtt;

import com.microsoft.azure.sdk.iot.device.DeviceClientConfig;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.auth.IotHubSasToken;
import com.microsoft.azure.sdk.iot.device.transport.TransportUtils;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.MqttConnection;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.security.InvalidParameterException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public abstract class Mqtt
implements MqttCallback {
    private MqttConnection mqttConnection;
    private DeviceClientConfig deviceClientConfig = null;
    ConcurrentLinkedQueue<Pair<String, byte[]>> allReceivedMessages;
    Object mqttLock = null;
    private boolean userSpecifiedSASTokenExpiredOnRetry = false;
    static final char MESSAGE_PROPERTY_SEPARATOR = '&';
    private static final String MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED = "%24";
    private static final char MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_DECODED = '$';
    static final char MESSAGE_PROPERTY_KEY_VALUE_SEPARATOR = '=';
    private static final int PROPERTY_KEY_INDEX = 0;
    private static final int PROPERTY_VALUE_INDEX = 1;
    private static final String ABSOLUTE_EXPIRY_TIME = "$.exp";
    static final String CORRELATION_ID = "$.cid";
    static final String MESSAGE_ID = "$.mid";
    static final String TO = "$.to";
    static final String USER_ID = "$.uid";
    private static final String IOTHUB_ACK = "iothub-ack";

    public Mqtt(MqttConnection mqttConnection) throws IllegalArgumentException {
        if (mqttConnection == null) {
            throw new IllegalArgumentException("Mqtt connection info cannot be null");
        }
        this.mqttConnection = mqttConnection;
        this.allReceivedMessages = mqttConnection.getAllReceivedMessages();
        this.mqttLock = mqttConnection.getMqttLock();
        this.userSpecifiedSASTokenExpiredOnRetry = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void connect() throws IOException {
        Object object = this.mqttLock;
        synchronized (object) {
            try {
                if (this.mqttConnection == null) {
                    throw new IOException("Mqtt client should be initialised atleast once before using it");
                }
                if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    IMqttToken connectToken = this.mqttConnection.getMqttAsyncClient().connect(this.mqttConnection.getConnectionOptions());
                    connectToken.waitForCompletion();
                }
            }
            catch (MqttException e) {
                throw new IOException("Unable to connect to service" + e.getCause());
            }
        }
    }

    protected void disconnect() throws IOException {
        try {
            if (this.mqttConnection.getMqttAsyncClient() != null && this.mqttConnection.getMqttAsyncClient().isConnected()) {
                IMqttToken disconnectToken = this.mqttConnection.getMqttAsyncClient().disconnect();
                disconnectToken.waitForCompletion();
            }
            this.mqttConnection.setMqttAsyncClient(null);
        }
        catch (MqttException e) {
            throw new IOException("Unable to disconnectbecause " + e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void publish(String publishTopic, byte[] payload) throws IOException {
        Object object = this.mqttLock;
        synchronized (object) {
            try {
                if (this.mqttConnection == null) {
                    throw new InvalidParameterException();
                }
                if (this.userSpecifiedSASTokenExpiredOnRetry) {
                    throw new IOException("Cannot publish when user supplied SAS token has expired");
                }
                if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    throw new IOException("Cannot publish when mqtt client is disconnected");
                }
                if (publishTopic == null || publishTopic.length() == 0 || payload == null) {
                    throw new IOException("Cannot publish on null or empty publish topic");
                }
                while (this.mqttConnection.getMqttAsyncClient().getPendingDeliveryTokens().length >= 10) {
                    Thread.sleep(10L);
                    if (this.mqttConnection.getMqttAsyncClient().isConnected()) continue;
                    throw new IOException("Cannot publish when mqtt client is holding 10 tokens and  is disconnected");
                }
                MqttMessage mqttMessage = payload.length == 0 ? new MqttMessage() : new MqttMessage(payload);
                mqttMessage.setQos(1);
                IMqttDeliveryToken iMqttDeliveryToken = this.mqttConnection.getMqttAsyncClient().publish(publishTopic, mqttMessage);
            }
            catch (MqttException e) {
                throw new IOException("Unable to publish message on topic : " + publishTopic + " because " + e.getCause() + e.getMessage());
            }
            catch (InterruptedException e) {
                throw new IOException("Interrupted, Unable to publish message on topic : " + publishTopic);
            }
            catch (Exception e) {
                throw new IOException("Unable to publish message on topic : " + publishTopic + " " + e.getCause() + e.getMessage());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void subscribe(String topic) throws IOException {
        Object object = this.mqttLock;
        synchronized (object) {
            try {
                if (this.mqttConnection == null) {
                    throw new IOException("Mqtt client should be initialised atleast once before using it");
                }
                if (topic == null) {
                    throw new InvalidParameterException("Topic cannot be null");
                }
                if (this.userSpecifiedSASTokenExpiredOnRetry) {
                    throw new IOException("Cannot subscribe when user supplied SAS token has expired");
                }
                if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    throw new IOException("Cannot suscribe when mqtt client is disconnected");
                }
                IMqttToken subToken = this.mqttConnection.getMqttAsyncClient().subscribe(topic, 1);
                subToken.waitForCompletion(1000L);
            }
            catch (MqttException e) {
                throw new IOException("Unable to subscribe to topic :" + topic + " because " + e.getCause() + e.getMessage());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void unsubscribe(String topic) throws IOException {
        Object object = this.mqttLock;
        synchronized (object) {
            try {
                if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    throw new IOException("Cannot unsubscribe when mqtt client is disconnected");
                }
                if (this.userSpecifiedSASTokenExpiredOnRetry) {
                    throw new IOException("Cannot unsubscribe when user supplied SAS token has expired");
                }
                IMqttToken subToken = this.mqttConnection.getMqttAsyncClient().unsubscribe(topic);
                subToken.waitForCompletion();
            }
            catch (MqttException e) {
                throw new IOException("Unable to unsubscribe to topic :" + topic + "because " + e.getCause() + e.getMessage());
            }
        }
    }

    protected boolean isConnected() {
        if (this.mqttConnection == null || this.mqttConnection.getMqttAsyncClient() == null) {
            throw new InvalidParameterException("Mqtt client should be initialised atleast once before using it");
        }
        return this.mqttConnection.getMqttAsyncClient().isConnected();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Message receive() throws IOException {
        Object object = this.mqttLock;
        synchronized (object) {
            if (this.mqttConnection == null) {
                throw new InvalidParameterException("Mqtt client should be initialised at least once before using it");
            }
            Pair<String, byte[]> messagePair = this.peekMessage();
            if (messagePair != null) {
                String topic = (String)messagePair.getKey();
                if (topic != null) {
                    byte[] data = (byte[])messagePair.getValue();
                    if (data != null) {
                        this.allReceivedMessages.poll();
                        return this.constructMessage(data, topic);
                    }
                    throw new IOException("Data cannot be null when topic is non-null");
                }
                return null;
            }
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void connectionLost(Throwable throwable) {
        Object object = this.mqttLock;
        synchronized (object) {
            if (this.mqttConnection != null && this.mqttConnection.getMqttAsyncClient() != null) {
                int currentReconnectionAttempt = 0;
                while (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    System.out.println("Lost connection to the server. Reconnecting " + currentReconnectionAttempt + " time.");
                    try {
                        ++currentReconnectionAttempt;
                        if (this.deviceClientConfig.getAuthenticationType() == DeviceClientConfig.AuthType.SAS_TOKEN) {
                            if (!IotHubSasToken.isExpired(new String(this.mqttConnection.getConnectionOptions().getPassword()))) {
                                this.connect();
                                continue;
                            }
                            if (this.deviceClientConfig.getIotHubConnectionString().getSharedAccessKey() != null) {
                                String sasToken = this.deviceClientConfig.getSasTokenAuthentication().getRenewedSasToken();
                                this.mqttConnection.getConnectionOptions().setPassword(sasToken.toCharArray());
                                this.connect();
                                continue;
                            }
                            this.userSpecifiedSASTokenExpiredOnRetry = true;
                            return;
                        }
                        if (this.deviceClientConfig.getAuthenticationType() != DeviceClientConfig.AuthType.X509_CERTIFICATE) continue;
                        this.connect();
                    }
                    catch (IOException e) {
                        try {
                            Thread.sleep(TransportUtils.generateSleepInterval(currentReconnectionAttempt));
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                }
            } else {
                System.out.println("Initialise before using this..");
            }
        }
    }

    public void messageArrived(String topic, MqttMessage mqttMessage) {
        this.mqttConnection.getAllReceivedMessages().add((Pair<String, byte[]>)new MutablePair((Object)topic, (Object)mqttMessage.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    public Pair<String, byte[]> peekMessage() throws IOException {
        if (this.allReceivedMessages == null) {
            throw new IOException("Queue cannot be null");
        }
        return this.allReceivedMessages.peek();
    }

    private Message constructMessage(byte[] data, String topic) throws IllegalArgumentException {
        Message message = new Message(data);
        int propertiesStringStartingIndex = topic.indexOf(MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED);
        if (propertiesStringStartingIndex != -1) {
            String propertiesString = topic.substring(propertiesStringStartingIndex);
            this.assignPropertiesToMessage(message, propertiesString);
        }
        return message;
    }

    private void assignPropertiesToMessage(Message message, String propertiesString) throws IllegalArgumentException, NumberFormatException {
        for (String propertyString : propertiesString.split(String.valueOf('&'))) {
            if (propertyString.contains("=")) {
                String key = propertyString.split("=")[0];
                String value = propertyString.split("=")[1];
                try {
                    key = URLDecoder.decode(key, StandardCharsets.UTF_8.name());
                    value = URLDecoder.decode(value, StandardCharsets.UTF_8.name());
                }
                catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
                switch (key) {
                    case "$.to": {
                        break;
                    }
                    case "$.mid": {
                        message.setMessageId(value);
                        break;
                    }
                    case "iothub-ack": {
                        break;
                    }
                    case "$.cid": {
                        message.setCorrelationId(value);
                        break;
                    }
                    case "$.uid": {
                        break;
                    }
                    case "$.exp": {
                        break;
                    }
                    default: {
                        message.setProperty(key, value);
                        break;
                    }
                }
                continue;
            }
            throw new IllegalArgumentException("Unexpected property string provided. Expected '=' symbol between key and value of the property in string: " + propertyString);
        }
    }

    protected void setDeviceClientConfig(DeviceClientConfig deviceConfig) throws IllegalArgumentException {
        if (deviceConfig == null) {
            throw new IllegalArgumentException("DeviceClientConfig is null");
        }
        this.deviceClientConfig = deviceConfig;
    }
}

