/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.sdk.iot.deps.ws.impl.WebSocketImpl;
import com.microsoft.azure.sdk.iot.device.CustomLogger;
import com.microsoft.azure.sdk.iot.device.DeviceClientConfig;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.ObjectLock;
import com.microsoft.azure.sdk.iot.device.auth.IotHubSasToken;
import com.microsoft.azure.sdk.iot.device.transport.State;
import com.microsoft.azure.sdk.iot.device.transport.TransportUtils;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsMessage;
import com.microsoft.azure.sdk.iot.device.transport.amqps.IotHubReactor;
import com.microsoft.azure.sdk.iot.device.transport.amqps.ServerListener;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.engine.impl.TransportLayer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;

/**
 * AMQPS connection between a single device and an IoT Hub, optionally tunnelled
 * over WebSockets, implemented as a Proton-J event handler.
 *
 * <p>The Proton reactor is run on a single-threaded executor owned by this
 * object (see {@link #openAsync()}). {@link #open()} and {@link #close()} block
 * the calling thread until the handler callbacks signal progress through
 * {@code openLock} / {@code closeLock}.
 *
 * <p>NOTE(review): mutable fields such as {@code state}, {@code linkCredit} and
 * {@code reactor} are written by the handler callbacks and read by the public
 * methods without synchronization. Presumably the callbacks run on the executor
 * thread while the public methods run on caller threads - confirm before relying
 * on cross-thread visibility.
 */
public final class AmqpsIotHubConnection extends BaseHandler {
    /** Upper bound passed to {@code ObjectLock.waitLock} (presumably milliseconds - confirm against ObjectLock). */
    private static final long MAX_WAIT_TO_OPEN_CLOSE_CONNECTION = 60000L;
    /** Seconds to wait for the executor to terminate, applied once per shutdown phase. */
    private static final long MAX_WAIT_TO_TERMINATE_EXECUTOR = 30L;

    private static final String SENDER_TAG = "sender";
    private static final String RECEIVE_TAG = "receiver";
    private static final String SEND_ENDPOINT_FORMAT = "/devices/%s/messages/events";
    private static final String RECEIVE_ENDPOINT_FORMAT = "/devices/%s/messages/devicebound";
    private static final String VERSION_IDENTIFIER_KEY = "com.microsoft:client-version";
    private static final String CLIENT_VERSION_IDENTIFIER = "com.microsoft.azure.sdk.iot.iot-device-client/1.3.30";
    private static final String WEB_SOCKET_PATH = "/$iothub/websocket";
    private static final String WEB_SOCKET_SUB_PROTOCOL = "AMQPWSB10";
    private static final int AMQP_PORT = 5671;
    private static final int AMQP_WEB_SOCKET_PORT = 443;

    private final DeviceClientConfig config;
    private final CustomLogger logger;
    private final boolean useWebSockets;
    /** Port matching the selected transport: 443 for WebSockets, 5671 otherwise. */
    private final int port;
    /** "host:port" string handed to the Proton connection and the WebSocket layer. */
    private final String hostName;
    private final String userName;
    private final String sendEndpoint;
    private final String receiveEndpoint;

    private final List<ServerListener> listeners = new ArrayList<ServerListener>();
    private final ObjectLock openLock = new ObjectLock();
    private final ObjectLock closeLock = new ObjectLock();

    private State state;
    /** Last credit reported for a link in {@link #onLinkFlow(Event)}; -1 until the first flow event. */
    private int linkCredit = -1;
    /** Monotonic counter used to build the delivery tag of each outgoing message. */
    private long nextTag = 0L;
    private String sasToken;
    private Sender sender;
    private Receiver receiver;
    private Connection connection;
    private Session session;
    private ExecutorService executorService;
    private Reactor reactor;
    /** Set by {@link #startReconnect()}; consumed in {@link #onReactorFinal(Event)} to reopen the connection. */
    private boolean reconnectCall = false;
    private int currentReconnectionAttempt = 1;

    /**
     * Validates the configuration, derives the endpoint addresses and creates the
     * Proton reactor. The connection starts in the {@code CLOSED} state.
     *
     * @param config        device configuration; host name, device id and hub name must be
     *                      non-empty, and at least one of device key / shared access token
     *                      must be non-empty
     * @param useWebSockets {@code true} to use AMQP over WebSockets on port 443,
     *                      {@code false} to use plain AMQPS on port 5671
     * @throws IllegalArgumentException if any argument fails the checks above
     * @throws IOException              if the Proton reactor cannot be created
     */
    public AmqpsIotHubConnection(DeviceClientConfig config, Boolean useWebSockets) throws IOException {
        if (config == null) {
            throw new IllegalArgumentException("The DeviceClientConfig cannot be null.");
        }
        if (isNullOrEmpty(config.getIotHubHostname())) {
            throw new IllegalArgumentException("hostName cannot be null or empty.");
        }
        if (isNullOrEmpty(config.getDeviceId())) {
            throw new IllegalArgumentException("deviceID cannot be null or empty.");
        }
        if (isNullOrEmpty(config.getIotHubName())) {
            throw new IllegalArgumentException("hubName cannot be null or empty.");
        }
        if (isNullOrEmpty(config.getDeviceKey()) && isNullOrEmpty(config.getSharedAccessToken())) {
            throw new IllegalArgumentException("Both deviceKey and shared access signature cannot be null or empty.");
        }
        if (useWebSockets == null) {
            // Fail with a descriptive message instead of an unboxing NullPointerException.
            throw new IllegalArgumentException("useWebSockets cannot be null.");
        }

        this.config = config;
        String deviceId = this.config.getDeviceId();
        this.userName = deviceId + "@sas." + this.config.getIotHubName();
        this.useWebSockets = useWebSockets;
        this.port = this.useWebSockets ? AMQP_WEB_SOCKET_PORT : AMQP_PORT;
        // Plain concatenation keeps the port in ASCII digits; String.format("%d")
        // localizes digits according to the default locale.
        this.hostName = this.config.getIotHubHostname() + ":" + this.port;
        this.sendEndpoint = String.format(SEND_ENDPOINT_FORMAT, deviceId);
        this.receiveEndpoint = String.format(RECEIVE_ENDPOINT_FORMAT, deviceId);
        this.logger = new CustomLogger(this.getClass());

        this.add(new Handshaker());
        this.add(new FlowController());
        this.state = State.CLOSED;

        try {
            this.reactor = Proton.reactor(this);
        } catch (IOException e) {
            this.logger.LogError(e);
            throw new IOException("Could not create Proton reactor", e);
        }
        this.logger.LogInfo("AmqpsIotHubConnection object is created successfully using port %s in %s method ", this.port, this.logger.getMethodName());
    }

    /**
     * Starts the reactor if the connection is currently {@code CLOSED} and blocks
     * until {@link #onConnectionBound(Event)} signals {@code openLock} or the wait
     * limit elapses. Does nothing when the connection is not {@code CLOSED}.
     *
     * <p>Note that the state only becomes {@code OPEN} later, in
     * {@link #onLinkRemoteOpen(Event)}.
     *
     * @throws IOException if the reactor cannot be started, or if the calling thread
     *                     is interrupted while waiting (the interrupt flag is restored)
     */
    public void open() throws IOException {
        if (this.state != State.CLOSED) {
            return;
        }
        try {
            this.openAsync();
        } catch (Exception e) {
            this.logger.LogError(e);
            this.close();
            throw new IOException("Error opening Amqp connection: ", e);
        }
        try {
            synchronized (this.openLock) {
                this.openLock.waitLock(MAX_WAIT_TO_OPEN_CLOSE_CONNECTION);
            }
        } catch (InterruptedException e) {
            this.logger.LogError(e);
            Thread.currentThread().interrupt();
            throw new IOException("Waited too long for the connection to open.", e);
        }
    }

    /**
     * Closes the links, session and connection, stops the reactor, waits for
     * {@link #onReactorFinal(Event)} to signal {@code closeLock}, and shuts down the
     * executor that runs the reactor.
     *
     * <p>The executor is shut down even when the wait is interrupted, so the reactor
     * thread is not left running.
     *
     * @throws IOException if the calling thread is interrupted while waiting for the
     *                     reactor to finish (the interrupt flag is restored)
     */
    public void close() throws IOException {
        // Only a reactor can raise the notification awaited below, so skip the wait
        // when there is none (for example on a second call to close()).
        boolean reactorPresent = this.reactor != null;
        this.closeAsync();
        try {
            if (reactorPresent) {
                synchronized (this.closeLock) {
                    this.closeLock.waitLock(MAX_WAIT_TO_OPEN_CLOSE_CONNECTION);
                }
            }
        } catch (InterruptedException e) {
            this.logger.LogError(e);
            Thread.currentThread().interrupt();
            throw new IOException("Waited too long for the connection to close.", e);
        } finally {
            if (this.executorService != null) {
                this.logger.LogInfo("Shutdown of executor service has started, method name is %s ", this.logger.getMethodName());
                this.executorService.shutdown();
                try {
                    // Two-phase shutdown: orderly first, forced if tasks do not finish in time.
                    if (!this.executorService.awaitTermination(MAX_WAIT_TO_TERMINATE_EXECUTOR, TimeUnit.SECONDS)) {
                        this.executorService.shutdownNow();
                        if (!this.executorService.awaitTermination(MAX_WAIT_TO_TERMINATE_EXECUTOR, TimeUnit.SECONDS)) {
                            this.logger.LogInfo("Pool did not terminate", new Object[0]);
                        }
                    }
                } catch (InterruptedException ie) {
                    this.logger.LogError(ie);
                    this.executorService.shutdownNow();
                    Thread.currentThread().interrupt();
                }
                this.logger.LogInfo("Shutdown of executor service completed, method name is %s ", this.logger.getMethodName());
            }
        }
    }

    /**
     * Creates a fresh SAS token, lazily (re)creates the reactor and the executor,
     * and submits the reactor to the executor without waiting for it.
     *
     * @throws IOException if a new Proton reactor cannot be created
     */
    private void openAsync() throws IOException {
        long tokenExpiry = System.currentTimeMillis() / 1000L + this.config.getTokenValidSecs() + 1L;
        this.sasToken = new IotHubSasToken(this.config, tokenExpiry).toString();
        this.logger.LogInfo("SAS Token is created successfully, method name is %s ", this.logger.getMethodName());
        if (this.reactor == null) {
            this.reactor = Proton.reactor(this);
        }
        if (this.executorService == null) {
            this.executorService = Executors.newFixedThreadPool(1);
        }
        ReactorRunner reactorRunner = new ReactorRunner(new IotHubReactor(this.reactor));
        this.executorService.submit(reactorRunner);
        this.logger.LogInfo("Reactor is assigned to executor service, method name is %s ", this.logger.getMethodName());
    }

    /**
     * Marks the connection {@code CLOSED}, closes whichever of sender, receiver,
     * session and connection exist, and stops the reactor if there is one.
     * Does not wait for the reactor to finish.
     */
    private void closeAsync() {
        this.state = State.CLOSED;
        if (this.sender != null) {
            this.sender.close();
        }
        if (this.receiver != null) {
            this.receiver.close();
        }
        if (this.session != null) {
            this.session.close();
        }
        if (this.connection != null) {
            this.connection.close();
        }
        // onReactorFinal() clears the field, so read it once and tolerate null.
        Reactor currentReactor = this.reactor;
        if (currentReactor != null) {
            currentReactor.stop();
        }
        this.logger.LogInfo("Proton reactor has been stopped, method name is %s ", this.logger.getMethodName());
    }

    /**
     * Encodes the message and sends it over the sender link.
     *
     * @param message the message to send
     * @return the hash code of the created delivery, which is the value later passed to
     *         {@code ServerListener.messageSent}; or -1 when the connection is
     *         {@code CLOSED}, no link credit is available, or sending fails
     */
    public Integer sendMessage(Message message) {
        if (this.state == State.CLOSED || this.linkCredit <= 0) {
            return -1;
        }

        byte[] msgData = new byte[1024];
        int length;
        this.logger.LogInfo("Started encoding of message - entering in while loop, method name is %s ", this.logger.getMethodName());
        // The encoded size is not known up front: double the buffer until it fits.
        while (true) {
            try {
                length = message.encode(msgData, 0, msgData.length);
                this.logger.LogInfo("Completed encoding of message, length is %s - breaking the while loop to come out, method name is %s ", length, this.logger.getMethodName());
                break;
            } catch (BufferOverflowException e) {
                this.logger.LogError(e);
                msgData = new byte[msgData.length * 2];
            }
        }

        // Explicit charset so the tag bytes do not depend on the platform default.
        byte[] tag = String.valueOf(this.nextTag++).getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Delivery dlv = this.sender.delivery(tag);
        try {
            this.logger.LogInfo("Attempting to send the message using the sender link, method name is %s ", this.logger.getMethodName());
            this.sender.send(msgData, 0, length);
            this.logger.LogInfo("Advancing the sender link, method name is %s ", this.logger.getMethodName());
            this.sender.advance();
            int deliveryHash = dlv.hashCode();
            this.logger.LogInfo("Delivery hash returned by the sender link %s, method name is %s ", deliveryHash, this.logger.getMethodName());
            return deliveryHash;
        } catch (Exception e) {
            // Record why the send failed before discarding the delivery.
            this.logger.LogError(e);
            this.sender.advance();
            dlv.free();
            return -1;
        }
    }

    /**
     * Acknowledges a received message with the given result.
     *
     * @param message the received message to acknowledge
     * @param result  the outcome to report: COMPLETE, REJECT or ABANDON
     * @return {@code true} if the acknowledgement was issued; {@code false} if the
     *         connection is {@code CLOSED} or any exception occurred (it is logged)
     */
    public Boolean sendMessageResult(AmqpsMessage message, IotHubMessageResult result) {
        if (this.state == State.CLOSED) {
            return false;
        }
        try {
            this.logger.LogInfo("Acknowledgement for received message is %s, method name is %s ", result.name(), this.logger.getMethodName());
            switch (result) {
                case COMPLETE:
                    message.acknowledge(AmqpsMessage.ACK_TYPE.COMPLETE);
                    break;
                case REJECT:
                    message.acknowledge(AmqpsMessage.ACK_TYPE.REJECT);
                    break;
                case ABANDON:
                    message.acknowledge(AmqpsMessage.ACK_TYPE.ABANDON);
                    break;
                default:
                    this.logger.LogError("Invalid IoT Hub message result (%s), method name is %s ", result.name(), this.logger.getMethodName());
                    throw new IllegalStateException("Invalid IoT Hub message result.");
            }
            return true;
        } catch (Exception e) {
            this.logger.LogError(e);
            return false;
        }
    }

    /**
     * Creates the session and the sender/receiver links on the new connection, tags
     * both links with the client version property, and opens all of them.
     *
     * @param event the Proton event carrying the connection
     */
    public void onConnectionInit(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.connection = event.getConnection();
        this.connection.setHostname(this.hostName);
        this.session = this.connection.session();
        this.receiver = this.session.receiver(RECEIVE_TAG);
        this.sender = this.session.sender(SENDER_TAG);

        // Link properties are declared as Map<Symbol, Object> in Proton-J.
        HashMap<Symbol, Object> properties = new HashMap<Symbol, Object>();
        properties.put(Symbol.getSymbol(VERSION_IDENTIFIER_KEY), CLIENT_VERSION_IDENTIFIER);
        this.receiver.setProperties(properties);
        this.sender.setProperties(properties);

        this.connection.open();
        this.session.open();
        this.receiver.open();
        this.sender.open();
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Configures the transport (optional WebSocket layer, SASL PLAIN with the SAS
     * token, TLS) and then releases any thread blocked in {@link #open()}.
     *
     * @param event the Proton event carrying the connection whose transport was bound
     */
    public void onConnectionBound(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        Transport transport = event.getConnection().getTransport();
        if (transport != null) {
            if (this.useWebSockets) {
                WebSocketImpl webSocket = new WebSocketImpl();
                webSocket.configure(this.hostName, WEB_SOCKET_PATH, 0, WEB_SOCKET_SUB_PROTOCOL, null, null);
                ((TransportInternal) transport).addTransportLayer(webSocket);
            }
            Sasl sasl = transport.sasl();
            sasl.plain(this.userName, this.sasToken);
            transport.ssl(this.makeDomain());
        }
        synchronized (this.openLock) {
            this.openLock.notifyLock();
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Marks the connection {@code CLOSED} when the transport is unbound.
     *
     * @param event the Proton event (unused)
     */
    public void onConnectionUnbound(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.state = State.CLOSED;
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Asks the reactor to connect to the IoT Hub host on the port matching the
     * selected transport, with this object as the connection handler.
     *
     * @param event the Proton event carrying the reactor
     */
    public void onReactorInit(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        event.getReactor().connectionToHost(this.config.getIotHubHostname(), this.port, this);
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Releases any thread blocked in {@link #close()}, discards the finished reactor,
     * and reopens the connection if a reconnect was requested.
     *
     * @param event the Proton event (unused)
     */
    public void onReactorFinal(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        synchronized (this.closeLock) {
            this.closeLock.notifyLock();
        }
        this.reactor = null;
        if (this.reconnectCall) {
            this.reconnectCall = false;
            try {
                this.openAsync();
            } catch (IOException e) {
                this.logger.LogError(e);
            }
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Handles a delivery event. On the receiver link a complete incoming message is
     * read, decoded and passed to the listeners. On any other link the listeners are
     * told whether the remote peer accepted the delivery, which is then freed.
     *
     * @param event the Proton delivery event
     */
    public void onDelivery(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        if (event.getLink().getName().equals(RECEIVE_TAG)) {
            this.logger.LogInfo("Reading the receiver link, method name is %s ", this.logger.getMethodName());
            Receiver receiveLink = (Receiver) event.getLink();
            Delivery delivery = receiveLink.current();
            // Guard against the link having no current delivery.
            if (delivery != null && delivery.isReadable() && !delivery.isPartial()) {
                this.logger.LogInfo("Reading the received buffer, method name is %s ", this.logger.getMethodName());
                byte[] buffer = new byte[delivery.pending()];
                int read = receiveLink.recv(buffer, 0, buffer.length);
                receiveLink.advance();
                this.logger.LogInfo("Reading the received buffer completed, method name is %s ", this.logger.getMethodName());
                AmqpsMessage msg = new AmqpsMessage();
                msg.setDelivery(delivery);
                this.logger.LogInfo("Decoding the received message , method name is %s ", this.logger.getMethodName());
                msg.decode(buffer, 0, read);
                this.logger.LogInfo("Decoding the received message completed , method name is %s ", this.logger.getMethodName());
                this.messageReceivedFromServer(msg);
            }
        } else if (event.getType() == Event.Type.DELIVERY) {
            this.logger.LogInfo("Reading the delivery event in Sender link, method name is %s ", this.logger.getMethodName());
            Delivery d = event.getDelivery();
            DeliveryState remoteState = d.getRemoteState();
            // A missing remote state is reported as "not accepted" rather than crashing.
            boolean accepted = remoteState != null && remoteState.equals(Accepted.getInstance());
            this.logger.LogInfo("Is state of remote Delivery COMPLETE ? %s, method name is %s ", accepted, this.logger.getMethodName());
            this.logger.LogInfo("Inform listener that a message has been sent to IoT Hub along with remote state, method name is %s ", this.logger.getMethodName());
            for (ServerListener listener : this.listeners) {
                listener.messageSent(d.hashCode(), accepted);
            }
            d.free();
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Records the credit of the link that raised the flow event; the value gates
     * {@link #sendMessage(Message)}.
     *
     * @param event the Proton flow event
     */
    public void onLinkFlow(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.linkCredit = event.getLink().getCredit();
        this.logger.LogDebug("The link credit value is %s, method name is %s", this.linkCredit, this.logger.getMethodName());
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Marks the connection {@code OPEN} and notifies the listeners once the remote
     * side has opened the sender link. Other links are ignored.
     *
     * @param event the Proton event carrying the link
     */
    public void onLinkRemoteOpen(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        Link link = event.getLink();
        if (link.getName().equals(SENDER_TAG)) {
            this.state = State.OPEN;
            for (ServerListener listener : this.listeners) {
                listener.connectionEstablished();
            }
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Marks the connection {@code CLOSED} when the remote side closes a link, and
     * starts a reconnect if the closed link is the sender link.
     *
     * @param event the Proton event carrying the link
     */
    public void onLinkRemoteClose(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.state = State.CLOSED;
        if (event.getLink().getName().equals(SENDER_TAG)) {
            this.logger.LogInfo("Starting to reconnect to IotHub, method name is %s ", this.logger.getMethodName());
            this.startReconnect();
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Assigns the link address: the sender link targets the device's events
     * endpoint with unsettled sender mode; any other link uses the devicebound
     * endpoint as its source.
     *
     * @param event the Proton event carrying the link
     */
    public void onLinkInit(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        Link link = event.getLink();
        if (link.getName().equals(SENDER_TAG)) {
            Target target = new Target();
            target.setAddress(this.sendEndpoint);
            link.setTarget(target);
            link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        } else {
            Source source = new Source();
            source.setAddress(this.receiveEndpoint);
            link.setSource(source);
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Marks the connection {@code CLOSED} and starts a reconnect after a transport
     * error.
     *
     * @param event the Proton event (unused)
     */
    public void onTransportError(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.state = State.CLOSED;
        this.logger.LogInfo("Starting to reconnect to IotHub, method name is %s ", this.logger.getMethodName());
        this.startReconnect();
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Registers a listener for connection and message events.
     *
     * @param listener the listener to add
     */
    public void addListener(ServerListener listener) {
        this.listeners.add(listener);
    }

    /**
     * Flags a reconnect, tells the listeners the connection was lost, sleeps for the
     * back-off interval and then closes the connection asynchronously. The reopen
     * itself happens in {@link #onReactorFinal(Event)}.
     *
     * <p>NOTE(review): the sleep blocks the thread that delivered the handler
     * callback - presumably the reactor thread; confirm this is intended.
     */
    private void startReconnect() {
        this.reconnectCall = true;
        for (ServerListener listener : this.listeners) {
            listener.connectionLost();
        }
        if (this.currentReconnectionAttempt == Integer.MAX_VALUE) {
            this.currentReconnectionAttempt = 0;
        }
        // Capture the attempt number once so stdout and the log report the same value.
        int attempt = this.currentReconnectionAttempt++;
        System.out.println("Lost connection to the server. Reconnection attempt " + attempt + "...");
        this.logger.LogInfo("Lost connection to the server. Reconnection attempt %s, method name is %s ", attempt, this.logger.getMethodName());
        try {
            // The interval is computed from the already-incremented counter.
            Thread.sleep(TransportUtils.generateSleepInterval(this.currentReconnectionAttempt));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.closeAsync();
    }

    /**
     * Passes a received message to every registered listener.
     *
     * @param msg the decoded message
     */
    private void messageReceivedFromServer(AmqpsMessage msg) {
        this.logger.LogInfo("All the listeners are informed that a message has been received, method name is %s ", this.logger.getMethodName());
        for (ServerListener listener : this.listeners) {
            listener.messageReceived(msg);
        }
    }

    /**
     * Builds a client-mode SSL domain that uses the SSL context from the device
     * configuration and verifies the peer.
     *
     * @return the initialised SSL domain
     */
    private SslDomain makeDomain() {
        SslDomain domain = Proton.sslDomain();
        domain.setSslContext(this.config.getIotHubSSLContext().getIotHubSSlContext());
        domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
        domain.init(SslDomain.Mode.CLIENT);
        return domain;
    }

    /**
     * Tells whether a configuration string is missing.
     *
     * @param value the string to test
     * @return {@code true} if {@code value} is {@code null} or has length zero
     */
    private static boolean isNullOrEmpty(String value) {
        return value == null || value.length() == 0;
    }

    /**
     * Task that runs an {@link IotHubReactor} on the executor thread. It is a static
     * nested class because it does not use the enclosing connection.
     */
    private static class ReactorRunner implements Callable<Object> {
        private final IotHubReactor iotHubReactor;

        ReactorRunner(IotHubReactor iotHubReactor) {
            this.iotHubReactor = iotHubReactor;
        }

        /**
         * Runs the wrapped reactor.
         *
         * @return always {@code null}
         */
        @Override
        public Object call() {
            this.iotHubReactor.run();
            return null;
        }
    }
}

