package io.vertx.ext.stomp.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.Frames;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.utils.Headers;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/vertx/ext/stomp/impl/StompClientConnectionImpl.class */
/**
 * Client-side STOMP connection bound to a single {@link NetSocket}.
 *
 * <p>Incoming buffers are fed to a {@link FrameParser}; every parsed frame is delivered to
 * {@link #handle(Frame)}, which dispatches on the frame command. Outgoing frames are serialized
 * and written to the socket by {@link #send(Frame, Handler)}.
 *
 * <p>Handler setters and the methods touching the receipt table are {@code synchronized} on this
 * instance; the {@code closed}/{@code connected} flags and the timer ids are {@code volatile}.
 */
public class StompClientConnectionImpl implements StompClientConnection, Handler<Frame> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StompClientConnectionImpl.class);

    private final StompClient client;
    private final NetSocket socket;
    /** Notified once with this connection when the CONNECTED frame has been processed. */
    private final Handler<AsyncResult<StompClientConnection>> resultHandler;
    private final Context context;
    /** {@link System#nanoTime()} value recorded each time data arrives on the socket. */
    private volatile long lastServerActivity;
    private String version;
    private String sessionId;
    private String server;
    private Handler<StompClientConnection> closeHandler;
    private Handler<Frame> receivedFrameHandler;
    private Handler<Frame> writingHandler;
    private Handler<Frame> errorHandler;
    private volatile boolean closed;
    private Handler<Throwable> exceptionHandler;
    private volatile boolean connected;
    /** Receipt id -> callback to run when the matching RECEIPT frame arrives. */
    private final Map<String, Handler<Void>> pendingReceipts = new HashMap<>();
    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();
    /** Id of the periodic timer sending pings, or -1 when no such timer is active. */
    private volatile long pinger = -1;
    /** Id of the periodic timer checking server activity, or -1 when no such timer is active. */
    private volatile long ponger = -1;
    /** Default ping behavior: send a ping frame on the connection. */
    private Handler<StompClientConnection> pingHandler = connection -> {
        connection.send(Frames.ping());
    };
    /** Default dropped-connection behavior: do nothing. */
    private Handler<StompClientConnection> droppedHandler = connection -> {
    };

    /**
     * Immutable record of one active subscription: its destination, its id and the handler
     * receiving the MESSAGE frames whose {@code subscription} header equals the id.
     */
    public static class Subscription {
        final String destination;
        final String id;
        final Handler<Frame> handler;

        private Subscription(String str, String str2, Handler<Frame> handler) {
            this.destination = str;
            this.id = str2;
            this.handler = handler;
        }
    }

    /**
     * Creates the connection and installs the data and close handlers on the socket.
     *
     * @param vertx       used to obtain the context on which the close handler is invoked
     * @param netSocket   the socket carrying the STOMP frames
     * @param stompClient the owning client (options, timers, closed state)
     * @param handler     notified with this connection once the CONNECTED frame is handled
     */
    public StompClientConnectionImpl(Vertx vertx, NetSocket netSocket, StompClient stompClient, Handler<AsyncResult<StompClientConnection>> handler) {
        this.socket = netSocket;
        this.client = stompClient;
        this.resultHandler = handler;
        this.context = vertx.getOrCreateContext();
        FrameParser frameParser = new FrameParser();
        frameParser.handler(this);
        netSocket.handler(buffer -> {
            // Any inbound data counts as server activity for the heartbeat check.
            this.lastServerActivity = System.nanoTime();
            frameParser.handle(buffer);
        }).closeHandler(ignored -> {
            // A close we initiated ourselves (connection or client) is not a "dropped" connection.
            if (this.closed || stompClient.isClosed()) {
                return;
            }
            close();
            // Read the field once so a concurrent connectionDroppedHandler(null) cannot cause an NPE.
            Handler<StompClientConnection> dropped = this.droppedHandler;
            if (dropped != null) {
                dropped.handle(this);
            }
        });
    }

    /** @return {@code true} once the CONNECTED frame has been handled and until {@link #close()} */
    @Override
    public boolean isConnected() {
        return this.connected;
    }

    /** @return the {@code session} header of the CONNECTED frame, or {@code null} if unset or closed */
    @Override
    public synchronized String session() {
        return this.sessionId;
    }

    /** @return the {@code version} header of the CONNECTED frame, or {@code null} if unset or closed */
    @Override
    public synchronized String version() {
        return this.version;
    }

    /**
     * Closes the connection: schedules the close handler on the connection context, cancels the
     * heartbeat timers, closes the socket and the client, and clears receipts, subscriptions and
     * the session metadata.
     */
    @Override
    public synchronized void close() {
        this.closed = true;
        this.connected = false;
        // Capture the handler now: the task runs later and the field may have been cleared by then.
        Handler<StompClientConnection> onClose = this.closeHandler;
        if (onClose != null) {
            this.context.runOnContext(ignored -> onClose.handle(this));
        }
        if (this.pinger != -1) {
            this.client.vertx().cancelTimer(this.pinger);
            this.pinger = -1L;
        }
        if (this.ponger != -1) {
            this.client.vertx().cancelTimer(this.ponger);
            this.ponger = -1L;
        }
        this.socket.close();
        this.client.close();
        this.pendingReceipts.clear();
        this.subscriptions.clear();
        this.server = null;
        this.sessionId = null;
        this.version = null;
    }

    /** @return the {@code server} header of the CONNECTED frame, or {@code null} if unset or closed */
    @Override
    public synchronized String server() {
        return this.server;
    }

    /** Sends a SEND frame whose destination must be present in {@code map}. */
    @Override
    public StompClientConnection send(Map<String, String> map, Buffer buffer) {
        return send((String) null, map, buffer);
    }

    /** Sends a SEND frame whose destination must be present in {@code map}, requesting a receipt. */
    @Override
    public StompClientConnection send(Map<String, String> map, Buffer buffer, Handler<Frame> handler) {
        return send(null, map, buffer, handler);
    }

    /** Sends a SEND frame to destination {@code str} without extra headers. */
    @Override
    public StompClientConnection send(String str, Buffer buffer) {
        return send(str, (Map<String, String>) null, buffer);
    }

    /** Sends a SEND frame to destination {@code str} without extra headers, requesting a receipt. */
    @Override
    public StompClientConnection send(String str, Buffer buffer, Handler<Frame> handler) {
        return send(str, null, buffer, handler);
    }

    /** Writes {@code frame} to the socket without requesting a receipt. */
    @Override
    public StompClientConnection send(Frame frame) {
        return send(frame, (Handler<Frame>) null);
    }

    /**
     * Writes {@code frame} to the socket.
     *
     * <p>When {@code handler} is non-null a random {@code receipt} header is added to the frame
     * and the handler is registered; it is later invoked with the frame that was sent (not the
     * RECEIPT frame) when the matching receipt arrives. The writing-frame handler, if set, is
     * called just before the frame is written.
     *
     * @param frame   the frame to send
     * @param handler receipt callback, or {@code null} when no receipt is wanted
     * @return this connection
     */
    @Override
    public synchronized StompClientConnection send(Frame frame, Handler<Frame> handler) {
        if (handler != null) {
            String receiptId = UUID.randomUUID().toString();
            frame.addHeader(Frame.RECEIPT, receiptId);
            this.pendingReceipts.put(receiptId, ignored -> handler.handle(frame));
        }
        if (this.writingHandler != null) {
            this.writingHandler.handle(frame);
        }
        this.socket.write(frame.toBuffer(this.client.options().isTrailingLine()));
        return this;
    }

    /** Sends a SEND frame to {@code str} with the given headers and no receipt. */
    @Override
    public StompClientConnection send(String str, Map<String, String> map, Buffer buffer) {
        return send(str, map, buffer, null);
    }

    /**
     * Builds and sends a SEND frame.
     *
     * <p>Note that {@code map}, when non-null, is updated in place with the destination and
     * (optionally) the content length.
     *
     * @param str     destination; when {@code null} the destination must already be in {@code map}
     * @param map     headers, or {@code null} for none
     * @param buffer  body, may be {@code null}
     * @param handler receipt callback, or {@code null}
     * @return this connection
     * @throws IllegalArgumentException if no destination is available
     */
    @Override
    public StompClientConnection send(String str, Map<String, String> map, Buffer buffer, Handler<Frame> handler) {
        if (map == null) {
            map = new Headers();
        }
        if (str != null) {
            map.put(Frame.DESTINATION, str);
        }
        if (map.get(Frame.DESTINATION) == null) {
            throw new IllegalArgumentException("The 'destination' header is mandatory : " + map);
        }
        boolean addLength = buffer != null
            && this.client.options().isAutoComputeContentLength()
            && !map.containsKey(Frame.CONTENT_LENGTH);
        if (addLength) {
            map.put(Frame.CONTENT_LENGTH, Integer.toString(buffer.length()));
        }
        return send(new Frame(Frame.Command.SEND, map, buffer), handler);
    }

    /** Subscribes to {@code str} with no extra headers and no receipt. */
    @Override
    public String subscribe(String str, Handler<Frame> handler) {
        return subscribe(str, (Map<String, String>) null, handler);
    }

    /** Subscribes to {@code str} with no extra headers; {@code handler2} receives the receipt. */
    @Override
    public String subscribe(String str, Handler<Frame> handler, Handler<Frame> handler2) {
        return subscribe(str, null, handler, handler2);
    }

    /** Subscribes to {@code str} with the given headers and no receipt. */
    @Override
    public String subscribe(String str, Map<String, String> map, Handler<Frame> handler) {
        return subscribe(str, map, handler, null);
    }

    /**
     * Registers a subscription and sends the SUBSCRIBE frame.
     *
     * <p>The subscription id is the {@code id} header when present in {@code map}, otherwise the
     * destination itself. {@code map}, when non-null, is updated in place.
     *
     * @param str      destination, must not be {@code null}
     * @param map      headers, or {@code null} for none
     * @param handler  receives the MESSAGE frames of this subscription, must not be {@code null}
     * @param handler2 receipt callback, or {@code null}
     * @return the subscription id
     * @throws IllegalArgumentException if a subscription with the same id already exists
     */
    @Override
    public synchronized String subscribe(String str, Map<String, String> map, Handler<Frame> handler, Handler<Frame> handler2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        if (map == null) {
            map = Headers.create();
        }
        String subscriptionId = map.getOrDefault(Frame.ID, str);
        boolean alreadySubscribed = this.subscriptions.stream()
            .anyMatch(existing -> existing.id.equals(subscriptionId));
        if (alreadySubscribed) {
            throw new IllegalArgumentException("The client is already registered  to " + str);
        }
        this.subscriptions.add(new Subscription(str, subscriptionId, handler));
        map.put(Frame.DESTINATION, str);
        if (!map.containsKey(Frame.ID)) {
            map.put(Frame.ID, subscriptionId);
        }
        send(new Frame(Frame.Command.SUBSCRIBE, map, null), handler2);
        return subscriptionId;
    }

    /** Unsubscribes from {@code str} with no extra headers and no receipt. */
    @Override
    public StompClientConnection unsubscribe(String str) {
        return unsubscribe(str, null, null);
    }

    /** Unsubscribes from {@code str} with no extra headers, requesting a receipt. */
    @Override
    public StompClientConnection unsubscribe(String str, Handler<Frame> handler) {
        return unsubscribe(str, null, handler);
    }

    /** Unsubscribes from {@code str} with the given headers and no receipt. */
    @Override
    public StompClientConnection unsubscribe(String str, Map<String, String> map) {
        return unsubscribe(str, map, null);
    }

    /**
     * Removes a subscription and sends the UNSUBSCRIBE frame.
     *
     * <p>The subscription is looked up by the {@code id} header when present in {@code map},
     * otherwise by the destination. {@code map}, when non-null, is updated in place.
     *
     * @param str     destination, must not be {@code null}
     * @param map     headers, or {@code null} for none
     * @param handler receipt callback, or {@code null}
     * @return this connection
     * @throws IllegalArgumentException if no subscription has the resolved id
     */
    @Override
    public synchronized StompClientConnection unsubscribe(String str, Map<String, String> map, Handler<Frame> handler) {
        Objects.requireNonNull(str);
        if (map == null) {
            map = Headers.create();
        }
        String subscriptionId = map.getOrDefault(Frame.ID, str);
        map.put(Frame.ID, subscriptionId);
        Subscription target = this.subscriptions.stream()
            .filter(existing -> existing.id.equals(subscriptionId))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("No subscription with id " + subscriptionId));
        this.subscriptions.remove(target);
        send(new Frame(Frame.Command.UNSUBSCRIBE, map, null), handler);
        return this;
    }

    /** Sets the handler invoked for ERROR frames. */
    @Override
    public synchronized StompClientConnection errorHandler(Handler<Frame> handler) {
        this.errorHandler = handler;
        return this;
    }

    /** Sets the handler scheduled on the connection context when {@link #close()} is called. */
    @Override
    public synchronized StompClientConnection closeHandler(Handler<StompClientConnection> handler) {
        this.closeHandler = handler;
        return this;
    }

    /** Sets the handler invoked on each tick of the ping timer; {@code null} disables pinging. */
    @Override
    public synchronized StompClientConnection pingHandler(Handler<StompClientConnection> handler) {
        this.pingHandler = handler;
        return this;
    }

    /** Begins transaction {@code str} with no extra headers, requesting a receipt. */
    @Override
    public StompClientConnection beginTX(String str, Handler<Frame> handler) {
        return beginTX(str, new Headers(), handler);
    }

    /** Begins transaction {@code str} with no extra headers and no receipt. */
    @Override
    public StompClientConnection beginTX(String str) {
        return beginTX(str, new Headers());
    }

    /** Begins transaction {@code str} with the given headers and no receipt. */
    @Override
    public StompClientConnection beginTX(String str, Map<String, String> map) {
        return beginTX(str, map, null);
    }

    /**
     * Sends a BEGIN frame for transaction {@code str}.
     *
     * @param str     transaction id, must not be {@code null}
     * @param map     additional headers, must not be {@code null}; not modified
     * @param handler receipt callback, or {@code null}
     * @return this connection
     */
    @Override
    public StompClientConnection beginTX(String str, Map<String, String> map, Handler<Frame> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(map);
        return send(transactionFrame(Frame.Command.BEGIN, str, map), handler);
    }

    /** Commits transaction {@code str} with no extra headers and no receipt. */
    @Override
    public StompClientConnection commit(String str) {
        return commit(str, new Headers());
    }

    /** Commits transaction {@code str} with no extra headers, requesting a receipt. */
    @Override
    public StompClientConnection commit(String str, Handler<Frame> handler) {
        return commit(str, new Headers(), handler);
    }

    /** Commits transaction {@code str} with the given headers and no receipt. */
    @Override
    public StompClientConnection commit(String str, Map<String, String> map) {
        return commit(str, map, null);
    }

    /**
     * Sends a COMMIT frame for transaction {@code str}.
     *
     * @param str     transaction id, must not be {@code null}
     * @param map     additional headers, must not be {@code null}; not modified
     * @param handler receipt callback, or {@code null}
     * @return this connection
     */
    @Override
    public StompClientConnection commit(String str, Map<String, String> map, Handler<Frame> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(map);
        return send(transactionFrame(Frame.Command.COMMIT, str, map), handler);
    }

    /** Aborts transaction {@code str} with no extra headers and no receipt. */
    @Override
    public StompClientConnection abort(String str) {
        return abort(str, new Headers());
    }

    /** Aborts transaction {@code str} with no extra headers, requesting a receipt. */
    @Override
    public StompClientConnection abort(String str, Handler<Frame> handler) {
        return abort(str, new Headers(), handler);
    }

    /** Aborts transaction {@code str} with the given headers and no receipt. */
    @Override
    public StompClientConnection abort(String str, Map<String, String> map) {
        return abort(str, map, null);
    }

    /**
     * Sends an ABORT frame for transaction {@code str}.
     *
     * @param str     transaction id, must not be {@code null}
     * @param map     additional headers, must not be {@code null}; not modified
     * @param handler receipt callback, or {@code null}
     * @return this connection
     */
    @Override
    public StompClientConnection abort(String str, Map<String, String> map, Handler<Frame> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(map);
        return send(transactionFrame(Frame.Command.ABORT, str, map), handler);
    }

    /**
     * Builds a transaction frame carrying the caller's headers plus the {@code transaction}
     * header. The headers are copied so the caller's map is never modified; the transaction id
     * overrides any {@code transaction} entry already present in {@code headers}.
     */
    private static Frame transactionFrame(Frame.Command command, String id, Map<String, String> headers) {
        Map<String, String> merged = new Headers();
        merged.putAll(headers);
        merged.put(Frame.TRANSACTION, id);
        return new Frame(command, merged, null);
    }

    /** Sends a default DISCONNECT frame and closes the connection once the receipt arrives. */
    @Override
    public StompClientConnection disconnect() {
        return disconnect(new Frame().setCommand(Frame.Command.DISCONNECT), null);
    }

    /** Sends {@code frame} as the disconnect frame and closes once the receipt arrives. */
    @Override
    public StompClientConnection disconnect(Frame frame) {
        return disconnect(frame, null);
    }

    /** Sends a default DISCONNECT frame; {@code handler} is called before the connection closes. */
    @Override
    public StompClientConnection disconnect(Handler<Frame> handler) {
        return disconnect(new Frame().setCommand(Frame.Command.DISCONNECT), handler);
    }

    /**
     * Sends {@code frame} with a receipt request. When the receipt arrives, {@code handler} (if
     * any) is invoked and the connection is closed unless it is already closed.
     *
     * @param frame   the disconnect frame, must not be {@code null}
     * @param handler receipt callback, or {@code null}
     * @return this connection
     */
    @Override
    public StompClientConnection disconnect(Frame frame, Handler<Frame> handler) {
        Objects.requireNonNull(frame);
        send(frame, sent -> {
            if (handler != null) {
                handler.handle(sent);
            }
            if (!this.closed) {
                close();
            }
        });
        return this;
    }

    /** Acknowledges message {@code str} without a receipt. */
    @Override
    public StompClientConnection ack(String str) {
        return ack(str, (Handler<Frame>) null);
    }

    /** Sends an ACK frame whose {@code id} header is {@code str}. */
    @Override
    public StompClientConnection ack(String str, Handler<Frame> handler) {
        Objects.requireNonNull(str);
        send(new Frame(Frame.Command.ACK, Headers.create(Frame.ID, str), null), handler);
        return this;
    }

    /** Negatively acknowledges message {@code str} without a receipt. */
    @Override
    public StompClientConnection nack(String str) {
        return nack(str, (Handler<Frame>) null);
    }

    /** Sends a NACK frame whose {@code id} header is {@code str}. */
    @Override
    public StompClientConnection nack(String str, Handler<Frame> handler) {
        Objects.requireNonNull(str);
        send(new Frame(Frame.Command.NACK, Headers.create(Frame.ID, str), null), handler);
        return this;
    }

    /** Acknowledges message {@code str} within transaction {@code str2}, without a receipt. */
    @Override
    public StompClientConnection ack(String str, String str2) {
        return ack(str, str2, null);
    }

    /** Sends an ACK frame with {@code id} = {@code str} and {@code transaction} = {@code str2}. */
    @Override
    public StompClientConnection ack(String str, String str2, Handler<Frame> handler) {
        Objects.requireNonNull(str, "A ACK frame must contain the ACK id");
        Objects.requireNonNull(str2);
        send(new Frame(Frame.Command.ACK, Headers.create(Frame.ID, str, Frame.TRANSACTION, str2), null), handler);
        return this;
    }

    /** Negatively acknowledges message {@code str} within transaction {@code str2}, without a receipt. */
    @Override
    public StompClientConnection nack(String str, String str2) {
        return nack(str, str2, null);
    }

    /** Sends a NACK frame with {@code id} = {@code str} and {@code transaction} = {@code str2}. */
    @Override
    public StompClientConnection nack(String str, String str2, Handler<Frame> handler) {
        Objects.requireNonNull(str, "A NACK frame must contain the ACK id");
        Objects.requireNonNull(str2);
        send(new Frame(Frame.Command.NACK, Headers.create(Frame.ID, str, Frame.TRANSACTION, str2), null), handler);
        return this;
    }

    /** Sets the handler notified of every received frame before it is dispatched. */
    @Override
    public synchronized StompClientConnection receivedFrameHandler(Handler<Frame> handler) {
        this.receivedFrameHandler = handler;
        return this;
    }

    /** Sets the handler notified of every frame just before it is written to the socket. */
    @Override
    public synchronized StompClientConnection writingFrameHandler(Handler<Frame> handler) {
        this.writingHandler = handler;
        return this;
    }

    /**
     * Sets the exception handler. It is installed on the socket immediately when already
     * connected; otherwise it is installed when the CONNECTED frame is handled.
     */
    @Override
    public synchronized StompClientConnection exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        if (this.connected) {
            this.socket.exceptionHandler(handler);
        }
        return this;
    }

    /** Sets the handler invoked when the connection is lost without having been closed locally. */
    @Override
    public synchronized StompClientConnection connectionDroppedHandler(Handler<StompClientConnection> handler) {
        this.droppedHandler = handler;
        return this;
    }

    /**
     * Entry point for frames produced by the parser: notifies the received-frame handler, then
     * dispatches on the command. PING and any command not listed are ignored.
     *
     * @param frame the parsed frame
     */
    @Override
    public void handle(Frame frame) {
        synchronized (this) {
            if (this.receivedFrameHandler != null) {
                this.receivedFrameHandler.handle(frame);
            }
        }
        switch (frame.getCommand()) {
            case CONNECTED:
                handleConnected(frame);
                return;
            case RECEIPT:
                handleReceipt(frame);
                return;
            case MESSAGE:
                String subscriptionId = frame.getHeader(Frame.SUBSCRIPTION);
                this.subscriptions.stream()
                    .filter(candidate -> candidate.id.equals(subscriptionId))
                    .forEach(match -> match.handler.handle(frame));
                return;
            case ERROR:
                // Read the field once so a concurrent errorHandler(null) cannot cause an NPE.
                Handler<Frame> onError = this.errorHandler;
                if (onError != null) {
                    onError.handle(frame);
                }
                return;
            case PING:
            default:
                return;
        }
    }

    /**
     * Runs and removes the callback registered for the frame's {@code receipt-id}. Frames without
     * that header are ignored.
     *
     * @throws IllegalStateException if no callback is registered for the receipt id
     */
    private synchronized void handleReceipt(Frame frame) {
        String receiptId = frame.getHeader(Frame.RECEIPT_ID);
        if (receiptId == null) {
            return;
        }
        Handler<Void> callback = this.pendingReceipts.remove(receiptId);
        if (callback == null) {
            throw new IllegalStateException("No receipt handler for receipt " + receiptId);
        }
        callback.handle(null);
    }

    /**
     * Processes the CONNECTED frame: records session metadata, starts the heartbeat timers
     * negotiated from the client options and the frame's heartbeat header, installs the exception
     * handler on the socket, marks the connection as connected and completes the result handler.
     */
    private synchronized void handleConnected(Frame frame) {
        this.sessionId = frame.getHeader(Frame.SESSION);
        this.version = frame.getHeader(Frame.VERSION);
        this.server = frame.getHeader(Frame.SERVER);

        // Both periods derive from the same pair of heartbeat settings; build them once.
        Frame.Heartbeat clientHeartbeat = Frame.Heartbeat.create(this.client.options().getHeartbeat());
        Frame.Heartbeat serverHeartbeat = Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT));
        long pingPeriod = Frame.Heartbeat.computePingPeriod(clientHeartbeat, serverHeartbeat);
        long pongPeriod = Frame.Heartbeat.computePongPeriod(clientHeartbeat, serverHeartbeat);

        if (pingPeriod > 0) {
            this.pinger = this.client.vertx().setPeriodic(pingPeriod, timerId -> {
                Handler<StompClientConnection> ping;
                synchronized (this) {
                    ping = this.pingHandler;
                }
                // pingHandler(null) is accepted by the setter, so tolerate it here.
                if (ping != null) {
                    ping.handle(this);
                }
            });
        }
        if (pongPeriod > 0) {
            this.ponger = this.client.vertx().setPeriodic(pongPeriod, timerId -> {
                long idleMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime() - this.lastServerActivity, TimeUnit.NANOSECONDS);
                // Tolerate one missed period before declaring the server gone.
                if (idleMillis <= pongPeriod * 2) {
                    return;
                }
                LOGGER.error("Disconnecting client " + this.client + " - no server activity detected in the last " + idleMillis + " ms.");
                this.client.vertx().cancelTimer(this.ponger);
                close();
                Handler<StompClientConnection> dropped;
                synchronized (this) {
                    dropped = this.droppedHandler;
                }
                if (dropped != null) {
                    dropped.handle(this);
                }
            });
        }
        this.socket.exceptionHandler(this.exceptionHandler);
        this.connected = true;
        this.resultHandler.handle(Future.succeededFuture(this));
    }

    /** @return the underlying socket */
    public NetSocket socket() {
        return this.socket;
    }
}
