package io.advantageous.qbit.vertx.eventbus.bridge;

import io.advantageous.boon.core.Predicate;
import io.advantageous.boon.json.JsonException;
import io.advantageous.qbit.json.JsonMapper;
import io.advantageous.qbit.message.MethodCall;
import io.advantageous.qbit.message.MethodCallBuilder;
import io.advantageous.qbit.meta.transformer.StandardRequestTransformer;
import io.advantageous.qbit.queue.SendQueue;
import io.advantageous.qbit.reactive.CallbackBuilder;
import io.advantageous.qbit.util.Timer;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/advantageous/qbit/vertx/eventbus/bridge/VertxEventBusBridge.class */
/**
 * Bridges a set of Vert.x event-bus addresses to a QBit method-call send queue.
 *
 * <p>For every bridged address a consumer is registered on the event bus. Each incoming
 * message is transformed into a {@link MethodCall} (the method name is read from the
 * {@code "method"} message header), passed through the configured predicate and, if accepted,
 * placed on the send queue. The call's result is JSON-encoded and sent back as the message
 * reply; errors and timeouts are reported through {@link Message#fail(int, String)}.
 *
 * <p>The send queue is flushed periodically using a Vert.x periodic stream.
 */
public class VertxEventBusBridge {
    private static final Logger logger = LoggerFactory.getLogger(VertxEventBusBridge.class);

    /** Failure code used when marshaling fails or the invoked call reports an error. */
    private static final int FAILURE_CODE_ERROR = 500;
    /** Failure code used when the invoked call times out. */
    private static final int FAILURE_CODE_TIMEOUT = 408;
    /** Name of the message header that carries the method to invoke. */
    private static final String METHOD_HEADER = "method";

    private final Set<String> addressesToBridge;
    private final SendQueue<MethodCall<Object>> methodCallSendQueue;
    private final JsonMapper jsonMapper;
    private final Vertx vertx;
    private final Predicate<MethodCall<Object>> methodCallPredicate;
    private final int flushIntervalMS;
    private final Timer timer;
    private final EventBus vertxEventBus;
    private final StandardRequestTransformer standardRequestTransformer;
    /** Id assigned to the next outgoing method call; incremented once per call built. */
    private long messageId = 0;

    /**
     * Creates the bridge and optionally starts it.
     *
     * @param set addresses on the event bus to bridge; wrapped in an unmodifiable view
     * @param sendQueue queue that receives the method calls built from incoming messages
     * @param jsonMapper mapper used to encode call results for the reply
     * @param vertx Vert.x instance used to schedule the periodic queue flush
     * @param eventBus event bus on which the consumers are registered
     * @param timer source of the timestamp stamped on each method call
     * @param predicate filter applied to each built call; only calls it accepts are sent
     * @param i flush interval in milliseconds
     * @param z when {@code true} the bridge is started from this constructor; in the code
     *     visible here nothing else invokes the (private) start routine
     * @param standardRequestTransformer transformer that turns a message body into a method call
     */
    public VertxEventBusBridge(Set<String> set, SendQueue<MethodCall<Object>> sendQueue, JsonMapper jsonMapper, Vertx vertx, EventBus eventBus, Timer timer, Predicate<MethodCall<Object>> predicate, int i, boolean z, StandardRequestTransformer standardRequestTransformer) {
        this.standardRequestTransformer = standardRequestTransformer;
        this.timer = timer;
        this.vertxEventBus = eventBus;
        this.addressesToBridge = Collections.unmodifiableSet(set);
        this.methodCallSendQueue = sendQueue;
        this.jsonMapper = jsonMapper;
        this.vertx = vertx;
        this.methodCallPredicate = predicate;
        this.flushIntervalMS = i;
        if (z) {
            start();
        }
    }

    /** Flushes any method calls buffered in the send queue. */
    private void flush() {
        this.methodCallSendQueue.flushSends();
    }

    /** Schedules the periodic flush and registers a consumer for every bridged address. */
    private void start() {
        this.vertx.periodicStream(this.flushIntervalMS).handler(timerId -> flush());
        this.addressesToBridge.forEach(this::registerConsumer);
    }

    /**
     * Registers an event-bus consumer that forwards messages on {@code address} to
     * {@link #handleIncomingMessage(String, Message)}.
     *
     * @param address event-bus address to listen on
     */
    private void registerConsumer(final String address) {
        logger.debug("Registering address {}", address);
        // The consumer must be typed: with a raw MessageConsumer the handler parameters
        // erase to Object and the message could not be passed on as Message<String>.
        final MessageConsumer<String> consumer = this.vertxEventBus.consumer(address);
        consumer.handler(message -> handleIncomingMessage(address, message));
        consumer.exceptionHandler(error -> logger.error("Error handling address " + address, error));
    }

    /**
     * Converts an incoming event-bus message into a method call and enqueues it.
     *
     * <p>Failures are reported to the sender via {@link Message#fail(int, String)}:
     * marshaling errors collected by the transformer, and any exception thrown while
     * building the call, produce a 500; a call timeout produces a 408.
     *
     * @param address event-bus address the message arrived on; used as the call address
     * @param message incoming message whose body is the call payload
     */
    private void handleIncomingMessage(final String address, final Message<String> message) {
        try {
            final String body = message.body();
            // Log the body as an argument rather than as the format string.
            logger.debug("{}", body);
            final String methodName = message.headers().get(METHOD_HEADER);

            final List<String> errors = new ArrayList<>();
            final MethodCall<Object> transformed =
                    this.standardRequestTransformer.transFormBridgeBody(body, errors, address, methodName);
            if (!errors.isEmpty()) {
                logger.error("Error marshaling message body to method call to service errors {}", errors);
                message.fail(FAILURE_CODE_ERROR, errors.toString());
                return;
            }

            final CallbackBuilder callbackBuilder = CallbackBuilder.callbackBuilder();
            callbackBuilder.setOnError(error -> {
                logger.error("Error from calling " + address, error);
                message.fail(FAILURE_CODE_ERROR, error.getMessage());
            });
            callbackBuilder.setCallback(result -> {
                message.reply(encodeOutput(result));
            });
            callbackBuilder.setOnTimeout(() -> {
                final String timeoutMessage = "Timed out call to " + address + " method " + methodName;
                logger.error(timeoutMessage);
                message.fail(FAILURE_CODE_TIMEOUT, timeoutMessage);
            });

            final MethodCall<Object> methodCall = MethodCallBuilder.methodCallBuilder()
                    .setAddress(address)
                    .setBody(transformed.body())
                    .setTimestamp(this.timer.time())
                    .setName(methodName)
                    .setId(this.messageId++)
                    .setCallback(callbackBuilder.build())
                    .build();

            logger.debug("Calling method {} {}", methodCall.name(), body);

            // NOTE(review): when the predicate rejects the call nothing is enqueued and no
            // reply/fail is issued here; presumably the predicate deals with the message
            // itself - confirm against the predicate implementations.
            if (this.methodCallPredicate.test(methodCall)) {
                this.methodCallSendQueue.send(methodCall);
            }
        } catch (IndexOutOfBoundsException | JsonException e) {
            // Must precede the generic handler: previously a nested catch (Exception)
            // intercepted these first, leaving this branch unreachable.
            logger.error("Error marshaling message body to method call to service", e);
            message.fail(FAILURE_CODE_ERROR, "IllegalArgumentException");
        } catch (Exception e) {
            logger.error("Error marshaling message body to method call to service", e);
            message.fail(FAILURE_CODE_ERROR, e.getClass().getSimpleName() + ": " + e.getMessage());
        }
    }

    /**
     * Encodes a call result as JSON for the event-bus reply.
     *
     * @param obj value returned by the invoked method
     * @return JSON produced by the configured mapper
     */
    private String encodeOutput(Object obj) {
        return this.jsonMapper.toJson(obj);
    }
}
