/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.ext.stomp.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.Frames;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.impl.FrameParser;
import io.vertx.ext.stomp.utils.Headers;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class StompClientConnectionImpl
implements StompClientConnection,
Handler<Frame> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StompClientConnectionImpl.class);
    private final StompClient client;
    private final NetSocket socket;
    private final Handler<AsyncResult<StompClientConnection>> resultHandler;
    private final Context context;
    private volatile long lastServerActivity;
    private final Map<String, Promise<Void>> pendingReceipts = new HashMap<String, Promise<Void>>();
    private String version;
    private String sessionId;
    private String server;
    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();
    private volatile long pinger = -1L;
    private volatile long ponger = -1L;
    private Handler<StompClientConnection> pingHandler = connection -> connection.send(Frames.ping());
    private Handler<StompClientConnection> closeHandler;
    private Handler<StompClientConnection> droppedHandler = v -> {};
    private Handler<Frame> receivedFrameHandler;
    private Handler<Frame> writingHandler;
    private Handler<Frame> errorHandler;
    private volatile boolean closed;
    private Handler<Throwable> exceptionHandler;
    private volatile boolean connected;

    public StompClientConnectionImpl(Vertx vertx, NetSocket socket, StompClient client, Handler<AsyncResult<StompClientConnection>> resultHandler) {
        this.socket = socket;
        this.client = client;
        this.resultHandler = resultHandler;
        this.context = vertx.getOrCreateContext();
        FrameParser parser = new FrameParser();
        parser.handler(this);
        socket.handler(buffer -> {
            this.lastServerActivity = System.nanoTime();
            parser.handle((Buffer)buffer);
        }).closeHandler(v -> {
            if (!this.closed && !client.isClosed()) {
                this.close();
                if (this.droppedHandler != null) {
                    this.droppedHandler.handle((Object)this);
                }
            }
        });
    }

    @Override
    public boolean isConnected() {
        return this.connected;
    }

    @Override
    public synchronized String session() {
        return this.sessionId;
    }

    @Override
    public synchronized String version() {
        return this.version;
    }

    @Override
    public synchronized void close() {
        this.closed = true;
        this.connected = false;
        if (this.closeHandler != null) {
            this.context.runOnContext(v -> this.closeHandler.handle((Object)this));
        }
        if (this.pinger != -1L) {
            this.client.vertx().cancelTimer(this.pinger);
            this.pinger = -1L;
        }
        if (this.ponger != -1L) {
            this.client.vertx().cancelTimer(this.ponger);
            this.ponger = -1L;
        }
        ArrayList<Promise<Void>> values = new ArrayList<Promise<Void>>(this.pendingReceipts.values());
        this.pendingReceipts.clear();
        for (Promise promise : values) {
            promise.fail("Client closed");
        }
        this.socket.close();
        this.client.close();
        this.subscriptions.clear();
        this.server = null;
        this.sessionId = null;
        this.version = null;
    }

    @Override
    public synchronized String server() {
        return this.server;
    }

    @Override
    public Future<Frame> send(Map<String, String> headers, Buffer body) {
        return this.send(null, headers, body);
    }

    @Override
    public StompClientConnection send(Map<String, String> headers, Buffer body, Handler<AsyncResult<Frame>> receiptHandler) {
        return this.send(null, headers, body, receiptHandler);
    }

    @Override
    public Future<Frame> send(String destination, Buffer body) {
        return this.send(destination, null, body);
    }

    @Override
    public StompClientConnection send(String destination, Buffer body, Handler<AsyncResult<Frame>> receiptHandler) {
        return this.send(destination, null, body, receiptHandler);
    }

    @Override
    public Future<Frame> send(Frame frame) {
        Promise promise = Promise.promise();
        this.send(frame, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public synchronized StompClientConnection send(Frame frame, Handler<AsyncResult<Frame>> receiptHandler) {
        if (receiptHandler != null) {
            String receiptId = UUID.randomUUID().toString();
            frame.addHeader("receipt", receiptId);
            Promise promise = Promise.promise();
            promise.future().onComplete(f -> receiptHandler.handle((Object)f.map((Object)frame)));
            this.pendingReceipts.put(receiptId, (Promise<Void>)promise);
        }
        if (this.writingHandler != null) {
            this.writingHandler.handle((Object)frame);
        }
        this.socket.write((Object)frame.toBuffer(this.client.options().isTrailingLine()));
        return this;
    }

    @Override
    public Future<Frame> send(String destination, Map<String, String> headers, Buffer body) {
        Promise promise = Promise.promise();
        this.send(destination, headers, body, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection send(String destination, Map<String, String> headers, Buffer body, Handler<AsyncResult<Frame>> receiptHandler) {
        if (headers == null) {
            headers = new Headers();
        }
        if (destination != null) {
            headers.put("destination", destination);
        }
        if (headers.get("destination") == null) {
            throw new IllegalArgumentException("The 'destination' header is mandatory : " + headers);
        }
        if (body != null && this.client.options().isAutoComputeContentLength() && !headers.containsKey("content-length")) {
            headers.put("content-length", Integer.toString(body.length()));
        }
        Frame frame = new Frame(Frame.Command.SEND, headers, body);
        return this.send(frame, receiptHandler);
    }

    @Override
    public Future<String> subscribe(String destination, Handler<Frame> handler) {
        Promise promise = Promise.promise();
        this.subscribe(destination, null, handler, (Handler<AsyncResult<String>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection subscribe(String destination, Handler<Frame> handler, Handler<AsyncResult<String>> receiptHandler) {
        return this.subscribe(destination, null, handler, receiptHandler);
    }

    @Override
    public Future<String> subscribe(String destination, Map<String, String> headers, Handler<Frame> handler) {
        Promise promise = Promise.promise();
        this.subscribe(destination, headers, handler, (Handler<AsyncResult<String>>)promise);
        return promise.future();
    }

    @Override
    public synchronized StompClientConnection subscribe(String destination, Map<String, String> headers, Handler<Frame> handler, Handler<AsyncResult<String>> receiptHandler) {
        Objects.requireNonNull(destination);
        Objects.requireNonNull(handler);
        if (headers == null) {
            headers = Headers.create();
        }
        String id = headers.getOrDefault("id", destination);
        Optional<Subscription> maybeSubscription = this.subscriptions.stream().filter(s -> s.id.equals(id)).findFirst();
        if (maybeSubscription.isPresent()) {
            throw new IllegalArgumentException("The client is already registered  to " + destination);
        }
        this.subscriptions.add(new Subscription(destination, id, handler));
        headers.put("destination", destination);
        if (!headers.containsKey("id")) {
            headers.put("id", id);
        }
        Frame frame = new Frame(Frame.Command.SUBSCRIBE, headers, null);
        this.send(frame, (Handler<AsyncResult<Frame>>)((Handler)ar -> {
            if (receiptHandler != null) {
                receiptHandler.handle((Object)ar.map((Object)id));
            }
        }));
        return this;
    }

    @Override
    public Future<Frame> unsubscribe(String destination) {
        Promise promise = Promise.promise();
        this.unsubscribe(destination, null, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection unsubscribe(String destination, Handler<AsyncResult<Frame>> receiptHandler) {
        return this.unsubscribe(destination, null, receiptHandler);
    }

    @Override
    public Future<Frame> unsubscribe(String destination, Map<String, String> headers) {
        Promise promise = Promise.promise();
        this.unsubscribe(destination, headers, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public synchronized StompClientConnection unsubscribe(String destination, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) {
        Objects.requireNonNull(destination);
        if (headers == null) {
            headers = Headers.create();
        }
        String id = headers.containsKey("id") ? (String)headers.get("id") : destination;
        headers.put("id", id);
        Optional<Subscription> maybeSubscription = this.subscriptions.stream().filter(s -> s.id.equals(id)).findFirst();
        if (maybeSubscription.isPresent()) {
            Subscription subscription = maybeSubscription.get();
            this.subscriptions.remove(subscription);
            this.send(new Frame(Frame.Command.UNSUBSCRIBE, headers, null), receiptHandler);
            return this;
        }
        throw new IllegalArgumentException("No subscription with id " + id);
    }

    @Override
    public synchronized StompClientConnection errorHandler(Handler<Frame> handler) {
        this.errorHandler = handler;
        return this;
    }

    @Override
    public synchronized StompClientConnection closeHandler(Handler<StompClientConnection> handler) {
        this.closeHandler = handler;
        return this;
    }

    @Override
    public synchronized StompClientConnection pingHandler(Handler<StompClientConnection> handler) {
        this.pingHandler = handler;
        return this;
    }

    @Override
    public StompClientConnection beginTX(String id, Handler<AsyncResult<Frame>> receiptHandler) {
        return this.beginTX(id, new Headers(), receiptHandler);
    }

    @Override
    public Future<Frame> beginTX(String id) {
        return this.beginTX(id, new Headers());
    }

    @Override
    public Future<Frame> beginTX(String id, Map<String, String> headers) {
        Promise promise = Promise.promise();
        this.beginTX(id, headers, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection beginTX(String id, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) {
        Objects.requireNonNull(id);
        Objects.requireNonNull(headers);
        return this.send(new Frame().setCommand(Frame.Command.BEGIN).setTransaction(id), receiptHandler);
    }

    @Override
    public Future<Frame> commit(String id) {
        return this.commit(id, new Headers());
    }

    @Override
    public StompClientConnection commit(String id, Handler<AsyncResult<Frame>> receiptHandler) {
        return this.commit(id, new Headers(), receiptHandler);
    }

    @Override
    public Future<Frame> commit(String id, Map<String, String> headers) {
        Promise promise = Promise.promise();
        this.commit(id, headers, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection commit(String id, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) {
        Objects.requireNonNull(id);
        Objects.requireNonNull(headers);
        return this.send(new Frame().setCommand(Frame.Command.COMMIT).setTransaction(id), receiptHandler);
    }

    @Override
    public Future<Frame> abort(String id) {
        return this.abort(id, new Headers());
    }

    @Override
    public StompClientConnection abort(String id, Handler<AsyncResult<Frame>> receiptHandler) {
        return this.abort(id, new Headers(), receiptHandler);
    }

    @Override
    public Future<Frame> abort(String id, Map<String, String> headers) {
        Promise promise = Promise.promise();
        this.abort(id, headers, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection abort(String id, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) {
        Objects.requireNonNull(id);
        Objects.requireNonNull(headers);
        return this.send(new Frame().setCommand(Frame.Command.ABORT).setTransaction(id), receiptHandler);
    }

    @Override
    public Future<Frame> disconnect() {
        Promise promise = Promise.promise();
        this.disconnect(new Frame().setCommand(Frame.Command.DISCONNECT), (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public Future<Frame> disconnect(Frame frame) {
        Promise promise = Promise.promise();
        this.disconnect(frame, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection disconnect(Handler<AsyncResult<Frame>> receiptHandler) {
        return this.disconnect(new Frame().setCommand(Frame.Command.DISCONNECT), receiptHandler);
    }

    @Override
    public StompClientConnection disconnect(Frame frame, Handler<AsyncResult<Frame>> receiptHandler) {
        Objects.requireNonNull(frame);
        this.send(frame, (Handler<AsyncResult<Frame>>)((Handler)f -> {
            if (receiptHandler != null) {
                receiptHandler.handle(f);
            }
            if (!this.closed) {
                this.close();
            }
        }));
        return this;
    }

    @Override
    public Future<Frame> ack(String id) {
        Promise promise = Promise.promise();
        this.ack(id, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection ack(String id, Handler<AsyncResult<Frame>> receiptHandler) {
        Objects.requireNonNull(id);
        this.send(new Frame(Frame.Command.ACK, Headers.create("id", id), null), receiptHandler);
        return this;
    }

    @Override
    public Future<Frame> nack(String id) {
        Promise promise = Promise.promise();
        this.nack(id, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection nack(String id, Handler<AsyncResult<Frame>> receiptHandler) {
        Objects.requireNonNull(id);
        this.send(new Frame(Frame.Command.NACK, Headers.create("id", id), null), receiptHandler);
        return this;
    }

    @Override
    public Future<Frame> ack(String id, String txId) {
        Promise promise = Promise.promise();
        this.ack(id, txId, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection ack(String id, String txId, Handler<AsyncResult<Frame>> receiptHandler) {
        Objects.requireNonNull(id, "A ACK frame must contain the ACK id");
        Objects.requireNonNull(txId);
        this.send(new Frame(Frame.Command.ACK, Headers.create("id", id, "transaction", txId), null), receiptHandler);
        return this;
    }

    @Override
    public Future<Frame> nack(String id, String txId) {
        Promise promise = Promise.promise();
        this.nack(id, txId, (Handler<AsyncResult<Frame>>)promise);
        return promise.future();
    }

    @Override
    public StompClientConnection nack(String id, String txId, Handler<AsyncResult<Frame>> receiptHandler) {
        Objects.requireNonNull(id, "A NACK frame must contain the ACK id");
        Objects.requireNonNull(txId);
        Frame toSend = new Frame(Frame.Command.NACK, Headers.create("id", id, "transaction", txId), null);
        this.send(toSend, receiptHandler);
        return this;
    }

    @Override
    public synchronized StompClientConnection receivedFrameHandler(Handler<Frame> handler) {
        this.receivedFrameHandler = handler;
        return this;
    }

    @Override
    public synchronized StompClientConnection writingFrameHandler(Handler<Frame> handler) {
        this.writingHandler = handler;
        return this;
    }

    @Override
    public synchronized StompClientConnection exceptionHandler(Handler<Throwable> exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
        if (this.connected) {
            this.socket.exceptionHandler(exceptionHandler);
        }
        return this;
    }

    @Override
    public synchronized StompClientConnection connectionDroppedHandler(Handler<StompClientConnection> handler) {
        this.droppedHandler = handler;
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handle(Frame frame) {
        StompClientConnectionImpl stompClientConnectionImpl = this;
        synchronized (stompClientConnectionImpl) {
            if (this.receivedFrameHandler != null) {
                this.receivedFrameHandler.handle((Object)frame);
            }
        }
        switch (frame.getCommand()) {
            case CONNECTED: {
                this.handleConnected(frame);
                break;
            }
            case RECEIPT: {
                this.handleReceipt(frame);
                break;
            }
            case MESSAGE: {
                String id = frame.getHeader("subscription");
                this.subscriptions.stream().filter(s -> s.id.equals(id)).forEach(s -> s.handler.handle((Object)frame));
                break;
            }
            case ERROR: {
                if (this.errorHandler == null) break;
                this.errorHandler.handle((Object)frame);
                break;
            }
        }
    }

    private synchronized void handleReceipt(Frame frame) {
        String receipt = frame.getHeader("receipt-id");
        if (receipt != null) {
            Promise<Void> receiptHandler = this.pendingReceipts.remove(receipt);
            if (receiptHandler == null) {
                throw new IllegalStateException("No receipt handler for receipt " + receipt);
            }
            receiptHandler.complete();
        }
    }

    private synchronized void handleConnected(Frame frame) {
        this.sessionId = frame.getHeader("session");
        this.version = frame.getHeader("version");
        this.server = frame.getHeader("server");
        long ping = Frame.Heartbeat.computePingPeriod(Frame.Heartbeat.create(this.client.options().getHeartbeat()), Frame.Heartbeat.parse(frame.getHeader("heart-beat")));
        long pong = Frame.Heartbeat.computePongPeriod(Frame.Heartbeat.create(this.client.options().getHeartbeat()), Frame.Heartbeat.parse(frame.getHeader("heart-beat")));
        if (ping > 0L) {
            this.pinger = this.client.vertx().setPeriodic(ping, l -> this.pingHandler.handle((Object)this));
        }
        if (pong > 0L) {
            this.ponger = this.client.vertx().setPeriodic(pong, l -> {
                long delta = System.nanoTime() - this.lastServerActivity;
                long deltaInMs = TimeUnit.MILLISECONDS.convert(delta, TimeUnit.NANOSECONDS);
                if (deltaInMs > pong * 2L) {
                    Handler<StompClientConnection> handler;
                    LOGGER.error((Object)("Disconnecting client " + this.client + " - no server activity detected in the last " + deltaInMs + " ms."));
                    this.client.vertx().cancelTimer(this.ponger);
                    this.close();
                    StompClientConnectionImpl stompClientConnectionImpl = this;
                    synchronized (stompClientConnectionImpl) {
                        handler = this.droppedHandler;
                    }
                    if (handler != null) {
                        handler.handle((Object)this);
                    }
                }
            });
        }
        this.socket.exceptionHandler(this.exceptionHandler);
        this.connected = true;
        this.resultHandler.handle((Object)Future.succeededFuture((Object)this));
    }

    public NetSocket socket() {
        return this.socket;
    }

    private static class Subscription {
        final String destination;
        final String id;
        final Handler<Frame> handler;

        private Subscription(String destination, String id, Handler<Frame> handler) {
            this.destination = destination;
            this.id = id;
            this.handler = handler;
        }
    }
}

