/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.mqtt;

import com.microsoft.azure.sdk.iot.device.DeviceTwin.DeviceOperations;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageType;
import com.microsoft.azure.sdk.iot.device.exceptions.TransportException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubListener;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.MqttConnection;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.MqttMessageListener;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.exceptions.PahoExceptionTranslator;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public abstract class Mqtt
implements MqttCallback {
    private static final int CONNECTION_TIMEOUT = 60000;
    private static final int DISCONNECTION_TIMEOUT = 60000;
    private MqttConnection mqttConnection;
    private MqttMessageListener messageListener;
    ConcurrentLinkedQueue<Pair<String, byte[]>> allReceivedMessages;
    private final Object stateLock;
    protected final Object incomingLock;
    private final Object publishLock;
    private static Map<Integer, Message> unacknowledgedSentMessages = new ConcurrentHashMap<Integer, Message>();
    private boolean userSpecifiedSASTokenExpiredOnRetry = false;
    static final char MESSAGE_PROPERTY_SEPARATOR = '&';
    private static final String MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED = "%24";
    private static final char MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_DECODED = '$';
    static final char MESSAGE_PROPERTY_KEY_VALUE_SEPARATOR = '=';
    private static final int PROPERTY_KEY_INDEX = 0;
    private static final int PROPERTY_VALUE_INDEX = 1;
    private static final String ABSOLUTE_EXPIRY_TIME = "$.exp";
    static final String CORRELATION_ID = "$.cid";
    static final String MESSAGE_ID = "$.mid";
    static final String TO = "$.to";
    static final String USER_ID = "$.uid";
    static final String OUTPUT_NAME = "$.on";
    static final String CONNECTION_DEVICE_ID = "$.cdid";
    static final String CONNECTION_MODULE_ID = "$.cmid";
    static final String CONTENT_TYPE = "$.ct";
    static final String CONTENT_ENCODING = "$.ce";
    private static final String IOTHUB_ACK = "iothub-ack";
    private static final String INPUTS_PATH_STRING = "inputs";
    private static final String MODULES_PATH_STRING = "modules";
    private IotHubListener listener;
    private String connectionId;

    public Mqtt(MqttConnection mqttConnection, IotHubListener listener, MqttMessageListener messageListener, String connectionId) throws IllegalArgumentException {
        if (mqttConnection == null) {
            throw new IllegalArgumentException("Mqtt connection info cannot be null");
        }
        this.mqttConnection = mqttConnection;
        this.allReceivedMessages = mqttConnection.getAllReceivedMessages();
        this.stateLock = mqttConnection.getMqttLock();
        this.incomingLock = new Object();
        this.publishLock = new Object();
        this.userSpecifiedSASTokenExpiredOnRetry = false;
        this.listener = listener;
        this.messageListener = messageListener;
        this.connectionId = connectionId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void connect() throws TransportException {
        Object object = this.stateLock;
        synchronized (object) {
            try {
                if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    IMqttToken connectToken = this.mqttConnection.getMqttAsyncClient().connect(this.mqttConnection.getConnectionOptions());
                    connectToken.waitForCompletion(60000L);
                }
            }
            catch (MqttException e) {
                this.disconnect();
                throw PahoExceptionTranslator.convertToMqttException(e, "Unable to establish MQTT connection");
            }
        }
    }

    protected void disconnect() throws TransportException {
        try {
            IMqttToken disconnectToken;
            if (this.mqttConnection.isConnected() && (disconnectToken = this.mqttConnection.disconnect()) != null) {
                disconnectToken.waitForCompletion(60000L);
            }
            this.mqttConnection.close();
            this.mqttConnection.setMqttAsyncClient(null);
        }
        catch (MqttException e) {
            throw PahoExceptionTranslator.convertToMqttException(e, "Unable to disconnect");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void publish(String publishTopic, Message message) throws TransportException {
        Object object = this.publishLock;
        synchronized (object) {
            try {
                if (this.mqttConnection.getMqttAsyncClient() == null) {
                    TransportException transportException = new TransportException("Need to open first!");
                    transportException.setRetryable(true);
                    throw transportException;
                }
                if (this.userSpecifiedSASTokenExpiredOnRetry) {
                    throw new TransportException("Cannot publish when user supplied SAS token has expired");
                }
                if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    TransportException transportException = new TransportException("Cannot publish when mqtt client is disconnected");
                    transportException.setRetryable(true);
                    throw transportException;
                }
                if (message == null || publishTopic == null || publishTopic.length() == 0 || message.getBytes() == null) {
                    throw new IllegalArgumentException("Cannot publish on null or empty publish topic");
                }
                byte[] payload = message.getBytes();
                while (this.mqttConnection.getMqttAsyncClient().getPendingDeliveryTokens().length >= 10) {
                    Thread.sleep(10L);
                    if (this.mqttConnection.getMqttAsyncClient() == null) {
                        TransportException transportException = new TransportException("Connection was lost while waiting for mqtt deliveries to finish");
                        transportException.setRetryable(true);
                        throw transportException;
                    }
                    if (this.mqttConnection.getMqttAsyncClient().isConnected()) continue;
                    TransportException transportException = new TransportException("Cannot publish when mqtt client is holding 10 tokens and is disconnected");
                    transportException.setRetryable(true);
                    throw transportException;
                }
                MqttMessage mqttMessage = payload.length == 0 ? new MqttMessage() : new MqttMessage(payload);
                mqttMessage.setQos(1);
                IMqttDeliveryToken publishToken = this.mqttConnection.getMqttAsyncClient().publish(publishTopic, mqttMessage);
                unacknowledgedSentMessages.put(publishToken.getMessageId(), message);
            }
            catch (MqttException e) {
                throw PahoExceptionTranslator.convertToMqttException(e, "Unable to publish message on topic : " + publishTopic);
            }
            catch (InterruptedException e) {
                throw new TransportException("Interrupted, Unable to publish message on topic : " + publishTopic, e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void subscribe(String topic) throws TransportException {
        Object object = this.stateLock;
        synchronized (object) {
            try {
                if (topic == null) {
                    throw new IllegalArgumentException("Topic cannot be null");
                }
                if (this.userSpecifiedSASTokenExpiredOnRetry) {
                    throw new TransportException("Cannot subscribe when user supplied SAS token has expired");
                }
                if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    TransportException transportException = new TransportException("Cannot subscribe when mqtt client is disconnected");
                    transportException.setRetryable(true);
                    throw transportException;
                }
                IMqttToken subToken = this.mqttConnection.getMqttAsyncClient().subscribe(topic, 1);
                subToken.waitForCompletion(15000L);
            }
            catch (MqttException e) {
                throw PahoExceptionTranslator.convertToMqttException(e, "Unable to subscribe to topic :" + topic);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public IotHubTransportMessage receive() throws TransportException {
        Object object = this.incomingLock;
        synchronized (object) {
            if (this.mqttConnection == null) {
                throw new TransportException(new IllegalArgumentException("Mqtt client should be initialised at least once before using it"));
            }
            Pair<String, byte[]> messagePair = this.peekMessage();
            if (messagePair != null) {
                String topic = (String)messagePair.getKey();
                if (topic != null) {
                    byte[] data = (byte[])messagePair.getValue();
                    if (data != null) {
                        this.allReceivedMessages.poll();
                        return this.constructMessage(data, topic);
                    }
                    throw new TransportException("Data cannot be null when topic is non-null");
                }
                return null;
            }
            return null;
        }
    }

    public void connectionLost(Throwable throwable) {
        TransportException ex = null;
        try {
            if (this.mqttConnection != null) {
                this.disconnect();
            }
        }
        catch (TransportException e) {
            ex = e;
        }
        if (this.listener != null) {
            throwable = ex == null ? (throwable instanceof MqttException ? PahoExceptionTranslator.convertToMqttException((MqttException)throwable, "Mqtt connection lost") : new TransportException(throwable)) : ex;
            this.listener.onConnectionLost(throwable, this.connectionId);
        }
    }

    public void messageArrived(String topic, MqttMessage mqttMessage) {
        this.mqttConnection.getAllReceivedMessages().add((Pair<String, byte[]>)new MutablePair((Object)topic, (Object)mqttMessage.getPayload()));
        if (this.messageListener != null) {
            this.messageListener.onMessageArrived(mqttMessage.getId());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        Object object = this.publishLock;
        synchronized (object) {
            if (this.listener != null && unacknowledgedSentMessages.containsKey(iMqttDeliveryToken.getMessageId())) {
                DeviceOperations deviceOperation;
                Message deliveredMessage = unacknowledgedSentMessages.remove(iMqttDeliveryToken.getMessageId());
                if (deliveredMessage instanceof IotHubTransportMessage && ((deviceOperation = ((IotHubTransportMessage)deliveredMessage).getDeviceOperationType()) == DeviceOperations.DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_REQUEST || deviceOperation == DeviceOperations.DEVICE_OPERATION_METHOD_SUBSCRIBE_REQUEST || deviceOperation == DeviceOperations.DEVICE_OPERATION_TWIN_UNSUBSCRIBE_DESIRED_PROPERTIES_REQUEST)) {
                    return;
                }
                this.listener.onMessageSent(deliveredMessage, null);
            }
        }
    }

    public Pair<String, byte[]> peekMessage() {
        return this.allReceivedMessages.peek();
    }

    protected boolean sendMessageAcknowledgement(int messageId) throws TransportException {
        return this.mqttConnection.sendMessageAcknowledgement(messageId);
    }

    private IotHubTransportMessage constructMessage(byte[] data, String topic) {
        IotHubTransportMessage message = new IotHubTransportMessage(data, MessageType.DEVICE_TELEMETRY);
        int propertiesStringStartingIndex = topic.indexOf(MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED);
        if (propertiesStringStartingIndex != -1) {
            String propertiesString = topic.substring(propertiesStringStartingIndex);
            this.assignPropertiesToMessage(message, propertiesString);
            String routeString = topic.substring(0, propertiesStringStartingIndex);
            String[] routeComponents = routeString.split("/");
            if (routeComponents.length > 2 && routeComponents[2].equals(MODULES_PATH_STRING)) {
                message.setConnectionModuleId(routeComponents[3]);
            }
            if (routeComponents.length > 4 && routeComponents[4].equals(INPUTS_PATH_STRING)) {
                message.setInputName(routeComponents[5]);
            }
        }
        return message;
    }

    private void assignPropertiesToMessage(Message message, String propertiesString) throws IllegalStateException, IllegalArgumentException {
        for (String propertyString : propertiesString.split(String.valueOf('&'))) {
            if (propertyString.contains("=")) {
                String key = propertyString.split("=")[0];
                String value = propertyString.split("=")[1];
                try {
                    key = URLDecoder.decode(key, StandardCharsets.UTF_8.name());
                    value = URLDecoder.decode(value, StandardCharsets.UTF_8.name());
                }
                catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
                switch (key) {
                    case "$.to": {
                        break;
                    }
                    case "$.mid": {
                        message.setMessageId(value);
                        break;
                    }
                    case "iothub-ack": {
                        break;
                    }
                    case "$.cid": {
                        message.setCorrelationId(value);
                        break;
                    }
                    case "$.uid": {
                        break;
                    }
                    case "$.on": {
                        message.setOutputName(value);
                        break;
                    }
                    case "$.ce": {
                        message.setContentEncoding(value);
                        break;
                    }
                    case "$.ct": {
                        message.setContentType(value);
                        break;
                    }
                    case "$.exp": {
                        break;
                    }
                    default: {
                        message.setProperty(key, value);
                        break;
                    }
                }
                continue;
            }
            throw new IllegalArgumentException("Unexpected property string provided. Expected '=' symbol between key and value of the property in string: " + propertyString);
        }
    }
}

