package com.nannoq.tools.cluster.services;

import io.vertx.codegen.annotations.Fluent;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.ServiceDiscovery;
import io.vertx.servicediscovery.ServiceDiscoveryOptions;
import io.vertx.servicediscovery.types.EventBusService;
import io.vertx.servicediscovery.types.HttpEndpoint;
import io.vertx.serviceproxy.ServiceBinder;
import io.vertx.serviceproxy.ServiceException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/nannoq/tools/cluster/services/ServiceManager.class */
public class ServiceManager {
    private static final String NANNOQ_SERVICE_ANNOUNCE_ADDRESS = "com.nannoq.services.manager.announce";
    private static final String NANNOQ_SERVICE_SERVICE_NAME = "nannoq-service-manager-service-discovery";
    private static final int NOT_FOUND = 404;
    private static final int INTERNAL_ERROR = 500;
    private ServiceDiscovery serviceDiscovery;
    private ConcurrentHashMap<String, MessageConsumer<JsonObject>> registeredServices;
    private ConcurrentHashMap<String, Record> registeredRecords;
    private ConcurrentHashMap<String, ConcurrentHashSet<Object>> fetchedServices;
    private Vertx vertx;
    private MessageConsumer<JsonObject> serviceAnnounceConsumer;
    private static final Logger logger = LoggerFactory.getLogger(ServiceManager.class.getSimpleName());
    private static ServiceManager instance = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/nannoq/tools/cluster/services/ServiceManager$KillVerticle.class */
    public class KillVerticle extends AbstractVerticle {
        private KillVerticle() {
        }

        public void stop(Future<Void> future) throws Exception {
            ServiceManager.logger.info("Destroying ServiceManager");
            if (ServiceManager.this.serviceDiscovery == null) {
                ServiceManager.logger.info("Discovery is null...");
                ServiceManager unused = ServiceManager.instance = null;
                future.tryComplete();
            } else {
                ServiceManager.logger.info("Unpublishing all records...");
                ArrayList arrayList = new ArrayList();
                ServiceManager.this.registeredRecords.forEach((str, record) -> {
                    Future future2 = Future.future();
                    ServiceManager.this.serviceDiscovery.unpublish(record.getRegistration(), asyncResult -> {
                        if (asyncResult.failed()) {
                            ServiceManager.logger.info("Failed Unpublish: " + record.getName(), asyncResult.cause());
                            future2.fail(asyncResult.cause());
                        } else {
                            ServiceManager.logger.info("Unpublished: " + record.getName());
                            future2.complete();
                        }
                    });
                    arrayList.add(future2);
                });
                CompositeFuture.all(arrayList).setHandler(asyncResult -> {
                    try {
                        ServiceManager.this.registeredRecords.clear();
                        ServiceManager.logger.info("UnPublish complete, Unregistering all services...");
                        ServiceManager.this.registeredServices.forEach((str2, messageConsumer) -> {
                            new ServiceBinder(this.vertx).setAddress(messageConsumer.address()).unregister(messageConsumer);
                            ServiceManager.logger.info("Unregistering " + messageConsumer.address());
                        });
                        ServiceManager.this.registeredServices.clear();
                        ServiceManager.logger.info("Releasing all consumed service objects...");
                        ServiceManager.this.fetchedServices.values().forEach(concurrentHashSet -> {
                            ServiceDiscovery.releaseServiceObject(ServiceManager.this.serviceDiscovery, concurrentHashSet);
                        });
                        ServiceManager.this.fetchedServices.clear();
                        ServiceManager.this.closeDiscovery(asyncResult -> {
                            ServiceManager.this.serviceAnnounceConsumer = null;
                            ServiceManager.logger.info("Discovery Closed!");
                            ServiceManager unused2 = ServiceManager.instance = null;
                            future.tryComplete();
                            ServiceManager.logger.info("ServiceManager destroyed...");
                        });
                        ServiceManager unused2 = ServiceManager.instance = null;
                        future.tryComplete();
                        ServiceManager.logger.info("ServiceManager destroyed...");
                    } catch (Throwable th) {
                        ServiceManager unused3 = ServiceManager.instance = null;
                        future.tryComplete();
                        ServiceManager.logger.info("ServiceManager destroyed...");
                        throw th;
                    }
                });
            }
        }
    }

    private ServiceManager() {
        this(Vertx.currentContext().owner());
    }

    private ServiceManager(Vertx vertx) {
        this.registeredServices = new ConcurrentHashMap<>();
        this.registeredRecords = new ConcurrentHashMap<>();
        this.fetchedServices = new ConcurrentHashMap<>();
        this.vertx = vertx;
        openDiscovery();
        startServiceManagerKillVerticle();
    }

    private void startServiceManagerKillVerticle() {
        this.vertx.deployVerticle(new KillVerticle());
    }

    public static ServiceManager getInstance() {
        if (instance == null) {
            instance = new ServiceManager();
        }
        return instance;
    }

    public static ServiceManager getInstance(Vertx vertx) {
        if (instance == null) {
            instance = new ServiceManager(vertx);
        }
        return instance;
    }

    private void openDiscovery() {
        logger.debug("Opening Discovery...");
        if (this.serviceDiscovery == null) {
            this.serviceDiscovery = ServiceDiscovery.create(this.vertx, new ServiceDiscoveryOptions().setAnnounceAddress(NANNOQ_SERVICE_ANNOUNCE_ADDRESS).setUsageAddress(NANNOQ_SERVICE_ANNOUNCE_ADDRESS).setName(NANNOQ_SERVICE_SERVICE_NAME));
            logger.debug("Setting Discovery message consumer...");
            this.serviceAnnounceConsumer = this.vertx.eventBus().consumer(NANNOQ_SERVICE_ANNOUNCE_ADDRESS, this::handleServiceEvent);
        }
        logger.debug("Discovery ready...");
    }

    private void handleServiceEvent(Message<JsonObject> message) {
        MultiMap headers = message.headers();
        JsonObject jsonObject = (JsonObject) message.body();
        logger.trace("Service Event:\n" + Json.encodePrettily(message) + "\nHeaders:\n" + Json.encodePrettily(headers) + "\nBody:\n" + Json.encodePrettily(jsonObject));
        String string = jsonObject.getString("name");
        String string2 = jsonObject.getString("status");
        if (string2 == null || !string2.equals("DOWN")) {
            return;
        }
        logger.debug("Removing downed service: " + string);
        this.fetchedServices.remove(string);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeDiscovery(Handler<AsyncResult<Void>> handler) {
        if (this.serviceDiscovery != null) {
            this.serviceDiscovery.close();
        }
        this.serviceDiscovery = null;
        logger.debug("Unregistering Service Event Listener...");
        if (this.serviceAnnounceConsumer != null) {
            this.serviceAnnounceConsumer.unregister(handler);
        }
    }

    @Fluent
    public ServiceManager publishApi(@Nonnull Record record) {
        return publishService(record, record2 -> {
            this.registeredRecords.put(record2.getRegistration(), record2);
        }, this::handlePublishResult);
    }

    @Fluent
    public ServiceManager publishApi(@Nonnull Record record, @Nonnull Handler<AsyncResult<Record>> handler) {
        return publishService(record, record2 -> {
            this.registeredRecords.put(record2.getRegistration(), record2);
        }, handler);
    }

    @Fluent
    public ServiceManager unPublishApi(@Nonnull Record record, @Nonnull Handler<AsyncResult<Void>> handler) {
        this.registeredRecords.remove(record.getRegistration());
        this.serviceDiscovery.unpublish(record.getRegistration(), handler);
        ConcurrentHashSet<Object> concurrentHashSet = this.fetchedServices.get(record.getName());
        if (concurrentHashSet != null && concurrentHashSet.size() > 0) {
            Iterator it = concurrentHashSet.iterator();
            it.next();
            it.remove();
        }
        return this;
    }

    @Fluent
    public <T> ServiceManager publishService(@Nonnull Class<T> cls, @Nonnull T t) {
        String simpleName = cls.getSimpleName();
        return publishService(createRecord(simpleName, cls), record -> {
            this.registeredServices.put(record.getRegistration(), new ServiceBinder(this.vertx).setAddress(simpleName).register(cls, t));
        }, this::handlePublishResult);
    }

    @Fluent
    public <T> ServiceManager publishService(@Nonnull Class<T> cls, @Nonnull String str, @Nonnull T t) {
        return publishService(createRecord(str, cls), record -> {
            this.registeredServices.put(record.getRegistration(), new ServiceBinder(this.vertx).setAddress(str).register(cls, t));
        }, this::handlePublishResult);
    }

    @Fluent
    public <T> ServiceManager publishService(@Nonnull Class<T> cls, @Nonnull T t, @Nonnull Handler<AsyncResult<Record>> handler) {
        return publishService(createRecord(cls), record -> {
            this.registeredServices.put(record.getRegistration(), new ServiceBinder(this.vertx).setAddress(cls.getSimpleName()).register(cls, t));
        }, handler);
    }

    @Fluent
    public <T> ServiceManager publishService(@Nonnull Class<T> cls, @Nonnull String str, @Nonnull T t, @Nonnull Handler<AsyncResult<Record>> handler) {
        return publishService(createRecord(str, cls), record -> {
            this.registeredServices.put(record.getRegistration(), new ServiceBinder(this.vertx).setAddress(str).register(cls, t));
        }, handler);
    }

    @Fluent
    public <T> ServiceManager unPublishService(@Nonnull Class<T> cls, @Nonnull Record record) {
        return unPublishService(cls.getSimpleName(), record);
    }

    @Fluent
    public ServiceManager unPublishService(@Nonnull String str, @Nonnull Record record) {
        return unPublishService(str, record, asyncResult -> {
            logger.info("Unpublish res: " + asyncResult.succeeded());
        });
    }

    @Fluent
    public <T> ServiceManager unPublishService(@Nonnull Class<T> cls, @Nonnull Record record, @Nonnull Handler<AsyncResult<Void>> handler) {
        return unPublishService(cls.getSimpleName(), record, handler);
    }

    @Fluent
    public ServiceManager unPublishService(@Nonnull String str, @Nonnull Record record, @Nonnull Handler<AsyncResult<Void>> handler) {
        new ServiceBinder(this.vertx).setAddress(str).unregister(this.registeredServices.get(record.getRegistration()));
        this.serviceDiscovery.unpublish(record.getRegistration(), handler);
        this.registeredServices.remove(record.getRegistration());
        ConcurrentHashSet<Object> concurrentHashSet = this.fetchedServices.get(record.getName());
        if (concurrentHashSet != null && concurrentHashSet.size() > 0) {
            Iterator it = concurrentHashSet.iterator();
            it.next();
            it.remove();
        }
        return this;
    }

    @Fluent
    public ServiceManager consumeApi(@Nonnull String str, @Nonnull Handler<AsyncResult<HttpClient>> handler) {
        return getApi(str, handler);
    }

    @Fluent
    public <T> ServiceManager consumeService(@Nonnull Class<T> cls, @Nonnull Handler<AsyncResult<T>> handler) {
        return consumeService(cls, cls.getSimpleName(), handler);
    }

    @Fluent
    public <T> ServiceManager consumeService(@Nonnull Class<T> cls, @Nonnull String str, @Nonnull Handler<AsyncResult<T>> handler) {
        return getService(cls, str, handler);
    }

    private ServiceManager getApi(String str, Handler<AsyncResult<HttpClient>> handler) {
        logger.debug("Getting API: " + str);
        ConcurrentHashSet<Object> concurrentHashSet = this.fetchedServices.get(str);
        if (concurrentHashSet == null || concurrentHashSet.size() <= 0) {
            HttpEndpoint.getClient(this.serviceDiscovery, new JsonObject().put("name", str), asyncResult -> {
                if (asyncResult.failed()) {
                    logger.error("Unable to fetch API...");
                    handler.handle(ServiceException.fail(NOT_FOUND, "API not found..."));
                    return;
                }
                HttpClient httpClient = (HttpClient) asyncResult.result();
                ConcurrentHashSet<Object> concurrentHashSet2 = this.fetchedServices.get(str);
                if (concurrentHashSet2 == null) {
                    this.fetchedServices.put(str, new ConcurrentHashSet<>());
                    concurrentHashSet2 = this.fetchedServices.get(str);
                }
                if (!concurrentHashSet2.contains(httpClient)) {
                    concurrentHashSet2.add(httpClient);
                }
                this.fetchedServices.put(str, concurrentHashSet2);
                handler.handle(Future.succeededFuture(httpClient));
            });
        } else {
            logger.debug("Returning fetched Api...");
            ArrayList arrayList = new ArrayList((Collection) concurrentHashSet);
            Collections.shuffle(arrayList);
            handler.handle(Future.succeededFuture((HttpClient) arrayList.get(0)));
        }
        return this;
    }

    private <T> ServiceManager getService(Class<T> cls, Handler<AsyncResult<T>> handler) {
        return getService(cls, cls.getSimpleName(), handler);
    }

    private <T> ServiceManager getService(Class<T> cls, String str, Handler<AsyncResult<T>> handler) {
        logger.debug("Getting service: " + str);
        ConcurrentHashSet<Object> concurrentHashSet = this.fetchedServices.get(str);
        if (concurrentHashSet == null || concurrentHashSet.size() <= 0) {
            EventBusService.getProxy(this.serviceDiscovery, cls, asyncResult -> {
                if (asyncResult.failed()) {
                    logger.error("ERROR: Unable to get service for " + str);
                    handler.handle(ServiceException.fail(NOT_FOUND, "Unable to get service for " + str + " : " + asyncResult.cause()));
                    return;
                }
                Object result = asyncResult.result();
                ConcurrentHashSet<Object> concurrentHashSet2 = this.fetchedServices.get(str);
                if (concurrentHashSet2 == null) {
                    this.fetchedServices.put(str, new ConcurrentHashSet<>());
                    concurrentHashSet2 = this.fetchedServices.get(str);
                }
                if (!concurrentHashSet2.contains(result)) {
                    concurrentHashSet2.add(result);
                }
                this.fetchedServices.put(str, concurrentHashSet2);
                logger.debug("Successful fetch of: " + result.getClass().getSimpleName());
                handler.handle(Future.succeededFuture(result));
            });
        } else {
            logger.debug("Returning fetched Api...");
            ArrayList arrayList = new ArrayList((Collection) concurrentHashSet);
            Collections.shuffle(arrayList);
            handler.handle(Future.succeededFuture(arrayList.get(0)));
        }
        return getInstance();
    }

    private <T> Record createRecord(Class<T> cls) {
        return createRecord(cls.getSimpleName(), cls);
    }

    private <T> Record createRecord(String str, Class<T> cls) {
        return EventBusService.createRecord(str, str, cls);
    }

    private ServiceManager publishService(@Nonnull Record record, @Nonnull Consumer<Record> consumer, @Nonnull Handler<AsyncResult<Record>> handler) {
        this.serviceDiscovery.publish(record, asyncResult -> {
            if (asyncResult.failed()) {
                logger.error("ERROR: Failed publish of " + record.getName() + " to " + record.getLocation().encodePrettily() + " with " + record.getType() + " : " + record.getStatus());
                handler.handle(ServiceException.fail(INTERNAL_ERROR, asyncResult.cause().getMessage()));
                return;
            }
            Record record2 = (Record) asyncResult.result();
            this.registeredRecords.put(record2.getRegistration(), record2);
            consumer.accept(record2);
            logger.debug("Successful publish of: " + record2.getName() + " to " + record2.getLocation().encodePrettily() + " with " + record2.getType() + " : " + record2.getStatus());
            handler.handle(Future.succeededFuture(record2));
        });
        return getInstance();
    }

    private void handlePublishResult(AsyncResult<Record> asyncResult) {
        if (!asyncResult.failed()) {
            Record record = (Record) asyncResult.result();
            logger.debug("Published Service Record: " + record.getName() + " : " + record.getLocation() + " : " + record.getType() + " : " + record.getStatus());
        } else if (!(asyncResult.cause() instanceof ServiceException)) {
            logger.error("Unable to publish service: " + asyncResult.cause());
        } else {
            ServiceException cause = asyncResult.cause();
            logger.error("Unable to publish service: " + cause.failureCode() + " : " + cause.getMessage());
        }
    }

    public static void handleResultFailed(Throwable th) {
        if (!(th instanceof ServiceException)) {
            logger.error(th.getMessage(), th);
        } else {
            ServiceException serviceException = (ServiceException) th;
            logger.error(serviceException.failureCode() + " : " + serviceException.getMessage(), th);
        }
    }
}
