/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport;

import com.microsoft.azure.sdk.iot.device.CustomLogger;
import com.microsoft.azure.sdk.iot.device.DeviceClientConfig;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionState;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionStateCallback;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionStatusChangeCallback;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionStatusChangeReason;
import com.microsoft.azure.sdk.iot.device.IotHubEventCallback;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.IotHubStatusCode;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageCallback;
import com.microsoft.azure.sdk.iot.device.exceptions.DeviceClientException;
import com.microsoft.azure.sdk.iot.device.exceptions.DeviceOperationTimeoutException;
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubServiceException;
import com.microsoft.azure.sdk.iot.device.exceptions.TransportException;
import com.microsoft.azure.sdk.iot.device.exceptions.UnauthorizedException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.microsoft.azure.sdk.iot.device.transport.IotHubListener;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportPacket;
import com.microsoft.azure.sdk.iot.device.transport.RetryDecision;
import com.microsoft.azure.sdk.iot.device.transport.RetryPolicy;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsIotHubConnection;
import com.microsoft.azure.sdk.iot.device.transport.amqps.exceptions.AmqpConnectionThrottledException;
import com.microsoft.azure.sdk.iot.device.transport.amqps.exceptions.AmqpUnauthorizedAccessException;
import com.microsoft.azure.sdk.iot.device.transport.https.HttpsIotHubConnection;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.MqttIotHubConnection;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.exceptions.MqttUnauthorizedException;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IotHubTransport
implements IotHubListener {
    private static final int MAX_MESSAGES_TO_SEND_PER_THREAD = 10;
    private IotHubConnectionStatus connectionStatus;
    private IotHubTransportConnection iotHubTransportConnection;
    private final Queue<IotHubTransportPacket> waitingPacketsQueue = new ConcurrentLinkedQueue<IotHubTransportPacket>();
    private final Map<String, IotHubTransportPacket> inProgressPackets = new ConcurrentHashMap<String, IotHubTransportPacket>();
    private final Queue<IotHubTransportMessage> receivedMessagesQueue = new ConcurrentLinkedQueue<IotHubTransportMessage>();
    private final Queue<IotHubTransportPacket> callbackPacketsQueue = new ConcurrentLinkedQueue<IotHubTransportPacket>();
    private IotHubConnectionStateCallback stateCallback;
    private Object stateCallbackContext;
    private IotHubConnectionStatusChangeCallback connectionStatusChangeCallback;
    private Object connectionStatusChangeCallbackContext;
    private final Object inProgressMessagesLock = new Object();
    private DeviceClientConfig defaultConfig;
    private Queue<DeviceClientConfig> deviceClientConfigs;
    private int currentReconnectionAttempt;
    private long reconnectionAttemptStartTimeMillis;
    private ScheduledExecutorService taskScheduler;
    private final CustomLogger logger;
    private final Object reconnectionLock = new Object();

    public IotHubTransport(DeviceClientConfig defaultConfig) throws IllegalArgumentException {
        if (defaultConfig == null) {
            throw new IllegalArgumentException("Config cannot be null");
        }
        this.defaultConfig = defaultConfig;
        this.connectionStatus = IotHubConnectionStatus.DISCONNECTED;
        this.currentReconnectionAttempt = 0;
        this.logger = new CustomLogger(this.getClass());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onMessageSent(Message message, Throwable e) {
        IotHubTransportPacket packet;
        if (message == null) {
            this.logger.LogError("onMessageSent called with null message, method name is %s ", message.getMessageId(), this.logger.getMethodName());
        }
        Object object = this.inProgressMessagesLock;
        synchronized (object) {
            packet = this.inProgressPackets.remove(message.getMessageId());
        }
        if (packet != null) {
            if (e == null) {
                packet.setStatus(IotHubStatusCode.OK_EMPTY);
                this.addToCallbackQueue(packet);
            } else if (e instanceof TransportException) {
                this.handleMessageException(packet, (TransportException)e);
            } else {
                this.handleMessageException(packet, new TransportException(e));
            }
        } else {
            this.logger.LogError("Message with message id %s was delivered to IoTHub, but was never sent, method name is %s ", message.getMessageId(), this.logger.getMethodName());
        }
    }

    @Override
    public void onMessageReceived(IotHubTransportMessage message, Throwable e) {
        if (message != null && e != null) {
            this.logger.LogError("IllegalArgumentException encountered, method name is %s", this.logger.getMethodName());
            this.logger.LogError(new IllegalArgumentException("Cannot call onMessageReceived with non-null message and non-null throwable"));
        } else if (message != null) {
            this.logger.LogInfo("Message with hashcode %s is received from IotHub on %s, method name is onMessageReceived", message.hashCode(), new Date());
            this.receivedMessagesQueue.add(message);
        } else if (e != null) {
            this.logger.LogError("Exception encountered while receiving messages from service, method name is %s", this.logger.getMethodName());
            this.logger.LogError(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onConnectionLost(Throwable e, String connectionId) {
        Object object = this.reconnectionLock;
        synchronized (object) {
            if (!connectionId.equals(this.iotHubTransportConnection.getConnectionId())) {
                return;
            }
            if (this.connectionStatus != IotHubConnectionStatus.CONNECTED) {
                return;
            }
            if (e instanceof TransportException) {
                this.handleDisconnection((TransportException)e);
            } else {
                this.handleDisconnection(new TransportException(e));
            }
        }
    }

    @Override
    public void onConnectionEstablished(String connectionId) {
        if (connectionId.equals(this.iotHubTransportConnection.getConnectionId())) {
            this.logger.LogInfo("The connection to the IoT Hub has been established, method name is %s ", this.logger.getMethodName());
            this.updateStatus(IotHubConnectionStatus.CONNECTED, IotHubConnectionStatusChangeReason.CONNECTION_OK, null);
        }
    }

    public void open(Collection<DeviceClientConfig> deviceClientConfigs) throws DeviceClientException {
        if (deviceClientConfigs == null || deviceClientConfigs.isEmpty()) {
            throw new IllegalArgumentException("deviceClientConfigs cannot be null or empty");
        }
        if (this.connectionStatus == IotHubConnectionStatus.CONNECTED) {
            return;
        }
        if (this.connectionStatus == IotHubConnectionStatus.DISCONNECTED_RETRYING) {
            throw new TransportException("Open cannot be called while transport is reconnecting");
        }
        if (this.isSasTokenExpired()) {
            throw new SecurityException("Your sas token has expired");
        }
        this.deviceClientConfigs = new LinkedBlockingQueue<DeviceClientConfig>(deviceClientConfigs);
        this.defaultConfig = this.deviceClientConfigs.peek();
        this.taskScheduler = Executors.newScheduledThreadPool(1);
        this.openConnection();
    }

    public void close(IotHubConnectionStatusChangeReason reason, Throwable cause) throws DeviceClientException {
        if (reason == null) {
            throw new IllegalArgumentException("reason cannot be null");
        }
        this.cancelPendingPackets();
        this.invokeCallbacks();
        if (this.taskScheduler != null) {
            this.taskScheduler.shutdown();
        }
        if (this.iotHubTransportConnection != null) {
            this.iotHubTransportConnection.close(false);
        }
        this.updateStatus(IotHubConnectionStatus.DISCONNECTED, reason, cause);
    }

    public void addMessage(Message message, IotHubEventCallback callback, Object callbackContext) {
        if (this.connectionStatus == IotHubConnectionStatus.DISCONNECTED) {
            throw new IllegalStateException("Cannot add a message when the transport is closed.");
        }
        IotHubTransportPacket packet = new IotHubTransportPacket(message, callback, callbackContext, null, System.currentTimeMillis());
        this.waitingPacketsQueue.add(packet);
    }

    public void sendMessages() {
        if (this.connectionStatus == IotHubConnectionStatus.DISCONNECTED || this.connectionStatus == IotHubConnectionStatus.DISCONNECTED_RETRYING) {
            return;
        }
        int timeSlice = 10;
        while (this.connectionStatus == IotHubConnectionStatus.CONNECTED && timeSlice-- > 0) {
            IotHubTransportPacket packet = this.waitingPacketsQueue.poll();
            if (packet == null) continue;
            this.logger.LogInfo("Get the message from waiting message queue to be sent to IoT Hub, method name is %s ", this.logger.getMethodName());
            Message message = packet.getMessage();
            if (message == null || !this.isMessageValid(packet)) continue;
            this.sendPacket(packet);
        }
    }

    public void invokeCallbacks() {
        IotHubTransportPacket packet = this.callbackPacketsQueue.poll();
        while (packet != null) {
            IotHubStatusCode status = packet.getStatus();
            IotHubEventCallback callback = packet.getCallback();
            Object context = packet.getContext();
            this.logger.LogInfo("Invoking the callback function for sent message, IoT Hub responded to message with status %s, method name is %s ", status.name(), this.logger.getMethodName());
            callback.execute(status, context);
            packet = this.callbackPacketsQueue.poll();
        }
    }

    public void handleMessage() throws DeviceClientException {
        if (this.connectionStatus == IotHubConnectionStatus.CONNECTED) {
            IotHubTransportMessage receivedMessage;
            if (this.iotHubTransportConnection instanceof HttpsIotHubConnection) {
                this.addReceivedMessagesOverHttpToReceivedQueue();
            }
            if ((receivedMessage = this.receivedMessagesQueue.poll()) != null) {
                this.acknowledgeReceivedMessage(receivedMessage);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isEmpty() {
        Object object = this.inProgressMessagesLock;
        synchronized (object) {
            return this.waitingPacketsQueue.isEmpty() && this.inProgressPackets.size() == 0 && this.callbackPacketsQueue.isEmpty();
        }
    }

    public void registerConnectionStateCallback(IotHubConnectionStateCallback callback, Object callbackContext) {
        if (callback == null) {
            throw new IllegalArgumentException("Callback cannot be null");
        }
        this.stateCallback = callback;
        this.stateCallbackContext = callbackContext;
    }

    public void registerConnectionStatusChangeCallback(IotHubConnectionStatusChangeCallback callback, Object callbackContext) {
        if (callbackContext != null && callback == null) {
            throw new IllegalArgumentException("Callback cannot be null if callback context is null");
        }
        this.connectionStatusChangeCallback = callback;
        this.connectionStatusChangeCallbackContext = callbackContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancelPendingPackets() {
        IotHubTransportPacket packet = this.waitingPacketsQueue.poll();
        while (packet != null) {
            packet.setStatus(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE);
            this.addToCallbackQueue(packet);
            packet = this.waitingPacketsQueue.poll();
        }
        Object object = this.inProgressMessagesLock;
        synchronized (object) {
            for (Map.Entry<String, IotHubTransportPacket> packetEntry : this.inProgressPackets.entrySet()) {
                IotHubTransportPacket inProgressPacket = packetEntry.getValue();
                inProgressPacket.setStatus(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE);
                this.addToCallbackQueue(inProgressPacket);
            }
            this.inProgressPackets.clear();
        }
    }

    private void acknowledgeReceivedMessage(IotHubTransportMessage receivedMessage) throws TransportException {
        MessageCallback messageCallback = receivedMessage.getMessageCallback();
        Object messageCallbackContext = receivedMessage.getMessageCallbackContext();
        if (messageCallback != null) {
            IotHubMessageResult result = messageCallback.execute(receivedMessage, messageCallbackContext);
            try {
                this.iotHubTransportConnection.sendMessageResult(receivedMessage, result);
            }
            catch (TransportException e) {
                this.receivedMessagesQueue.add(receivedMessage);
                throw e;
            }
        }
    }

    private void addReceivedMessagesOverHttpToReceivedQueue() throws TransportException {
        IotHubTransportMessage transportMessage = ((HttpsIotHubConnection)this.iotHubTransportConnection).receiveMessage();
        if (transportMessage != null) {
            this.logger.LogInfo("Message with hashcode %s is received from IotHub on %s, method name is addReceivedMessagesOverHttpToReceivedQueue", transportMessage.hashCode(), new Date());
            this.receivedMessagesQueue.add(transportMessage);
        }
    }

    private IotHubConnectionStatusChangeReason exceptionToStatusChangeReason(Throwable e) {
        if (e instanceof TransportException) {
            TransportException transportException = (TransportException)e;
            if (transportException.isRetryable()) {
                return IotHubConnectionStatusChangeReason.NO_NETWORK;
            }
            if (this.isSasTokenExpired()) {
                return IotHubConnectionStatusChangeReason.EXPIRED_SAS_TOKEN;
            }
            return IotHubConnectionStatusChangeReason.BAD_CREDENTIAL;
        }
        return IotHubConnectionStatusChangeReason.COMMUNICATION_ERROR;
    }

    private void openConnection() throws TransportException {
        switch (this.defaultConfig.getProtocol()) {
            case HTTPS: {
                this.iotHubTransportConnection = new HttpsIotHubConnection(this.defaultConfig);
                break;
            }
            case MQTT: 
            case MQTT_WS: {
                this.iotHubTransportConnection = new MqttIotHubConnection(this.defaultConfig);
                break;
            }
            case AMQPS: 
            case AMQPS_WS: {
                this.iotHubTransportConnection = new AmqpsIotHubConnection(this.defaultConfig);
                break;
            }
            default: {
                throw new TransportException("Protocol not supported");
            }
        }
        this.iotHubTransportConnection.setListener(this);
        this.iotHubTransportConnection.open(this.deviceClientConfigs);
        this.updateStatus(IotHubConnectionStatus.CONNECTED, IotHubConnectionStatusChangeReason.CONNECTION_OK, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleDisconnection(TransportException transportException) {
        this.logger.LogInfo("The messages in progress are buffered to be sent again due to a connection loss, method name is %s ", this.logger.getMethodName());
        Object object = this.inProgressMessagesLock;
        synchronized (object) {
            this.waitingPacketsQueue.addAll(this.inProgressPackets.values());
            this.inProgressPackets.clear();
        }
        this.updateStatus(IotHubConnectionStatus.DISCONNECTED_RETRYING, this.exceptionToStatusChangeReason(transportException), transportException);
        this.checkForUnauthorizedException(transportException);
        this.reconnect(transportException);
    }

    private void reconnect(TransportException transportException) {
        if (this.reconnectionAttemptStartTimeMillis == 0L) {
            this.reconnectionAttemptStartTimeMillis = System.currentTimeMillis();
        }
        boolean hasReconnectOperationTimedOut = this.hasOperationTimedOut(this.reconnectionAttemptStartTimeMillis);
        RetryDecision retryDecision = null;
        while (this.connectionStatus == IotHubConnectionStatus.DISCONNECTED_RETRYING && !hasReconnectOperationTimedOut && transportException != null && transportException.isRetryable()) {
            ++this.currentReconnectionAttempt;
            RetryPolicy retryPolicy = this.defaultConfig.getRetryPolicy();
            retryDecision = retryPolicy.getRetryDecision(this.currentReconnectionAttempt, transportException);
            if (!retryDecision.shouldRetry()) break;
            IotHubTransport.sleepUninterruptibly(retryDecision.getDuration(), TimeUnit.MILLISECONDS);
            hasReconnectOperationTimedOut = this.hasOperationTimedOut(this.reconnectionAttemptStartTimeMillis);
            transportException = this.singleReconnectAttempt();
        }
        try {
            if (retryDecision != null && !retryDecision.shouldRetry()) {
                this.close(IotHubConnectionStatusChangeReason.RETRY_EXPIRED, transportException);
            } else if (this.hasOperationTimedOut(this.reconnectionAttemptStartTimeMillis)) {
                this.close(IotHubConnectionStatusChangeReason.RETRY_EXPIRED, new DeviceOperationTimeoutException("Device operation for reconnection timed out"));
            } else if (!transportException.isRetryable()) {
                this.close(this.exceptionToStatusChangeReason(transportException), transportException);
            }
        }
        catch (DeviceClientException ex) {
            this.updateStatus(IotHubConnectionStatus.DISCONNECTED, IotHubConnectionStatusChangeReason.COMMUNICATION_ERROR, transportException);
        }
    }

    private TransportException singleReconnectAttempt() {
        try {
            this.iotHubTransportConnection.close(true);
            this.openConnection();
        }
        catch (TransportException newTransportException) {
            this.checkForUnauthorizedException(newTransportException);
            return newTransportException;
        }
        return null;
    }

    private void handleMessageException(IotHubTransportPacket packet, TransportException transportException) {
        IotHubStatusCode errorCode;
        RetryDecision retryDecision;
        packet.incrementRetryAttempt();
        if (!this.hasOperationTimedOut(packet.getStartTimeMillis()) && transportException.isRetryable() && (retryDecision = this.defaultConfig.getRetryPolicy().getRetryDecision(packet.getCurrentRetryAttempt(), transportException)).shouldRetry()) {
            this.taskScheduler.schedule(new MessageRetryRunnable(this.waitingPacketsQueue, packet), retryDecision.getDuration(), TimeUnit.MILLISECONDS);
            return;
        }
        IotHubStatusCode iotHubStatusCode = errorCode = transportException instanceof IotHubServiceException ? ((IotHubServiceException)transportException).getStatusCode() : IotHubStatusCode.ERROR;
        if (transportException instanceof AmqpConnectionThrottledException) {
            errorCode = IotHubStatusCode.THROTTLED;
        }
        packet.setStatus(errorCode);
        this.addToCallbackQueue(packet);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendPacket(IotHubTransportPacket packet) {
        Message message = packet.getMessage();
        boolean messageAckExpected = !(message instanceof IotHubTransportMessage) || ((IotHubTransportMessage)message).isMessageAckNeeded(this.defaultConfig.getProtocol());
        try {
            IotHubStatusCode statusCode;
            if (messageAckExpected) {
                Object object = this.inProgressMessagesLock;
                synchronized (object) {
                    this.inProgressPackets.put(message.getMessageId(), packet);
                }
            }
            if ((statusCode = this.iotHubTransportConnection.sendMessage(message)) != IotHubStatusCode.OK_EMPTY && statusCode != IotHubStatusCode.OK) {
                this.handleMessageException(this.inProgressPackets.remove(message.getMessageId()), IotHubStatusCode.getConnectionStatusException(statusCode, ""));
            } else if (!messageAckExpected) {
                packet.setStatus(statusCode);
                this.addToCallbackQueue(packet);
            }
        }
        catch (TransportException transportException) {
            IotHubTransportPacket outboundPacket;
            if (messageAckExpected) {
                Object object = this.inProgressMessagesLock;
                synchronized (object) {
                    outboundPacket = this.inProgressPackets.remove(message.getMessageId());
                }
            } else {
                outboundPacket = packet;
            }
            this.handleMessageException(outboundPacket, transportException);
        }
    }

    private boolean isMessageValid(IotHubTransportPacket packet) {
        Message message = packet.getMessage();
        if (message.isExpired()) {
            this.logger.LogInfo("Creating a callback for the expired message with MESSAGE_EXPIRED status, method name is %s ", this.logger.getMethodName());
            packet.setStatus(IotHubStatusCode.MESSAGE_EXPIRED);
            this.addToCallbackQueue(packet);
            return false;
        }
        if (this.isSasTokenExpired()) {
            this.logger.LogInfo("Creating a callback for the message with expired sas token with UNAUTHORIZED status, method name is %s ", this.logger.getMethodName());
            packet.setStatus(IotHubStatusCode.UNAUTHORIZED);
            this.addToCallbackQueue(packet);
            this.updateStatus(IotHubConnectionStatus.DISCONNECTED, IotHubConnectionStatusChangeReason.EXPIRED_SAS_TOKEN, new SecurityException("Your sas token has expired"));
            return false;
        }
        return true;
    }

    private void updateStatus(IotHubConnectionStatus newConnectionStatus, IotHubConnectionStatusChangeReason reason, Throwable throwable) {
        if (this.connectionStatus != newConnectionStatus) {
            this.connectionStatus = newConnectionStatus;
            this.invokeConnectionStateCallback(newConnectionStatus, reason);
            this.invokeConnectionStatusChangeCallback(newConnectionStatus, reason, throwable);
            if (newConnectionStatus == IotHubConnectionStatus.CONNECTED) {
                this.currentReconnectionAttempt = 0;
                this.reconnectionAttemptStartTimeMillis = 0L;
            }
        }
    }

    private void invokeConnectionStateCallback(IotHubConnectionStatus status, IotHubConnectionStatusChangeReason reason) {
        if (this.stateCallback != null) {
            if (status == IotHubConnectionStatus.CONNECTED) {
                this.stateCallback.execute(IotHubConnectionState.CONNECTION_SUCCESS, this.stateCallbackContext);
            } else if (reason == IotHubConnectionStatusChangeReason.EXPIRED_SAS_TOKEN) {
                this.stateCallback.execute(IotHubConnectionState.SAS_TOKEN_EXPIRED, this.stateCallbackContext);
            } else if (status == IotHubConnectionStatus.DISCONNECTED) {
                this.stateCallback.execute(IotHubConnectionState.CONNECTION_DROP, this.stateCallbackContext);
            }
        }
    }

    private void invokeConnectionStatusChangeCallback(IotHubConnectionStatus status, IotHubConnectionStatusChangeReason reason, Throwable e) {
        if (this.connectionStatusChangeCallback != null) {
            this.connectionStatusChangeCallback.execute(status, reason, e, this.connectionStatusChangeCallbackContext);
        }
    }

    private boolean isSasTokenExpired() {
        return this.defaultConfig.getAuthenticationType() == DeviceClientConfig.AuthType.SAS_TOKEN && this.defaultConfig.getSasTokenAuthentication().isRenewalNecessary();
    }

    private boolean hasOperationTimedOut(long startTime) {
        if (startTime == 0L) {
            return false;
        }
        return System.currentTimeMillis() - startTime > this.defaultConfig.getOperationTimeout();
    }

    private void addToCallbackQueue(IotHubTransportPacket packet) {
        if (packet.getCallback() != null) {
            this.callbackPacketsQueue.add(packet);
        }
    }

    private static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
        boolean interrupted = false;
        try {
            long remainingNanos = unit.toNanos(sleepFor);
            long end = System.nanoTime() + remainingNanos;
            while (true) {
                try {
                    TimeUnit.NANOSECONDS.sleep(remainingNanos);
                    return;
                }
                catch (InterruptedException e) {
                    interrupted = true;
                    remainingNanos = end - System.nanoTime();
                    continue;
                }
                break;
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void checkForUnauthorizedException(TransportException transportException) {
        if (!this.isSasTokenExpired() && (transportException instanceof MqttUnauthorizedException || transportException instanceof UnauthorizedException || transportException instanceof AmqpUnauthorizedAccessException)) {
            transportException.setRetryable(true);
        }
    }

    public class MessageRetryRunnable
    implements Runnable {
        final IotHubTransportPacket transportPacket;
        final Queue<IotHubTransportPacket> waitingPacketsQueue;

        public MessageRetryRunnable(Queue<IotHubTransportPacket> waitingPacketsQueue, IotHubTransportPacket transportPacket) {
            this.waitingPacketsQueue = waitingPacketsQueue;
            this.transportPacket = transportPacket;
        }

        @Override
        public void run() {
            this.waitingPacketsQueue.add(this.transportPacket);
        }
    }
}

