package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.PromiseInternal;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;

/* loaded from: input_file:io/vertx/core/eventbus/impl/clustered/Serializer.class */
public class Serializer {
    private final ContextInternal context;
    private final Map<String, SerializerQueue> queues = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/core/eventbus/impl/clustered/Serializer$SerializedTask.class */
    public class SerializedTask<U> implements Handler<AsyncResult<U>> {
        final OutboundDeliveryContext<?> sendContext;
        final BiConsumer<Message<?>, Promise<U>> selectHandler;
        final Promise<U> promise;
        final Promise<U> internalPromise;
        Promise<Void> completion;

        SerializedTask(OutboundDeliveryContext<?> outboundDeliveryContext, BiConsumer<Message<?>, Promise<U>> biConsumer, Promise<U> promise) {
            this.sendContext = outboundDeliveryContext;
            this.selectHandler = biConsumer;
            this.promise = promise;
            this.internalPromise = Serializer.this.context.promise();
            this.internalPromise.future().onComplete2(this);
        }

        void process(Promise<Void> promise) {
            this.completion = promise;
            this.selectHandler.accept(this.sendContext.message, this.internalPromise);
        }

        @Override // io.vertx.core.Handler
        public void handle(AsyncResult<U> asyncResult) {
            if (asyncResult.succeeded()) {
                this.promise.tryComplete(asyncResult.result());
            } else {
                this.promise.tryFail(asyncResult.cause());
            }
            this.completion.complete();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/core/eventbus/impl/clustered/Serializer$SerializerQueue.class */
    public class SerializerQueue {
        final Queue<SerializedTask<?>> tasks = new LinkedList();
        final String address;
        boolean closed;

        SerializerQueue(String str) {
            this.address = str;
        }

        void add(SerializedTask<?> serializedTask) {
            this.tasks.add(serializedTask);
            if (this.tasks.size() == 1) {
                process(serializedTask);
            }
        }

        void process(SerializedTask<?> serializedTask) {
            PromiseInternal promise = Serializer.this.context.promise();
            serializedTask.process(promise);
            promise.future().onComplete2(asyncResult -> {
                processed();
            });
        }

        void processed() {
            if (this.closed) {
                return;
            }
            this.tasks.remove();
            SerializedTask<?> peek = this.tasks.peek();
            if (peek != null) {
                process(peek);
            } else {
                Serializer.this.queues.remove(this.address);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void close() {
            this.closed = true;
            while (!this.tasks.isEmpty()) {
                this.tasks.remove().promise.tryFail("Context is closing");
            }
        }
    }

    private Serializer(ContextInternal contextInternal) {
        this.context = contextInternal;
        if (contextInternal.isDeployment()) {
            contextInternal.addCloseHook(this::close);
        }
    }

    public static Serializer get(ContextInternal contextInternal) {
        ConcurrentMap<Object, Object> contextData = contextInternal.contextData();
        Serializer serializer = (Serializer) contextData.get(Serializer.class);
        if (serializer == null) {
            Serializer serializer2 = new Serializer(contextInternal);
            Serializer serializer3 = (Serializer) contextData.putIfAbsent(Serializer.class, serializer2);
            serializer = serializer3 == null ? serializer2 : serializer3;
        }
        return serializer;
    }

    public <T> void queue(OutboundDeliveryContext<?> outboundDeliveryContext, BiConsumer<Message<?>, Promise<T>> biConsumer, BiConsumer<OutboundDeliveryContext<?>, T> biConsumer2, BiConsumer<OutboundDeliveryContext<?>, Throwable> biConsumer3) {
        if (((ContextInternal) Vertx.currentContext()) != this.context) {
            this.context.runOnContext(r11 -> {
                queue(outboundDeliveryContext, biConsumer, biConsumer2, biConsumer3);
            });
            return;
        }
        String address = outboundDeliveryContext.message.address();
        PromiseInternal<T> promise = outboundDeliveryContext.ctx.promise();
        promise.future().onComplete2(asyncResult -> {
            if (asyncResult.succeeded()) {
                biConsumer2.accept(outboundDeliveryContext, asyncResult.result());
            } else {
                biConsumer3.accept(outboundDeliveryContext, asyncResult.cause());
            }
        });
        this.queues.computeIfAbsent(address, str -> {
            return new SerializerQueue(str);
        }).add(new SerializedTask<>(outboundDeliveryContext, biConsumer, promise));
    }

    private void close(Promise<Void> promise) {
        this.queues.forEach((str, serializerQueue) -> {
            serializerQueue.close();
        });
        promise.complete();
    }
}
