package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.impl.codecs.BooleanMessageCodec;
import io.vertx.core.eventbus.impl.codecs.BufferMessageCodec;
import io.vertx.core.eventbus.impl.codecs.ByteArrayMessageCodec;
import io.vertx.core.eventbus.impl.codecs.ByteMessageCodec;
import io.vertx.core.eventbus.impl.codecs.CharMessageCodec;
import io.vertx.core.eventbus.impl.codecs.DoubleMessageCodec;
import io.vertx.core.eventbus.impl.codecs.FloatMessageCodec;
import io.vertx.core.eventbus.impl.codecs.IntMessageCodec;
import io.vertx.core.eventbus.impl.codecs.JsonArrayMessageCodec;
import io.vertx.core.eventbus.impl.codecs.JsonObjectMessageCodec;
import io.vertx.core.eventbus.impl.codecs.LongMessageCodec;
import io.vertx.core.eventbus.impl.codecs.NullMessageCodec;
import io.vertx.core.eventbus.impl.codecs.PingMessageCodec;
import io.vertx.core.eventbus.impl.codecs.ReplyExceptionMessageCodec;
import io.vertx.core.eventbus.impl.codecs.ShortMessageCodec;
import io.vertx.core.eventbus.impl.codecs.StringMessageCodec;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.Closeable;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.HAManager;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
import io.vertx.core.net.ClientOptionsBase;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetClientImpl;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/vertx/core/eventbus/impl/EventBusImpl.class */
public class EventBusImpl implements EventBus, MetricsProvider {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) EventBusImpl.class);
    // Shared codec instances for the built-in body types. createMessage() selects one of these
    // from the runtime type of the body when no codec name is supplied.
    private static final MessageCodec<String, String> PING_MESSAGE_CODEC = new PingMessageCodec();
    private static final MessageCodec<String, String> NULL_MESSAGE_CODEC = new NullMessageCodec();
    private static final MessageCodec<String, String> STRING_MESSAGE_CODEC = new StringMessageCodec();
    private static final MessageCodec<Buffer, Buffer> BUFFER_MESSAGE_CODEC = new BufferMessageCodec();
    private static final MessageCodec<JsonObject, JsonObject> JSON_OBJECT_MESSAGE_CODEC = new JsonObjectMessageCodec();
    private static final MessageCodec<JsonArray, JsonArray> JSON_ARRAY_MESSAGE_CODEC = new JsonArrayMessageCodec();
    private static final MessageCodec<byte[], byte[]> BYTE_ARRAY_MESSAGE_CODEC = new ByteArrayMessageCodec();
    private static final MessageCodec<Integer, Integer> INT_MESSAGE_CODEC = new IntMessageCodec();
    private static final MessageCodec<Long, Long> LONG_MESSAGE_CODEC = new LongMessageCodec();
    private static final MessageCodec<Float, Float> FLOAT_MESSAGE_CODEC = new FloatMessageCodec();
    private static final MessageCodec<Double, Double> DOUBLE_MESSAGE_CODEC = new DoubleMessageCodec();
    private static final MessageCodec<Boolean, Boolean> BOOLEAN_MESSAGE_CODEC = new BooleanMessageCodec();
    private static final MessageCodec<Short, Short> SHORT_MESSAGE_CODEC = new ShortMessageCodec();
    private static final MessageCodec<Character, Character> CHAR_MESSAGE_CODEC = new CharMessageCodec();
    private static final MessageCodec<Byte, Byte> BYTE_MESSAGE_CODEC = new ByteMessageCodec();
    private static final MessageCodec<ReplyException, ReplyException> REPLY_EXCEPTION_MESSAGE_CODEC = new ReplyExceptionMessageCodec();
    // Single-byte buffer; presumably the reply written when a ping is received - its use is
    // outside the visible part of the file, TODO confirm.
    private static final Buffer PONG = Buffer.buffer(new byte[]{1});
    // Address of the ping messages written by ConnectionHolder.schedulePing().
    private static final String PING_ADDRESS = "__vertx_ping";
    private final VertxInternal vertx;
    // Timer delays used by ConnectionHolder.schedulePing(): the delay before a ping is written and
    // the delay after which a missing reply closes the connection. Both are -1 when the
    // single-argument constructor is used.
    private final long pingInterval;
    private final long pingReplyInterval;
    // Outbound connections keyed by remote ServerID; ConnectionHolder.close() removes its own entry.
    private final ConcurrentMap<ServerID, ConnectionHolder> connections;
    // Handler lists by String key (presumably the address - the registration code is not visible here).
    private final ConcurrentMap<String, Handlers> handlerMap;
    // Codecs by name; written by registerCodec() and registerDefaultCodec().
    private final ConcurrentMap<String, MessageCodec> userCodecMap;
    // Default codec per body class; written by registerDefaultCodec().
    private final ConcurrentMap<Class, MessageCodec> defaultCodecMap;
    // The four cluster-related fields below are null when the single-argument constructor is used.
    private final HAManager haManager;
    private final ClusterManager clusterMgr;
    private final AtomicLong replySequence;
    private final EventBusMetrics metrics;
    private final AsyncMultiMap<String, ServerID> subs;
    private final MessageCodec[] systemCodecs;
    // Identity of this node; used as the sender of ping messages.
    private final ServerID serverID;
    private final NetServer server;
    // Starts as true and is cleared by simulateUnresponsive(); where it is read is not visible here.
    private volatile boolean sendPong;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/core/eventbus/impl/EventBusImpl$ConnectionHolder.class */
    /**
     * Holds one outbound connection to a remote event bus node, identified by
     * {@link #theServerID}. Messages written before the socket is connected are queued in
     * {@link #pending} and flushed by {@link #connected(NetSocket)}. Once connected, a ping is
     * written periodically and the connection is closed when no data comes back in time.
     */
    public class ConnectionHolder {
        final NetClient client;
        /** Messages queued by {@link #writeMessage(MessageImpl)} while not yet connected. */
        final Queue<MessageImpl> pending;
        /** The remote node this holder connects to. */
        final ServerID theServerID;
        volatile NetSocket socket;
        volatile boolean connected;
        /** Id of the "no reply to ping" timer, or -1 when none has been set yet. */
        long timeoutID;
        /** Id of the "write next ping" timer, or -1 when none has been set yet. */
        long pingTimeoutID;

        private ConnectionHolder(ServerID serverID) {
            this.pending = new ArrayDeque<>();
            this.timeoutID = -1L;
            this.pingTimeoutID = -1L;
            this.theServerID = serverID;
            this.client = new NetClientImpl(EventBusImpl.this.vertx, new NetClientOptions().setConnectTimeout(ClientOptionsBase.DEFAULT_CONNECT_TIMEOUT), false);
        }

        /**
         * Cancels any ping timers, closes the client and removes this holder from the
         * connections map (only if it is still the mapped value for its server id).
         *
         * @param z not used by this method; call sites in this class pass {@code true} from
         *          failure paths and {@code false} from the socket close handler
         */
        void close(boolean z) {
            if (this.timeoutID != -1) {
                EventBusImpl.this.vertx.cancelTimer(this.timeoutID);
            }
            if (this.pingTimeoutID != -1) {
                EventBusImpl.this.vertx.cancelTimer(this.pingTimeoutID);
            }
            try {
                this.client.close();
            } catch (Exception e) {
                // Closing is best effort and must not prevent the map cleanup below,
                // but the failure is recorded instead of being dropped silently.
                EventBusImpl.log.debug("Failed to close client for " + this.theServerID + ": " + e);
            }
            if (EventBusImpl.this.connections.remove(this.theServerID, this)) {
                EventBusImpl.log.debug("Cluster connection closed: " + this.theServerID + " holder " + this);
            }
        }

        /**
         * Schedules the next ping: after {@code pingInterval} a ping message is written to the
         * socket and a second timer is armed that closes this connection after
         * {@code pingReplyInterval} unless it is cancelled first (see the data handler installed
         * in {@link #connected(NetSocket)}).
         */
        void schedulePing() {
            this.pingTimeoutID = EventBusImpl.this.vertx.setTimer(EventBusImpl.this.pingInterval, pingTimerId -> {
                this.timeoutID = EventBusImpl.this.vertx.setTimer(EventBusImpl.this.pingReplyInterval, replyTimerId -> {
                    // Report the remote node that failed to answer, not this node's own id.
                    EventBusImpl.log.warn("No pong from server " + this.theServerID + " - will consider it dead");
                    close(true);
                });
                // The ping is sent on behalf of this node, so the sender is the local server id.
                this.socket.write(new MessageImpl(EventBusImpl.this.serverID, EventBusImpl.PING_ADDRESS, null, null, null, new PingMessageCodec(), true).encodeToWire());
            });
        }

        /**
         * Writes the message to the socket, or queues it if the connection is not established yet.
         */
        synchronized void writeMessage(MessageImpl messageImpl) {
            if (!this.connected) {
                this.pending.add(messageImpl);
                return;
            }
            Buffer encodeToWire = messageImpl.encodeToWire();
            EventBusImpl.this.metrics.messageWritten(messageImpl.address(), encodeToWire.length());
            this.socket.write(encodeToWire);
        }

        /**
         * Called once the socket is connected: installs the socket handlers, starts the ping
         * cycle and flushes every message queued while connecting.
         */
        synchronized void connected(NetSocket netSocket) {
            this.socket = netSocket;
            this.connected = true;
            netSocket.exceptionHandler(th -> {
                close(true);
            });
            netSocket.closeHandler(r4 -> {
                close(false);
            });
            // Any incoming data cancels the pending "no reply" timer and schedules the next ping.
            netSocket.handler2(buffer -> {
                EventBusImpl.this.vertx.cancelTimer(this.timeoutID);
                schedulePing();
            });
            schedulePing();
            for (MessageImpl messageImpl : this.pending) {
                Buffer encodeToWire = messageImpl.encodeToWire();
                EventBusImpl.this.metrics.messageWritten(messageImpl.address(), encodeToWire.length());
                netSocket.write(encodeToWire);
            }
            this.pending.clear();
        }

        /**
         * Starts connecting to the remote node; on failure the holder closes itself.
         */
        void connect() {
            this.client.connect(this.theServerID.port, this.theServerID.host, asyncResult -> {
                if (asyncResult.succeeded()) {
                    connected((NetSocket) asyncResult.result());
                } else {
                    close(true);
                }
            });
        }
    }

    /* loaded from: input_file:io/vertx/core/eventbus/impl/EventBusImpl$EventBusNetServer.class */
    /**
     * Wraps a {@link NetServer} so that the handler receiving accepted sockets can be
     * installed after the server object has been created.
     */
    public static class EventBusNetServer {
        private final NetServer netServer;
        private Handler<NetSocket> handler;

        /**
         * Registers a connect handler on the server that forwards every accepted socket
         * to whatever handler is currently set.
         */
        public EventBusNetServer(NetServer netServer) {
            this.netServer = netServer;
            netServer.connectHandler(this::dispatch);
        }

        /** Hands the socket to the current handler while holding this object's monitor. */
        private synchronized void dispatch(NetSocket accepted) {
            this.handler.handle(accepted);
        }

        /** Sets the handler that receives accepted sockets. */
        public synchronized void setHandler(Handler<NetSocket> handler) {
            this.handler = handler;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/core/eventbus/impl/EventBusImpl$HandlerEntry.class */
    /**
     * Pairs an address with a handler and, when closed, unregisters that handler from the
     * enclosing event bus. Two entries are equal when both address and handler are equal.
     */
    public class HandlerEntry<T> implements Closeable {
        final String address;
        final Handler<Message<T>> handler;

        private HandlerEntry(String str, Handler<Message<T>> handler) {
            this.address = str;
            this.handler = handler;
        }

        /**
         * Null-safe comparison of address and handler, consistent with {@link #hashCode()},
         * which already tolerates null fields.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            HandlerEntry<?> other = (HandlerEntry<?>) obj;
            return Objects.equals(this.address, other.address) && Objects.equals(this.handler, other.handler);
        }

        @Override
        public int hashCode() {
            // Same value as before: 31 * hash(address) + hash(handler), with null hashing to 0.
            return (31 * Objects.hashCode(this.address)) + Objects.hashCode(this.handler);
        }

        /**
         * Unregisters the handler from the event bus and then reports success to the
         * given completion handler.
         */
        @Override // io.vertx.core.impl.Closeable
        public void close(Handler<AsyncResult<Void>> handler) {
            EventBusImpl.this.unregisterHandler(this.address, this.handler, null);
            handler.handle(Future.succeededFuture());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/core/eventbus/impl/EventBusImpl$HandlerHolder.class */
    /**
     * Associates a {@link HandlerRegistration} with the context and flags it was registered
     * with, plus a one-way "removed" marker. Equality and hash code are those of the wrapped
     * registration.
     */
    public class HandlerHolder<T> {
        final ContextImpl context;
        final HandlerRegistration<T> handler;
        final boolean replyHandler;
        final boolean localOnly;
        final long timeoutID;
        boolean removed;

        /**
         * Marks this holder as removed. The metrics callback is invoked only by the call that
         * actually flips the flag, and it runs outside the monitor.
         */
        void setRemoved() {
            synchronized (this) {
                if (this.removed) {
                    return;
                }
                this.removed = true;
            }
            EventBusImpl.this.metrics.handlerUnregistered(((HandlerRegistration) this.handler).metric);
        }

        /** Returns whether {@link #setRemoved()} has been called. */
        synchronized boolean isRemoved() {
            return this.removed;
        }

        HandlerHolder(HandlerRegistration<T> handlerRegistration, boolean z, boolean z2, ContextImpl contextImpl, long j) {
            this.handler = handlerRegistration;
            this.context = contextImpl;
            this.replyHandler = z;
            this.localOnly = z2;
            this.timeoutID = j;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            HandlerHolder other = (HandlerHolder) obj;
            if (this.handler == null) {
                return other.handler == null;
            }
            return this.handler.equals(other.handler);
        }

        public int hashCode() {
            return this.handler == null ? 0 : this.handler.hashCode();
        }
    }

    /* loaded from: input_file:io/vertx/core/eventbus/impl/EventBusImpl$HandlerRegistration.class */
    /**
     * A message consumer bound to one address of the enclosing event bus. It is also the
     * {@link Handler} that receives the messages: while paused, messages are buffered up to
     * {@code maxBufferedMessages} (excess messages go to the discard handler, if any);
     * otherwise they are passed to the user handler. All mutable state is guarded by this
     * object's monitor.
     */
    public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Message<T>> {
        private final String address;
        private final boolean replyHandler;
        private final boolean localOnly;
        private final long timeoutID;
        private boolean registered;
        private Handler<Message<T>> handler;
        private AsyncResult<Void> result;
        private Handler<AsyncResult<Void>> completionHandler;
        private Handler<Void> endHandler;
        private Handler<Throwable> exceptionHandler;
        private Handler<Message<T>> discardHandler;
        private int maxBufferedMessages;
        private final Queue<Message<T>> pending = new ArrayDeque<>(8);
        private boolean paused;
        private Object metric;

        public HandlerRegistration(String str, boolean z, boolean z2, long j) {
            this.address = str;
            this.replyHandler = z;
            this.localOnly = z2;
            this.timeoutID = j;
        }

        /**
         * Sets the buffer limit used while paused. Messages already buffered beyond the new
         * limit are dropped from the head of the queue.
         *
         * @throws IllegalArgumentException presumably, via {@code Arguments.require}, when
         *         {@code i} is negative
         */
        @Override // io.vertx.core.eventbus.MessageConsumer
        public synchronized MessageConsumer<T> setMaxBufferedMessages(int i) {
            Arguments.require(i >= 0, "Max buffered messages cannot be negative");
            while (this.pending.size() > i) {
                this.pending.poll();
            }
            this.maxBufferedMessages = i;
            return this;
        }

        @Override // io.vertx.core.eventbus.MessageConsumer
        public synchronized int getMaxBufferedMessages() {
            return this.maxBufferedMessages;
        }

        @Override // io.vertx.core.eventbus.MessageConsumer
        public String address() {
            return this.address;
        }

        /**
         * Sets the handler notified with the registration result. If the result is already
         * known, the handler is invoked with it on a context instead of being stored.
         */
        @Override // io.vertx.core.eventbus.MessageConsumer
        public synchronized void completionHandler(Handler<AsyncResult<Void>> handler) {
            Objects.requireNonNull(handler);
            if (this.result == null) {
                this.completionHandler = handler;
            } else {
                AsyncResult<Void> asyncResult = this.result;
                EventBusImpl.this.vertx.runOnContext(r5 -> {
                    handler.handle(asyncResult);
                });
            }
        }

        @Override // io.vertx.core.eventbus.MessageConsumer
        public synchronized void unregister() {
            doUnregister(null);
        }

        @Override // io.vertx.core.eventbus.MessageConsumer
        public synchronized void unregister(Handler<AsyncResult<Void>> handler) {
            Objects.requireNonNull(handler);
            doUnregister(handler);
        }

        /**
         * Unregisters this consumer if it is registered; otherwise only the completion is
         * reported. When an end handler is set it is invoked before the caller's handler.
         *
         * @param handler completion handler supplied by the caller, may be null
         */
        private void doUnregister(Handler<AsyncResult<Void>> handler) {
            // The wrapper goes into its own variable: the lambda must capture the caller's
            // handler, not the wrapper that is being built.
            Handler<AsyncResult<Void>> completion = handler;
            if (this.endHandler != null) {
                Handler<Void> end = this.endHandler;
                completion = asyncResult -> {
                    end.handle(null);
                    if (handler != null) {
                        handler.handle(asyncResult);
                    }
                };
            }
            if (this.registered) {
                this.registered = false;
                EventBusImpl.this.unregisterHandler(this.address, this, completion);
            } else {
                EventBusImpl.this.callCompletionHandlerAsync(completion);
            }
        }

        /**
         * Records the registration result. On success the registration is reported to the
         * metrics; the completion handler, if set, is invoked on a context, otherwise a
         * failure is logged.
         */
        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void setResult(AsyncResult<Void> asyncResult) {
            this.result = asyncResult;
            if (this.completionHandler != null) {
                if (asyncResult.succeeded()) {
                    this.metric = EventBusImpl.this.metrics.handlerRegistered(this.address, this.replyHandler);
                }
                Handler<AsyncResult<Void>> handler = this.completionHandler;
                EventBusImpl.this.vertx.runOnContext(r5 -> {
                    handler.handle(asyncResult);
                });
                return;
            }
            if (asyncResult.failed()) {
                EventBusImpl.log.error("Failed to propagate registration for handler " + this.handler + " and address " + this.address);
            } else {
                this.metric = EventBusImpl.this.metrics.handlerRegistered(this.address, this.replyHandler);
            }
        }

        /**
         * Receives a message. While paused the message is buffered if there is room, otherwise
         * it is given to the discard handler (or dropped when none is set). When not paused the
         * user handler is invoked, bracketed by the begin/end metrics callbacks.
         */
        @Override // io.vertx.core.Handler
        public synchronized void handle(Message<T> message) {
            if (this.paused) {
                if (this.pending.size() < this.maxBufferedMessages) {
                    this.pending.add(message);
                } else if (this.discardHandler != null) {
                    this.discardHandler.handle(message);
                }
                return;
            }
            checkNextTick();
            EventBusImpl.this.metrics.beginHandleMessage(this.metric, ((MessageImpl) message).getSocket() == null);
            try {
                this.handler.handle(message);
                EventBusImpl.this.metrics.endHandleMessage(this.metric, null);
            } catch (Exception e) {
                EventBusImpl.this.metrics.endHandleMessage(this.metric, e);
                throw e;
            }
        }

        /** Sets the handler given messages that cannot be buffered while paused. */
        public synchronized void discardHandler(Handler<Message<T>> handler) {
            this.discardHandler = handler;
        }

        /**
         * Sets the message handler. A non-null handler registers this consumer if it is not
         * registered yet; a null handler unregisters it if it is.
         */
        @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
        /* renamed from: handler */
        public synchronized MessageConsumer<T> handler2(Handler<Message<T>> handler) {
            this.handler = handler;
            if (this.handler != null && !this.registered) {
                this.registered = true;
                EventBusImpl.this.registerHandler(this.address, (HandlerRegistration) this, this.replyHandler, this.localOnly, this.timeoutID);
            } else if (this.handler == null && this.registered) {
                unregister();
            }
            return this;
        }

        @Override // io.vertx.core.eventbus.MessageConsumer
        public ReadStream<T> bodyStream() {
            return new BodyReadStream(this);
        }

        @Override // io.vertx.core.eventbus.MessageConsumer
        public synchronized boolean isRegistered() {
            return this.registered;
        }

        @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
        /* renamed from: pause */
        public synchronized MessageConsumer<T> pause2() {
            this.paused = true;
            return this;
        }

        /** Resumes delivery and schedules processing of any buffered message. */
        @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
        /* renamed from: resume */
        public synchronized MessageConsumer<T> resume2() {
            if (this.paused) {
                this.paused = false;
                checkNextTick();
            }
            return this;
        }

        /**
         * Sets the handler invoked when this consumer is unregistered. The handler is wrapped
         * so that it runs on the context obtained at the time of this call.
         */
        @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
        public synchronized MessageConsumer<T> endHandler(Handler<Void> handler) {
            if (handler != null) {
                ContextImpl orCreateContext = EventBusImpl.this.vertx.getOrCreateContext();
                this.endHandler = r5 -> {
                    orCreateContext.runOnContext(r4 -> {
                        handler.handle(null);
                    });
                };
            } else {
                this.endHandler = null;
            }
            return this;
        }

        @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
        public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
            this.exceptionHandler = handler;
            return this;
        }

        /**
         * If messages are buffered, schedules a context task that delivers the next one
         * (unless the consumer has been paused again by then).
         */
        private void checkNextTick() {
            if (this.pending.isEmpty()) {
                return;
            }
            EventBusImpl.this.vertx.runOnContext(r4 -> {
                // Take the same monitor as every other access to paused/pending; the queue is
                // a plain ArrayDeque. handle() re-enters this monitor.
                synchronized (HandlerRegistration.this) {
                    if (this.paused) {
                        return;
                    }
                    Message<T> next = this.pending.poll();
                    if (next != null) {
                        handle(next);
                    }
                }
            });
        }

        // The three methods below carry the decompiler's bridge/synthetic markers and only
        // forward to the typed overloads above; they are kept exactly as found.

        @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
        public /* bridge */ /* synthetic */ ReadStream endHandler(Handler handler) {
            return endHandler((Handler<Void>) handler);
        }

        @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
        public /* bridge */ /* synthetic */ ReadStream exceptionHandler(Handler handler) {
            return exceptionHandler((Handler<Throwable>) handler);
        }

        @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
        public /* bridge */ /* synthetic */ StreamBase exceptionHandler(Handler handler) {
            return exceptionHandler((Handler<Throwable>) handler);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/core/eventbus/impl/EventBusImpl$Handlers.class */
    /**
     * The handlers stored under one key of the handler map, with a rotating position used
     * to pick them in turn.
     */
    public static class Handlers {
        final List<HandlerHolder> list = new CopyOnWriteArrayList<>();
        final AtomicInteger pos = new AtomicInteger(0);

        private Handlers() {
        }

        /**
         * Returns the holder at the current position and advances the position, wrapping to
         * zero at the end of the list.
         *
         * @return the chosen holder, or null when the list is empty
         */
        HandlerHolder choose() {
            for (int count = this.list.size(); count != 0; count = this.list.size()) {
                int index = this.pos.getAndIncrement();
                if (index >= count - 1) {
                    this.pos.set(0);
                }
                try {
                    return this.list.get(index);
                } catch (IndexOutOfBoundsException stale) {
                    // The position was past the end of the list; reset it and try again.
                    this.pos.set(0);
                }
            }
            return null;
        }
    }

    /**
     * Creates an event bus without any cluster collaborators: the net server, subscription
     * map, cluster manager and HA manager are all null, both ping intervals are -1 and the
     * server id is the placeholder {@code ServerID(-1, "localhost")}.
     *
     * @param vertxInternal the owning Vert.x instance; also the source of the metrics SPI
     */
    public EventBusImpl(VertxInternal vertxInternal) {
        this.connections = new ConcurrentHashMap();
        this.handlerMap = new ConcurrentHashMap();
        this.userCodecMap = new ConcurrentHashMap();
        this.defaultCodecMap = new ConcurrentHashMap();
        this.replySequence = new AtomicLong(0L);
        this.sendPong = true;
        this.vertx = vertxInternal;
        this.pingInterval = -1L;
        this.pingReplyInterval = -1L;
        this.serverID = new ServerID(-1, "localhost");
        this.server = null;
        this.subs = null;
        this.clusterMgr = null;
        this.haManager = null;
        // `this` is handed to the metrics SPI here, after every field except metrics and
        // systemCodecs has been assigned.
        this.metrics = vertxInternal.metricsSPI().createMetrics(this);
        this.systemCodecs = systemCodecs();
    }

    /**
     * Creates an event bus wired to cluster collaborators supplied by the caller.
     *
     * @param vertxInternal     the owning Vert.x instance; also the source of the metrics SPI
     * @param j                 stored as {@code pingInterval}
     * @param j2                stored as {@code pingReplyInterval}
     * @param clusterManager    stored as {@code clusterMgr}
     * @param hAManager         stored as {@code haManager}
     * @param asyncMultiMap     stored as {@code subs}
     * @param serverID          identity of this node
     * @param eventBusNetServer wrapper whose underlying NetServer becomes {@code server}; it
     *                          is also passed to {@code setServerHandler}
     */
    public EventBusImpl(VertxInternal vertxInternal, long j, long j2, ClusterManager clusterManager, HAManager hAManager, AsyncMultiMap<String, ServerID> asyncMultiMap, ServerID serverID, EventBusNetServer eventBusNetServer) {
        this.connections = new ConcurrentHashMap();
        this.handlerMap = new ConcurrentHashMap();
        this.userCodecMap = new ConcurrentHashMap();
        this.defaultCodecMap = new ConcurrentHashMap();
        this.replySequence = new AtomicLong(0L);
        this.sendPong = true;
        this.vertx = vertxInternal;
        this.clusterMgr = clusterManager;
        this.haManager = hAManager;
        // NOTE(review): `this` is handed to the metrics SPI before the fields assigned below
        // (ping intervals, subs, systemCodecs, serverID, server) have been set.
        this.metrics = vertxInternal.metricsSPI().createMetrics(this);
        this.pingInterval = j;
        this.pingReplyInterval = j2;
        this.subs = asyncMultiMap;
        this.systemCodecs = systemCodecs();
        this.serverID = serverID;
        this.server = eventBusNetServer.netServer;
        // Both helpers are defined outside the visible part of the file.
        setServerHandler(eventBusNetServer);
        addFailoverCompleteHandler();
    }

    /** Sends a message with default delivery options and no reply handler. */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus send(String str, Object obj) {
        DeliveryOptions defaults = new DeliveryOptions();
        return send(str, obj, defaults, null);
    }

    /** Sends a message with default delivery options, passing the given reply handler on. */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus send(String str, Object obj, Handler<AsyncResult<Message<T>>> handler) {
        DeliveryOptions defaults = new DeliveryOptions();
        return send(str, obj, defaults, handler);
    }

    /** Sends a message with the given delivery options and no reply handler. */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus send(String str, Object obj, DeliveryOptions deliveryOptions) {
        // Delegates to the four-argument overload with a null reply handler.
        return send(str, obj, deliveryOptions, null);
    }

    /**
     * Builds a message for the address from the body and the headers / codec name of the
     * delivery options, then hands it to {@code sendOrPub} together with the reply handler.
     *
     * @return this event bus
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus send(String str, Object obj, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        MessageImpl outgoing = createMessage(true, str, deliveryOptions.getHeaders(), obj, deliveryOptions.getCodecName());
        sendOrPub(null, outgoing, deliveryOptions, handler);
        return this;
    }

    /**
     * Creates a producer for the address using default delivery options.
     *
     * @throws NullPointerException if the address is null
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> sender(String str) {
        Objects.requireNonNull(str, "address");
        DeliveryOptions defaults = new DeliveryOptions();
        return new MessageProducerImpl(this, str, true, defaults);
    }

    /**
     * Creates a producer for the address using the given delivery options.
     *
     * @throws NullPointerException if the address or the options are null
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> sender(String str, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(str, "address");
        Objects.requireNonNull(deliveryOptions, "options");
        MessageProducer<T> producer = new MessageProducerImpl(this, str, true, deliveryOptions);
        return producer;
    }

    /**
     * Creates a producer for the address using default delivery options; it differs from
     * {@code sender} only in the boolean flag passed to the producer (false here).
     *
     * @throws NullPointerException if the address is null
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> publisher(String str) {
        Objects.requireNonNull(str, "address");
        DeliveryOptions defaults = new DeliveryOptions();
        return new MessageProducerImpl(this, str, false, defaults);
    }

    /**
     * Creates a producer for the address using the given delivery options; it differs from
     * {@code sender} only in the boolean flag passed to the producer (false here).
     *
     * @throws NullPointerException if the address or the options are null
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> publisher(String str, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(str, "address");
        Objects.requireNonNull(deliveryOptions, "options");
        MessageProducer<T> producer = new MessageProducerImpl(this, str, false, deliveryOptions);
        return producer;
    }

    /** Publishes a message with default delivery options. */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus publish(String str, Object obj) {
        DeliveryOptions defaults = new DeliveryOptions();
        return publish(str, obj, defaults);
    }

    /**
     * Builds a message for the address from the body and the headers / codec name of the
     * delivery options, then hands it to {@code sendOrPub} without a reply handler.
     *
     * @return this event bus
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus publish(String str, Object obj, DeliveryOptions deliveryOptions) {
        MessageImpl outgoing = createMessage(false, str, deliveryOptions.getHeaders(), obj, deliveryOptions.getCodecName());
        sendOrPub(null, outgoing, deliveryOptions, null);
        return this;
    }

    /**
     * Creates a consumer for the address with no message handler set yet (local-only flag
     * false, timeout id -1).
     *
     * @throws NullPointerException if the address is null
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> consumer(String str) {
        Objects.requireNonNull(str, "address");
        MessageConsumer<T> registration = new HandlerRegistration<T>(str, false, false, -1L);
        return registration;
    }

    /**
     * Creates a consumer for the address and sets its message handler.
     *
     * @throws NullPointerException if the handler or the address is null (the handler is
     *         checked first)
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> consumer(String str, Handler<Message<T>> handler) {
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> registration = consumer(str);
        registration.handler2((Handler) handler);
        return registration;
    }

    /**
     * Creates a consumer for the address with the local-only flag set and no message
     * handler set yet (timeout id -1).
     *
     * @throws NullPointerException if the address is null
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> localConsumer(String str) {
        Objects.requireNonNull(str, "address");
        MessageConsumer<T> registration = new HandlerRegistration<T>(str, false, true, -1L);
        return registration;
    }

    /**
     * Creates a local-only consumer for the address and sets its message handler.
     *
     * @throws NullPointerException if the handler or the address is null (the handler is
     *         checked first)
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> localConsumer(String str, Handler<Message<T>> handler) {
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> registration = localConsumer(str);
        registration.handler2((Handler) handler);
        return registration;
    }

    /**
     * Registers a codec under its own name.
     *
     * <p>The registration uses a single atomic {@code putIfAbsent}, so two concurrent
     * registrations of the same name cannot both succeed.
     *
     * @param messageCodec the codec to register
     * @return this event bus
     * @throws NullPointerException  if the codec or its name is null
     * @throws IllegalStateException if a codec is already registered under that name
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus registerCodec(MessageCodec messageCodec) {
        Objects.requireNonNull(messageCodec, "codec");
        String codecName = Objects.requireNonNull(messageCodec.name(), "codec.name()");
        checkSystemCodec(messageCodec);
        if (this.userCodecMap.putIfAbsent(codecName, messageCodec) != null) {
            throw new IllegalStateException("Already a codec registered with name " + codecName);
        }
        return this;
    }

    /**
     * Removes the codec registered under the given name, if any.
     *
     * @return this event bus
     * @throws NullPointerException if the name is null
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus unregisterCodec(String str) {
        String codecName = Objects.requireNonNull(str);
        this.userCodecMap.remove(codecName);
        return this;
    }

    /**
     * Registers a codec as the default for a body class and also under its own name.
     *
     * <p>Each map is claimed with an atomic {@code putIfAbsent}. If the name turns out to be
     * taken, the class mapping claimed a moment earlier is rolled back, so a failed call
     * leaves both maps as they were.
     *
     * @param cls          the body class the codec becomes the default for
     * @param messageCodec the codec to register
     * @return this event bus
     * @throws NullPointerException  if the class, the codec or the codec name is null
     * @throws IllegalStateException if the class already has a default codec, or a codec is
     *                               already registered under the same name
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus registerDefaultCodec(Class<T> cls, MessageCodec<T, ?> messageCodec) {
        Objects.requireNonNull(cls);
        Objects.requireNonNull(messageCodec, "codec");
        String codecName = Objects.requireNonNull(messageCodec.name(), "codec.name()");
        checkSystemCodec(messageCodec);
        if (this.defaultCodecMap.putIfAbsent(cls, messageCodec) != null) {
            throw new IllegalStateException("Already a default codec registered for class " + cls);
        }
        if (this.userCodecMap.putIfAbsent(codecName, messageCodec) != null) {
            // Undo the class mapping claimed above, but only if it is still ours.
            this.defaultCodecMap.remove(cls, messageCodec);
            throw new IllegalStateException("Already a codec registered with name " + codecName);
        }
        return this;
    }

    /**
     * Removes the default codec of the given class, together with its by-name registration.
     *
     * <p>The by-name entry is removed only while it still maps to that same codec, so a
     * different codec registered under the same name in the meantime is left untouched.
     *
     * @param cls the body class whose default codec is removed
     * @return this event bus
     * @throws NullPointerException if the class is null
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus unregisterDefaultCodec(Class cls) {
        Objects.requireNonNull(cls);
        MessageCodec removed = this.defaultCodecMap.remove(cls);
        if (removed != null) {
            this.userCodecMap.remove(removed.name(), removed);
        }
        return this;
    }

    /**
     * Unregisters all handlers and shuts the event bus down. Without a net server this goes
     * straight to {@code closeClusterManager}; otherwise the server is closed first and, in
     * its callback, every connection holder is closed before {@code closeClusterManager}
     * is called.
     *
     * @param handler passed on to {@code closeClusterManager}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public void close(Handler<AsyncResult<Void>> handler) {
        unregisterAllHandlers();
        if (this.server == null) {
            closeClusterManager(handler);
            return;
        }
        this.server.close(serverClosed -> {
            if (serverClosed.failed()) {
                log.error("Failed to close server", serverClosed.cause());
            }
            for (ConnectionHolder holder : this.connections.values()) {
                holder.close(false);
            }
            closeClusterManager(handler);
        });
    }

    /** Calls {@code unregister()} on the registration of every holder in the handler map. */
    private void unregisterAllHandlers() {
        for (Handlers handlers : this.handlerMap.values()) {
            for (HandlerHolder holder : handlers.list) {
                holder.handler.unregister();
            }
        }
    }

    /** Returns true only when a metrics object exists and reports itself as enabled. */
    @Override // io.vertx.core.metrics.Measured
    public boolean isMetricsEnabled() {
        if (this.metrics == null) {
            return false;
        }
        return this.metrics.isEnabled();
    }

    /** Returns the metrics object created for this event bus in its constructor. */
    @Override // io.vertx.core.spi.metrics.MetricsProvider
    public EventBusMetrics<?> getMetrics() {
        EventBusMetrics<?> current = this.metrics;
        return current;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends a reply message.
     *
     * <p>A reply whose address is {@code null} cannot be routed; in that case the
     * no-handlers failure path is taken (with a {@code null} address). Otherwise the
     * message goes through the regular {@code sendOrPub} path.
     *
     * @param serverID        target server forwarded to {@code sendOrPub}
     * @param messageImpl     the reply message
     * @param deliveryOptions delivery options forwarded to {@code sendOrPub}
     * @param handler         handler for the reply to this reply, forwarded unchanged
     * @param <T>             body type of the expected reply
     */
    public <T> void sendReply(ServerID serverID, MessageImpl messageImpl, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        if (messageImpl.address() != null) {
            sendOrPub(serverID, messageImpl, deliveryOptions, handler);
            return;
        }
        sendNoHandlersFailure(null, handler);
    }

    /**
     * Clears the {@code sendPong} flag.
     *
     * <p>The server-side socket handler only writes a pong in answer to a ping message
     * while {@code sendPong} is true, so after this call pings go unanswered.
     * NOTE(review): the name suggests this is a testing hook — confirm against callers.
     */
    public void simulateUnresponsive() {
        this.sendPong = false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a {@link MessageImpl} for the given address and body, selecting the codec
     * that goes with the body.
     *
     * <p>Codec selection order:
     * <ol>
     *   <li>if {@code str2} is given, the codec registered under that name;</li>
     *   <li>the null codec for a {@code null} body;</li>
     *   <li>a built-in codec for String, Buffer, JsonObject, JsonArray, byte[], the boxed
     *       primitives, and ReplyException (checked in that order);</li>
     *   <li>the default codec registered for the body's exact runtime class.</li>
     * </ol>
     *
     * @param z        flag passed unchanged as the last constructor argument of the message
     * @param str      destination address; must not be null
     * @param multiMap passed unchanged to the message (presumably the headers — confirm)
     * @param obj      message body; may be null
     * @param str2     name of an explicitly requested codec, or null to select by body type
     * @return the newly created message
     * @throws NullPointerException     if {@code str} is null
     * @throws IllegalArgumentException if no codec matches the requested name or the body type
     */
    public MessageImpl createMessage(boolean z, String str, MultiMap multiMap, Object obj, String str2) {
        Objects.requireNonNull(str, "no null address accepted");

        // An explicitly named codec wins over any type-based selection.
        if (str2 != null) {
            MessageCodec named = this.userCodecMap.get(str2);
            if (named == null) {
                throw new IllegalArgumentException("No message codec for name: " + str2);
            }
            return new MessageImpl(this.serverID, str, null, multiMap, obj, named, z);
        }

        MessageCodec codec;
        if (obj == null) {
            codec = NULL_MESSAGE_CODEC;
        } else if (obj instanceof String) {
            codec = STRING_MESSAGE_CODEC;
        } else if (obj instanceof Buffer) {
            codec = BUFFER_MESSAGE_CODEC;
        } else if (obj instanceof JsonObject) {
            codec = JSON_OBJECT_MESSAGE_CODEC;
        } else if (obj instanceof JsonArray) {
            codec = JSON_ARRAY_MESSAGE_CODEC;
        } else if (obj instanceof byte[]) {
            codec = BYTE_ARRAY_MESSAGE_CODEC;
        } else if (obj instanceof Integer) {
            codec = INT_MESSAGE_CODEC;
        } else if (obj instanceof Long) {
            codec = LONG_MESSAGE_CODEC;
        } else if (obj instanceof Float) {
            codec = FLOAT_MESSAGE_CODEC;
        } else if (obj instanceof Double) {
            codec = DOUBLE_MESSAGE_CODEC;
        } else if (obj instanceof Boolean) {
            codec = BOOLEAN_MESSAGE_CODEC;
        } else if (obj instanceof Short) {
            codec = SHORT_MESSAGE_CODEC;
        } else if (obj instanceof Character) {
            codec = CHAR_MESSAGE_CODEC;
        } else if (obj instanceof Byte) {
            codec = BYTE_MESSAGE_CODEC;
        } else if (obj instanceof ReplyException) {
            codec = REPLY_EXCEPTION_MESSAGE_CODEC;
        } else {
            // Fall back to a user-registered default codec, keyed by the exact runtime class.
            codec = this.defaultCodecMap.get(obj.getClass());
            if (codec == null) {
                throw new IllegalArgumentException("No message codec for type: " + obj.getClass());
            }
        }
        return new MessageImpl(this.serverID, str, null, multiMap, obj, codec, z);
    }

    /**
     * Rejects codecs that identify themselves as system codecs.
     *
     * @param messageCodec the codec about to be registered
     * @throws IllegalArgumentException if the codec's {@code systemCodecID()} is anything
     *                                  other than {@code -1}
     */
    private void checkSystemCodec(MessageCodec messageCodec) {
        if (messageCodec.systemCodecID() == -1) {
            return;
        }
        throw new IllegalArgumentException("Can't register a system codec");
    }

    /**
     * Leaves the cluster (when a cluster manager is present) and then reports completion.
     *
     * <p>A failure to leave is only logged; the handler always receives a succeeded
     * result, dispatched via {@code runOnContext}.
     *
     * @param handler completion callback; may be {@code null}, in which case nothing is notified
     */
    private void closeClusterManager(Handler<AsyncResult<Void>> handler) {
        if (this.clusterMgr == null) {
            callCompletionHandlerAsync(handler);
            return;
        }
        this.clusterMgr.leave(leaveResult -> {
            if (leaveResult.failed()) {
                log.error("Failed to leave cluster", leaveResult.cause());
            }
            callCompletionHandlerAsync(handler);
        });
    }

    /**
     * Installs the per-socket handler on the event bus server.
     *
     * <p>Each incoming socket gets a record parser that alternates between two states:
     * reading a 4-byte length prefix, then reading a frame of exactly that many bytes.
     * Every complete frame is decoded into a {@link MessageImpl}. Ping messages are
     * answered with a pong (unless {@code sendPong} has been cleared); all other
     * messages are handed to {@code receiveMessage}.
     *
     * @param eventBusNetServer the server whose connection handler is set
     */
    private void setServerHandler(EventBusNetServer eventBusNetServer) {
        eventBusNetServer.setHandler(netSocket -> {
            // Start out expecting a fixed 4-byte record: the length prefix of the next frame.
            final RecordParser newFixed = RecordParser.newFixed(4, null);
            newFixed.setOutput(new Handler<Buffer>() { // from class: io.vertx.core.eventbus.impl.EventBusImpl.1
                // Size of the frame body being awaited; -1 while waiting for a length prefix.
                int size = -1;

                @Override // io.vertx.core.Handler
                public void handle(Buffer buffer) {
                    if (this.size == -1) {
                        // This record is the length prefix: switch the parser to read the body next.
                        this.size = buffer.getInt(0);
                        newFixed.fixedSizeMode(this.size);
                        return;
                    }
                    // This record is a full frame body: decode it using the registered codecs.
                    MessageImpl messageImpl = new MessageImpl();
                    messageImpl.readFromWire(netSocket, buffer, EventBusImpl.this.userCodecMap, EventBusImpl.this.systemCodecs);
                    EventBusImpl.this.metrics.messageRead(messageImpl.address(), buffer.length());
                    // Go back to expecting a 4-byte length prefix for the following frame.
                    newFixed.fixedSizeMode(4);
                    this.size = -1;
                    if (messageImpl.codec() != EventBusImpl.PING_MESSAGE_CODEC) {
                        // Regular message: no timer, no reply handlers, and the last flag is false
                        // (the same flag is true on the local delivery paths in this class).
                        EventBusImpl.this.receiveMessage(messageImpl, -1L, null, null, false);
                    } else if (EventBusImpl.this.sendPong) {
                        // Ping: answer with a pong unless pongs have been switched off.
                        netSocket.write(EventBusImpl.PONG);
                    }
                }
            });
            // Feed the raw socket data through the record parser.
            netSocket.handler2((Handler<Buffer>) newFixed);
        });
    }

    /**
     * Registers a callback with the HA manager that cleans up subscriptions belonging to
     * a server described in the HA info.
     *
     * <p>The callback reads the {@code "server_id"} object from the HA info; when present,
     * its {@code "port"} and {@code "host"} entries are used to build the {@link ServerID}
     * whose subscriptions are removed. HA info without a {@code "server_id"} is ignored.
     */
    private void addFailoverCompleteHandler() {
        // The local variable previously reused the lambda parameter's name, which is a
        // compile error; the parameter and the nested object now have distinct names.
        this.haManager.setRemoveSubsHandler((nodeId, haInfo, failed) -> {
            JsonObject serverInfo = haInfo.getJsonObject("server_id");
            if (serverInfo != null) {
                cleanSubsForServerID(new ServerID(serverInfo.getInteger("port").intValue(), serverInfo.getString("host")));
            }
        });
    }

    /**
     * Delivers a message to the subscribers found for its address.
     *
     * <p>When {@code messageImpl.send()} is true, a single server is chosen and the message
     * is either delivered locally or sent to that remote server. Otherwise the message goes
     * to every server in the iterable: remote ones immediately, and the local node
     * (if present) after the metrics have been recorded.
     *
     * @param choosableIterable the servers subscribed to the address
     * @param messageImpl       the message to deliver
     * @param j                 timer id forwarded to {@code receiveMessage}
     * @param handler           reply result handler; only forwarded on the single-target path
     * @param handler2          reply handler forwarded to {@code receiveMessage}
     * @param <T>               body type of the expected reply
     */
    private <T> void sendToSubs(ChoosableIterable<ServerID> choosableIterable, MessageImpl messageImpl, long j, Handler<AsyncResult<Message<T>>> handler, Handler<Message<T>> handler2) {
        if (!messageImpl.send()) {
            // Fan out to every subscriber, remembering which kinds of target were seen.
            boolean localTarget = false;
            boolean remoteTarget = false;
            for (ServerID target : choosableIterable) {
                if (target.equals(this.serverID)) {
                    localTarget = true;
                    continue;
                }
                remoteTarget = true;
                sendRemote(target, messageImpl);
            }
            this.metrics.messageSent(messageImpl.address(), true, localTarget, remoteTarget);
            if (localTarget) {
                receiveMessage(messageImpl, j, null, handler2, true);
            }
            return;
        }

        // Single-target delivery: pick one subscriber.
        ServerID chosen = choosableIterable.choose();
        boolean local = chosen.equals(this.serverID);
        this.metrics.messageSent(messageImpl.address(), false, local, !local);
        if (local) {
            receiveMessage(messageImpl, j, handler, handler2, true);
        } else {
            sendRemote(chosen, messageImpl);
        }
    }

    /**
     * Builds the table of built-in codecs by passing all of them to {@code codecs(...)},
     * which places each codec at the index given by its {@code systemCodecID()}.
     *
     * @return the array produced by {@code codecs(...)} for the built-in codecs
     */
    private MessageCodec[] systemCodecs() {
        return codecs(NULL_MESSAGE_CODEC, PING_MESSAGE_CODEC, STRING_MESSAGE_CODEC, BUFFER_MESSAGE_CODEC, JSON_OBJECT_MESSAGE_CODEC, JSON_ARRAY_MESSAGE_CODEC, BYTE_ARRAY_MESSAGE_CODEC, INT_MESSAGE_CODEC, LONG_MESSAGE_CODEC, FLOAT_MESSAGE_CODEC, DOUBLE_MESSAGE_CODEC, BOOLEAN_MESSAGE_CODEC, SHORT_MESSAGE_CODEC, CHAR_MESSAGE_CODEC, BYTE_MESSAGE_CODEC, REPLY_EXCEPTION_MESSAGE_CODEC);
    }

    /**
     * Arranges the given codecs into an array indexed by each codec's {@code systemCodecID()}.
     *
     * <p>The result has as many slots as codecs were passed, so the ids are expected to
     * fall in {@code [0, messageCodecArr.length)}; an id outside that range makes the
     * array store throw.
     *
     * @param messageCodecArr the codecs to index
     * @return a new array with every codec stored at the position of its system codec id
     */
    private MessageCodec[] codecs(MessageCodec... messageCodecArr) {
        MessageCodec[] byId = new MessageCodec[messageCodecArr.length];
        for (int i = 0; i < messageCodecArr.length; i++) {
            MessageCodec codec = messageCodecArr[i];
            byId[codec.systemCodecID()] = codec;
        }
        return byId;
    }

    /**
     * Produces a fresh address for a reply handler.
     *
     * @return a random UUID string when a cluster manager is present; otherwise the next
     *         value of the reply sequence counter rendered as a decimal string
     */
    private String generateReplyAddress() {
        if (this.clusterMgr == null) {
            return Long.toString(this.replySequence.incrementAndGet());
        }
        return UUID.randomUUID().toString();
    }

    /**
     * Routes a message to its destination, first arming a reply handler when one is requested.
     *
     * <p>If {@code handler} is non-null, the message is given a generated reply address, a
     * timeout timer is started using the send timeout from {@code deliveryOptions}, and a
     * handler is registered at the reply address. Routing then proceeds as follows:
     * <ul>
     *   <li>an explicit {@code serverID} is honoured directly (local delivery if it is this
     *       node, remote send otherwise);</li>
     *   <li>with no subscription map, the message is delivered locally;</li>
     *   <li>otherwise the subscribers of the address are looked up asynchronously and the
     *       message goes to them, falling back to local delivery when there are none.</li>
     * </ul>
     *
     * @param serverID        explicit target server, or {@code null} to route by address
     * @param messageImpl     the message to send or publish
     * @param deliveryOptions supplies the reply timeout; only read when {@code handler} is non-null
     * @param handler         receives the reply or a failure; {@code null} when no reply is expected
     * @param <T>             body type of the expected reply
     * @throws IllegalStateException if the event bus has not been started
     */
    private <T> void sendOrPub(ServerID serverID, MessageImpl messageImpl, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        checkStarted();
        Handler<Message<T>> handler2 = null;
        // -1 means "no timeout timer"; the rest of this class treats -1 the same way.
        long j = -1;
        if (handler != null) {
            messageImpl.setReplyAddress(generateReplyAddress());
            // Holder that lets the timer callback reach the consumer, which is only created
            // after the timer id is known.
            AtomicReference atomicReference = new AtomicReference();
            j = this.vertx.setTimer(deliveryOptions.getSendTimeout(), l -> {
                // Timeout: drop the reply handler, record the failure, and fail the caller.
                log.warn("Message reply handler timed out as no reply was received - it will be removed");
                ((MessageConsumer) atomicReference.get()).unregister();
                this.metrics.replyFailure(messageImpl.address(), ReplyFailure.TIMEOUT);
                handler.handle(Future.failedFuture(new ReplyException(ReplyFailure.TIMEOUT, "Timed out waiting for reply")));
            });
            handler2 = convertHandler(handler);
            // Register the reply handler with both boolean flags set and with the timer id attached.
            atomicReference.set(registerHandler(messageImpl.replyAddress(), (Handler) handler2, true, true, j));
        }
        if (serverID != null) {
            // The caller named a specific server: skip the subscription lookup.
            if (serverID.equals(this.serverID)) {
                this.metrics.messageSent(messageImpl.address(), !messageImpl.send(), true, false);
                receiveMessage(messageImpl, j, handler, handler2, true);
                return;
            } else {
                this.metrics.messageSent(messageImpl.address(), !messageImpl.send(), false, true);
                sendRemote(serverID, messageImpl);
                return;
            }
        }
        if (this.subs == null) {
            // No subscription map: everything is delivered locally.
            this.metrics.messageSent(messageImpl.address(), !messageImpl.send(), true, false);
            receiveMessage(messageImpl, j, handler, handler2, true);
        } else {
            // Copies for capture by the lambda below, which needs effectively final locals.
            long j2 = j;
            Handler<Message<T>> handler3 = handler2;
            this.subs.get(messageImpl.address(), asyncResult -> {
                if (!asyncResult.succeeded()) {
                    // Lookup failed: only logged here. Any reply timer armed above is left running.
                    log.error("Failed to send message", asyncResult.cause());
                    return;
                }
                ChoosableIterable<ServerID> choosableIterable = (ChoosableIterable) asyncResult.result();
                if (choosableIterable != null && !choosableIterable.isEmpty()) {
                    sendToSubs(choosableIterable, messageImpl, j2, handler, handler3);
                } else {
                    // No subscribers found: fall back to local delivery.
                    this.metrics.messageSent(messageImpl.address(), !messageImpl.send(), true, false);
                    receiveMessage(messageImpl, j2, handler, handler3, true);
                }
            });
        }
    }

    /**
     * Adapts a result-style reply handler into a plain message handler.
     *
     * <p>A reply whose body is a {@link ReplyException} is reported to the wrapped handler as
     * a failed result (after recording the failure in the metrics); any other reply is
     * reported as a succeeded result carrying the message.
     *
     * @param handler the result handler to wrap
     * @param <T>     body type of the reply
     * @return a message handler that forwards to {@code handler}
     */
    private <T> Handler<Message<T>> convertHandler(Handler<AsyncResult<Message<T>>> handler) {
        return reply -> {
            if (!(reply.body() instanceof ReplyException)) {
                handler.handle(Future.succeededFuture(reply));
                return;
            }
            ReplyException failure = (ReplyException) reply.body();
            this.metrics.replyFailure(reply.address(), failure.failureType());
            handler.handle(Future.failedFuture(failure));
        };
    }

    /**
     * Creates a {@code HandlerRegistration} for the given address and attaches the handler to it.
     *
     * @param str     the address the registration is created for
     * @param handler the message handler set on the registration
     * @param z       first boolean flag passed to the registration constructor
     * @param z2      second boolean flag passed to the registration constructor
     * @param j       timer id passed to the registration constructor ({@code sendOrPub} passes
     *                its reply timeout timer here, or -1 when there is none)
     * @param <T>     body type handled by {@code handler}
     * @return the new registration, typed as a {@link MessageConsumer}
     */
    private <T> MessageConsumer registerHandler(String str, Handler<Message<T>> handler, boolean z, boolean z2, long j) {
        HandlerRegistration handlerRegistration = new HandlerRegistration(str, z, z2, j);
        // NOTE(review): `handler2` looks like a decompiler-renamed overload of the
        // registration's handler setter — confirm against HandlerRegistration.
        handlerRegistration.handler2((Handler) handler);
        return handlerRegistration;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Adds a handler registration to the handler map under the given address.
     *
     * <p>The registration is bound to the current context, or to a newly created event loop
     * context when there is none. For the first handler at an address, the address is also
     * added to the subscription map unless there is no such map or either boolean flag is
     * set; the registration's result is completed accordingly.
     *
     * @param str                 the address to register at; must not be null
     * @param handlerRegistration the registration; its {@code handler} field must not be null
     * @param z                   first flag stored in the holder; also suppresses the subscription-map add
     * @param z2                  second flag stored in the holder; also suppresses the subscription-map add
     * @param j                   timer id stored in the holder
     * @param <T>                 body type of the registration
     * @throws IllegalStateException if the event bus has not been started
     * @throws NullPointerException  if the address or the registration's handler is null
     */
    public <T> void registerHandler(String str, HandlerRegistration<T> handlerRegistration, boolean z, boolean z2, long j) {
        checkStarted();
        Objects.requireNonNull(str, "address");
        Objects.requireNonNull(((HandlerRegistration) handlerRegistration).handler, "handler");
        ContextImpl context = this.vertx.getContext();
        // Remember whether a context already existed: only then is a close hook added below.
        boolean z3 = context != null;
        if (!z3) {
            context = this.vertx.createEventLoopContext(null, new JsonObject(), Thread.currentThread().getContextClassLoader());
        }
        // NOTE(review): sendOrPub passes true for both flags when registering reply handlers;
        // presumably they map to the holder's replyHandler/localOnly fields — confirm against HandlerHolder.
        HandlerHolder handlerHolder = new HandlerHolder(handlerRegistration, z, z2, context, j);
        Handlers handlers = this.handlerMap.get(str);
        if (handlers == null) {
            // First handler for this address: create the list, tolerating a concurrent creator.
            handlers = new Handlers();
            Handlers putIfAbsent = this.handlerMap.putIfAbsent(str, handlers);
            if (putIfAbsent != null) {
                handlers = putIfAbsent;
            }
            if (this.subs == null || z || z2) {
                // Nothing to add to the subscription map, so registration is complete right away.
                handlerRegistration.setResult(Future.succeededFuture());
            } else {
                AsyncMultiMap<String, ServerID> asyncMultiMap = this.subs;
                ServerID serverID = this.serverID;
                // NOTE(review): result unused; looks like a decompiler-emitted null check
                // (e.g. from a method reference) — the registration was already dereferenced above.
                handlerRegistration.getClass();
                asyncMultiMap.add(str, serverID, asyncResult -> {
                    handlerRegistration.setResult(asyncResult);
                });
            }
        } else {
            // The address already has handlers: no subscription-map update is made here.
            handlerRegistration.setResult(Future.succeededFuture());
        }
        handlers.list.add(handlerHolder);
        if (z3) {
            // Tie the registration to the pre-existing context via a close hook.
            context.addCloseHook(new HandlerEntry(str, handlerRegistration));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the first holder registered at {@code str} whose handler is the very same
     * object as {@code handler} (identity comparison).
     *
     * <p>On a match the holder's timeout timer (if any) is cancelled, the holder is removed
     * from the list and marked removed, and the context close hook is dropped. If that was
     * the last handler for the address, the address is removed from the handler map and,
     * when a subscription map exists and the holder is not local-only, from the subscription
     * map as well. If nothing matches, nothing happens and {@code handler2} is not invoked.
     *
     * @param str      the address the handler was registered at
     * @param handler  the handler to remove
     * @param handler2 completion callback; may be {@code null}
     * @param <T>      body type of the handler
     * @throws IllegalStateException if the event bus has not been started
     */
    public <T> void unregisterHandler(String str, Handler<Message<T>> handler, Handler<AsyncResult<Void>> handler2) {
        checkStarted();
        Handlers handlers = this.handlerMap.get(str);
        if (handlers == null) {
            return;
        }
        synchronized (handlers) {
            // Linear scan by index; the list keeps registration order.
            int size = handlers.list.size();
            for (int i = 0; i < size; i++) {
                HandlerHolder handlerHolder = handlers.list.get(i);
                if (handlerHolder.handler != handler) {
                    continue;
                }
                if (handlerHolder.timeoutID != -1) {
                    this.vertx.cancelTimer(handlerHolder.timeoutID);
                }
                handlers.list.remove(i);
                handlerHolder.setRemoved();
                if (handlers.list.isEmpty()) {
                    this.handlerMap.remove(str);
                    if (this.subs == null || handlerHolder.localOnly) {
                        callCompletionHandlerAsync(handler2);
                    } else {
                        removeSub(str, this.serverID, handler2);
                    }
                } else {
                    callCompletionHandlerAsync(handler2);
                }
                handlerHolder.context.removeCloseHook(new HandlerEntry(str, handler));
                // Stop after the first match. Without this the scan continued against the
                // stale size and read past the end of the now-shorter list.
                break;
            }
        }
    }

    /**
     * Unregisters a handler without a completion callback; delegates to the three-argument
     * overload with a {@code null} completion handler.
     *
     * @param str     the address the handler was registered at
     * @param handler the handler to remove
     * @param <T>     body type of the handler
     */
    private <T> void unregisterHandler(String str, Handler<Message<T>> handler) {
        unregisterHandler(str, handler, null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports success to a completion handler, dispatched via {@code runOnContext}
     * rather than invoked inline.
     *
     * @param handler the handler to notify; ignored when {@code null}
     */
    public void callCompletionHandlerAsync(Handler<AsyncResult<Void>> handler) {
        if (handler == null) {
            return;
        }
        this.vertx.runOnContext(ignored -> handler.handle(Future.succeededFuture()));
    }

    /**
     * Removes every subscription-map entry whose value is the given server. Does nothing
     * when there is no subscription map; the outcome of the removal is ignored.
     *
     * @param serverID the server whose subscriptions should be dropped
     */
    private void cleanSubsForServerID(ServerID serverID) {
        if (this.subs == null) {
            return;
        }
        this.subs.removeAllForValue(serverID, ignored -> {
        });
    }

    /**
     * Writes a message to the connection for the given server, creating that connection
     * on first use.
     *
     * <p>If two callers race to create the holder, only the one whose holder ends up in
     * the map calls {@code connect()}; the other uses the holder already stored.
     *
     * @param serverID    the remote server to send to
     * @param messageImpl the message to write
     */
    private void sendRemote(ServerID serverID, MessageImpl messageImpl) {
        ConnectionHolder target = this.connections.get(serverID);
        if (target == null) {
            ConnectionHolder created = new ConnectionHolder(serverID);
            ConnectionHolder existing = this.connections.putIfAbsent(serverID, created);
            if (existing == null) {
                // This caller's holder was stored, so it is the one that opens the connection.
                created.connect();
                target = created;
            } else {
                target = existing;
            }
        }
        target.writeMessage(messageImpl);
    }

    /**
     * Removes this address/server pair from the subscription map and reports the outcome.
     *
     * <p>The handler receives a succeeded result when the entry was removed, a failed result
     * with the message {@code "sub not found"} when the map reports that nothing was removed,
     * and a failed result carrying the underlying cause when the removal itself fails.
     *
     * @param str      the subscribed address
     * @param serverID the server to remove from the address's subscribers
     * @param handler  completion callback; may be {@code null}
     */
    private void removeSub(String str, ServerID serverID, Handler<AsyncResult<Void>> handler) {
        this.subs.remove(str, serverID, asyncResult -> {
            if (!asyncResult.succeeded()) {
                log.error("Failed to remove sub", asyncResult.cause());
                // Previously the handler was never called on this path, leaving the caller
                // waiting; propagate the failure instead.
                if (handler != null) {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                }
                return;
            }
            if (handler == null) {
                return;
            }
            if (((Boolean) asyncResult.result()).booleanValue()) {
                handler.handle(Future.succeededFuture());
            } else {
                handler.handle(Future.failedFuture("sub not found"));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Delivers a message to the handlers registered locally for its address.
     *
     * <p>With no handlers at the address, the no-handlers failure is reported to
     * {@code handler} (when one is given), the reply timer is cancelled and the reply
     * handler is unregistered. Otherwise, when {@code messageImpl.send()} is true a single
     * handler is chosen; when false every handler in the list receives the message.
     *
     * @param messageImpl the message to deliver
     * @param j           id of the reply timeout timer, or -1 when there is none
     * @param handler     reply result handler to fail when nobody is listening; may be null
     * @param handler2    reply handler to unregister when nobody is listening; may be null
     * @param z           flag forwarded to the metrics and to {@code doReceive}
     * @param <T>         body type of the expected reply
     */
    public <T> void receiveMessage(MessageImpl messageImpl, long j, Handler<AsyncResult<Message<T>>> handler, Handler<Message<T>> handler2, boolean z) {
        messageImpl.setBus(this);
        Handlers handlers = this.handlerMap.get(messageImpl.address());

        if (handlers == null) {
            this.metrics.messageReceived(messageImpl.address(), !messageImpl.send(), z, 0);
            if (handler == null) {
                return;
            }
            // The sender expects a reply but nobody is listening: fail it and clean up.
            sendNoHandlersFailure(messageImpl.address(), handler);
            if (j != -1) {
                this.vertx.cancelTimer(j);
            }
            if (handler2 != null) {
                unregisterHandler(messageImpl.replyAddress(), handler2);
            }
            return;
        }

        if (messageImpl.send()) {
            // Single-recipient delivery: let the handler list pick one holder.
            HandlerHolder chosen = handlers.choose();
            if (chosen != null) {
                this.metrics.messageReceived(messageImpl.address(), !messageImpl.send(), z, 1);
                doReceive(messageImpl, chosen, z);
            }
            return;
        }

        // Every registered holder gets the message.
        this.metrics.messageReceived(messageImpl.address(), !messageImpl.send(), z, handlers.list.size());
        for (HandlerHolder holder : handlers.list) {
            doReceive(messageImpl, holder, z);
        }
    }

    /**
     * Reports a {@link ReplyFailure#NO_HANDLERS} failure, dispatched via {@code runOnContext}.
     *
     * <p>The failure is always recorded in the metrics. The handler is only invoked when it
     * is non-null: {@code sendReply} forwards its handler here unchecked, and that handler is
     * absent when the caller did not ask for a reply.
     *
     * @param str     the address that had no handlers; may be {@code null}
     * @param handler receives the failed result; may be {@code null}
     * @param <T>     body type of the expected reply
     */
    private <T> void sendNoHandlersFailure(String str, Handler<AsyncResult<Message<T>>> handler) {
        this.vertx.runOnContext(r8 -> {
            this.metrics.replyFailure(str, ReplyFailure.NO_HANDLERS);
            // Guard against a missing handler instead of throwing an NPE inside the context task.
            if (handler != null) {
                handler.handle(Future.failedFuture(new ReplyException(ReplyFailure.NO_HANDLERS)));
            }
        });
    }

    /**
     * Hands a copy of the message to one handler, on that handler's context.
     *
     * <p>The copy is made before dispatch. When the task runs, a holder that has been
     * marked removed in the meantime is skipped. A holder flagged as a reply handler is
     * unregistered after the attempt, whether or not its handler ran or threw.
     *
     * @param messageImpl   the message being delivered
     * @param handlerHolder the holder whose handler should receive the message
     * @param z             delivery flag supplied by the caller; not used by this method
     * @param <T>           body type of the handler
     */
    private <T> void doReceive(MessageImpl messageImpl, HandlerHolder<T> handlerHolder, boolean z) {
        MessageImpl delivered = messageImpl.copyBeforeReceive();
        handlerHolder.context.runOnContext(ignored -> {
            try {
                if (handlerHolder.isRemoved()) {
                    // Removed between send and delivery; the finally block below still runs.
                    return;
                }
                handlerHolder.handler.handle(delivered);
            } finally {
                if (handlerHolder.replyHandler) {
                    unregisterHandler(messageImpl.address(), handlerHolder.handler);
                }
            }
        });
    }

    /**
     * Verifies that the event bus has been started, which this class equates with
     * {@code serverID} having been set.
     *
     * @throws IllegalStateException if {@code serverID} is still {@code null}
     */
    private void checkStarted() {
        if (this.serverID != null) {
            return;
        }
        throw new IllegalStateException("Event Bus is not started");
    }

    /**
     * Last-resort cleanup: closes the event bus when the instance is finalized, ignoring
     * the outcome of the close.
     *
     * <p>{@code super.finalize()} is called from a {@code finally} block so that it runs
     * even if {@code close} throws.
     *
     * @throws Throwable whatever {@code close} or {@code super.finalize()} throws
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            close(asyncResult -> {
            });
        } finally {
            super.finalize();
        }
    }
}
