/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.sdk.iot.device.CustomLogger;
import com.microsoft.azure.sdk.iot.device.DeviceClientConfig;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionState;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionStateCallback;
import com.microsoft.azure.sdk.iot.device.IotHubEventCallback;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.IotHubResponseCallback;
import com.microsoft.azure.sdk.iot.device.IotHubStatusCode;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageCallback;
import com.microsoft.azure.sdk.iot.device.MessageProperty;
import com.microsoft.azure.sdk.iot.device.transport.IotHubCallbackPacket;
import com.microsoft.azure.sdk.iot.device.transport.IotHubOutboundPacket;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransport;
import com.microsoft.azure.sdk.iot.device.transport.State;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsIotHubConnection;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsMessage;
import com.microsoft.azure.sdk.iot.device.transport.amqps.ServerListener;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.impl.MessageImpl;

public final class AmqpsTransport
implements IotHubTransport,
ServerListener {
    private State state;
    private AmqpsIotHubConnection connection;
    private final Queue<IotHubOutboundPacket> waitingMessages = new LinkedBlockingDeque<IotHubOutboundPacket>();
    private final Map<Integer, IotHubOutboundPacket> inProgressMessages = new ConcurrentHashMap<Integer, IotHubOutboundPacket>();
    private final Queue<AmqpsMessage> receivedMessages = new LinkedBlockingQueue<AmqpsMessage>();
    private final Queue<IotHubCallbackPacket> callbackList = new LinkedBlockingDeque<IotHubCallbackPacket>();
    private IotHubConnectionStateCallback stateCallback;
    private Object stateCallbackContext;
    private final DeviceClientConfig config;
    private final Boolean useWebSockets;
    private final CustomLogger logger;
    private static final String TO_KEY = "to";
    private static final String USER_ID_KEY = "userId";
    public static final String AMQPS_APP_PROPERTY_PREFIX = "iothub-app-";

    public AmqpsTransport(DeviceClientConfig config, Boolean useWebSockets) {
        this.config = config;
        this.useWebSockets = useWebSockets;
        this.state = State.CLOSED;
        this.logger = new CustomLogger(this.getClass());
    }

    @Override
    public synchronized void open() throws IOException {
        if (this.state == State.OPEN) {
            return;
        }
        this.logger.LogInfo("Opening the connection..., method name is %s ", this.logger.getMethodName());
        this.connection = new AmqpsIotHubConnection(this.config, this.useWebSockets);
        try {
            this.connection.addListener(this);
            this.connection.open();
        }
        catch (Exception e) {
            this.logger.LogError(e);
            throw new IOException(e);
        }
        this.state = State.OPEN;
        this.logger.LogInfo("Connection has been opened, method name is %s ", this.logger.getMethodName());
    }

    @Override
    public synchronized void close() throws IOException {
        if (this.state == State.CLOSED) {
            this.logger.LogInfo("The connection is already in closed state, method name is %s ", this.logger.getMethodName());
            return;
        }
        while (!this.waitingMessages.isEmpty()) {
            IotHubOutboundPacket packet = this.waitingMessages.remove();
            Message message = packet.getMessage();
            if (message == null || message.getBytes().length <= 0) continue;
            IotHubCallbackPacket callbackPacket = new IotHubCallbackPacket(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE, packet.getCallback(), packet.getContext());
            this.callbackList.add(callbackPacket);
        }
        for (Map.Entry<Integer, IotHubOutboundPacket> entry : this.inProgressMessages.entrySet()) {
            IotHubOutboundPacket packet = entry.getValue();
            IotHubCallbackPacket callbackPacket = new IotHubCallbackPacket(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE, packet.getCallback(), packet.getContext());
            this.callbackList.add(callbackPacket);
        }
        this.invokeCallbacks();
        this.inProgressMessages.clear();
        this.logger.LogInfo("Starting to close the connection..., method name is %s ", this.logger.getMethodName());
        this.connection.close();
        this.state = State.CLOSED;
        this.logger.LogInfo("Connection has been closed, method name is %s ", this.logger.getMethodName());
    }

    @Override
    public void addMessage(Message message, IotHubEventCallback callback, Object callbackContext) throws IllegalStateException {
        if (this.state == State.CLOSED) {
            this.logger.LogError("Cannot add a message when the AMQPS transport is closed, method name is %s ", this.logger.getMethodName());
            throw new IllegalStateException("Cannot add a message when the AMQPS transport is closed.");
        }
        IotHubOutboundPacket packet = new IotHubOutboundPacket(message, callback, callbackContext);
        this.waitingMessages.add(packet);
    }

    @Override
    public void addMessage(Message message, IotHubResponseCallback callback, Object callbackContext) throws UnsupportedOperationException {
        throw new UnsupportedOperationException("AMQP do not support callback with message response");
    }

    @Override
    public void sendMessages() throws IOException, IllegalStateException {
        if (this.state == State.CLOSED) {
            this.logger.LogError("Cannot send messages when the AMQPS transport is closed, method name is %s ", this.logger.getMethodName());
            throw new IllegalStateException("Cannot send messages when the AMQPS transport is closed.");
        }
        if (this.waitingMessages.size() <= 0) {
            return;
        }
        ArrayList<IotHubOutboundPacket> failedMessages = new ArrayList<IotHubOutboundPacket>();
        while (!this.waitingMessages.isEmpty()) {
            this.logger.LogInfo("Get the message from waiting message queue to be sent to IoT Hub, method name is %s ", this.logger.getMethodName());
            IotHubOutboundPacket packet = this.waitingMessages.remove();
            Message message = packet.getMessage();
            if (message == null || message.getBytes().length <= 0) continue;
            if (message.isExpired()) {
                this.logger.LogInfo("Creating a callback for the expired message with MESSAGE_EXPIRED status, method name is %s ", this.logger.getMethodName());
                IotHubCallbackPacket callbackPacket = new IotHubCallbackPacket(IotHubStatusCode.MESSAGE_EXPIRED, packet.getCallback(), packet.getContext());
                this.callbackList.add(callbackPacket);
                continue;
            }
            this.logger.LogInfo("Converting the IoT Hub message into AmqpsMessage, method name is %s ", this.logger.getMethodName());
            MessageImpl protonMessage = this.iotHubMessageToProtonMessage(message);
            Integer sendHash = this.connection.sendMessage((org.apache.qpid.proton.message.Message)protonMessage);
            if (sendHash != -1) {
                this.inProgressMessages.put(sendHash, packet);
                continue;
            }
            failedMessages.add(packet);
        }
        this.waitingMessages.addAll(failedMessages);
    }

    @Override
    public void invokeCallbacks() throws IllegalStateException {
        if (this.state == State.CLOSED) {
            this.logger.LogError("Cannot invoke callbacks when AMQPS transport is closed, method name is %s ", this.logger.getMethodName());
            throw new IllegalStateException("Cannot invoke callbacks when AMQPS transport is closed.");
        }
        while (!this.callbackList.isEmpty()) {
            IotHubCallbackPacket packet = this.callbackList.remove();
            IotHubStatusCode status = packet.getStatus();
            IotHubEventCallback callback = packet.getCallback();
            Object context = packet.getContext();
            this.logger.LogInfo("Invoking the callback function for sent message, IoT Hub responded to message with status %s, method name is %s ", status.name(), this.logger.getMethodName());
            callback.execute(status, context);
        }
    }

    @Override
    public void handleMessage() throws IllegalStateException {
        if (this.state == State.CLOSED) {
            this.logger.LogError("Cannot handle messages when AMQPS transport is closed, method name is %s ", this.logger.getMethodName());
            throw new IllegalStateException("Cannot handle messages when AMQPS transport is closed.");
        }
        this.logger.LogInfo("Get the callback function for the received message, method name is %s ", this.logger.getMethodName());
        MessageCallback callback = this.config.getMessageCallback();
        if (callback == null) {
            this.logger.LogError("Callback is not defined therefore response to IoT Hub cannot be generated. All received messages will be removed from receive message queue, method name is %s ", this.logger.getMethodName());
            this.receivedMessages.clear();
            return;
        }
        if (this.receivedMessages.size() > 0) {
            this.logger.LogInfo("Consuming a message received from IoT Hub using receive message queue, method name is %s ", this.logger.getMethodName());
            AmqpsMessage receivedMessage = this.receivedMessages.remove();
            this.logger.LogInfo("Converting the AmqpsMessage to IoT Hub message, method name is %s ", this.logger.getMethodName());
            Message message = this.protonMessageToIoTHubMessage(receivedMessage);
            this.logger.LogInfo("Executing the callback function for received message, method name is %s ", this.logger.getMethodName());
            IotHubMessageResult result = callback.execute(message, this.config.getMessageContext());
            Boolean ackResult = this.connection.sendMessageResult(receivedMessage, result);
            if (!ackResult.booleanValue()) {
                this.logger.LogWarn("Callback did not return a response for IoT Hub. Message has been added in the queue to be processed again, method name is %s", this.logger.getMethodName());
                this.receivedMessages.add(receivedMessage);
            }
        }
    }

    @Override
    public void messageSent(Integer messageHash, Boolean deliveryState) {
        if (this.inProgressMessages.containsKey(messageHash)) {
            IotHubOutboundPacket packet = this.inProgressMessages.remove(messageHash);
            if (deliveryState.booleanValue()) {
                this.logger.LogInfo("Message with messageid %s has been successfully delivered to IoTHub, adding a callback to callbacklist with IotHubStatusCode.OK_EMPTY, method name is %s ", packet.getMessage().getMessageId(), this.logger.getMethodName());
                IotHubCallbackPacket callbackPacket = new IotHubCallbackPacket(IotHubStatusCode.OK_EMPTY, packet.getCallback(), packet.getContext());
                this.callbackList.add(callbackPacket);
            } else {
                this.logger.LogInfo("Message with messageid %s was not delivered to IoTHub, it is buffered to be sent again, method name is %s ", packet.getMessage().getMessageId(), this.logger.getMethodName());
                this.waitingMessages.add(packet);
            }
        }
    }

    @Override
    public void connectionLost() {
        this.logger.LogInfo("The messages in progress are buffered to be sent again due to a connection loss, method name is %s ", this.logger.getMethodName());
        for (Map.Entry<Integer, IotHubOutboundPacket> entry : this.inProgressMessages.entrySet()) {
            this.waitingMessages.add(entry.getValue());
        }
        this.inProgressMessages.clear();
        if (this.stateCallback != null) {
            this.stateCallback.execute(IotHubConnectionState.CONNECTION_DROP, this.stateCallbackContext);
        }
    }

    @Override
    public void connectionEstablished() {
        this.logger.LogInfo("The connection to the IoT Hub has been established, method name is %s ", this.logger.getMethodName());
        if (this.stateCallback != null) {
            this.stateCallback.execute(IotHubConnectionState.CONNECTION_SUCCESS, this.stateCallbackContext);
        }
    }

    @Override
    public void messageReceived(AmqpsMessage message) {
        this.logger.LogInfo("Message with hashcode %s is received from IotHub on %s, method name is %s ", ((Object)((Object)message)).hashCode(), new Date(), this.logger.getMethodName());
        this.receivedMessages.add(message);
    }

    @Override
    public boolean isEmpty() {
        return this.waitingMessages.isEmpty() && this.inProgressMessages.size() == 0 && this.callbackList.isEmpty();
    }

    @Override
    public void registerConnectionStateCallback(IotHubConnectionStateCallback callback, Object callbackContext) {
        this.stateCallback = callback;
        this.stateCallbackContext = callbackContext;
    }

    private Message protonMessageToIoTHubMessage(MessageImpl protonMsg) {
        this.logger.LogInfo("Started converting AmpqsMessage into IoT Hub message, method name is %s ", this.logger.getMethodName());
        Data d = (Data)protonMsg.getBody();
        Binary b = d.getValue();
        byte[] msgBody = new byte[b.getLength()];
        ByteBuffer buffer = b.asByteBuffer();
        buffer.get(msgBody);
        Message msg = new Message(msgBody);
        this.logger.LogInfo("Content of received message is %s, method name is %s ", new String(msg.getBytes(), Message.DEFAULT_IOTHUB_MESSAGE_CHARSET), this.logger.getMethodName());
        Properties properties = protonMsg.getProperties();
        if (properties.getCorrelationId() != null) {
            msg.setCorrelationId(properties.getCorrelationId().toString());
        }
        if (properties.getMessageId() != null) {
            msg.setMessageId(properties.getMessageId().toString());
        }
        if (properties.getTo() != null) {
            msg.setProperty("iothub-app-to", properties.getTo());
        }
        if (properties.getUserId() != null) {
            msg.setProperty("iothub-app-userId", properties.getUserId().toString());
        }
        if (protonMsg.getApplicationProperties() != null) {
            Map applicationProperties = protonMsg.getApplicationProperties().getValue();
            for (Map.Entry entry : applicationProperties.entrySet()) {
                String propertyKey = (String)entry.getKey();
                if (MessageProperty.RESERVED_PROPERTY_NAMES.contains(propertyKey)) continue;
                msg.setProperty((String)entry.getKey(), (String)entry.getValue());
            }
        }
        this.logger.LogInfo("Completed the conversion of AmpqsMessage into IoT Hub message, method name is %s ", this.logger.getMethodName());
        return msg;
    }

    private MessageImpl iotHubMessageToProtonMessage(Message message) {
        this.logger.LogInfo("Started converting IoT Hub message into AmpqsMessage, method name is %s ", this.logger.getMethodName());
        MessageImpl outgoingMessage = (MessageImpl)Proton.message();
        this.logger.LogInfo("Content of message is %s, method name is %s ", new String(message.getBytes(), Message.DEFAULT_IOTHUB_MESSAGE_CHARSET), this.logger.getMethodName());
        Properties properties = new Properties();
        if (message.getMessageId() != null) {
            properties.setMessageId((Object)message.getMessageId());
        }
        outgoingMessage.setProperties(properties);
        if (message.getProperties().length > 0) {
            HashMap<String, String> userProperties = new HashMap<String, String>(message.getProperties().length);
            for (MessageProperty messageProperty : message.getProperties()) {
                if (MessageProperty.RESERVED_PROPERTY_NAMES.contains(messageProperty.getName())) continue;
                userProperties.put(messageProperty.getName(), messageProperty.getValue());
            }
            ApplicationProperties applicationProperties = new ApplicationProperties(userProperties);
            outgoingMessage.setApplicationProperties(applicationProperties);
        }
        Binary binary = new Binary(message.getBytes());
        Data section = new Data(binary);
        outgoingMessage.setBody((Section)section);
        this.logger.LogInfo("Started converting IoT Hub message into AmpqsMessage, method name is %s ", this.logger.getMethodName());
        return outgoingMessage;
    }
}

