/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.edc.connector.core.event;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.event.EventRouter;
import org.eclipse.edc.spi.event.EventSubscriber;
import org.eclipse.edc.spi.monitor.Monitor;

/**
 * Default {@link EventRouter} that dispatches published events to registered subscribers.
 *
 * <p>Two kinds of subscribers are supported:
 * <ul>
 *   <li><b>synchronous</b> subscribers are invoked on the publishing thread, before any
 *       asynchronous dispatch; an exception thrown by one of them propagates to the caller of
 *       {@link #publish(EventEnvelope)};</li>
 *   <li><b>asynchronous</b> subscribers are invoked on the supplied {@link ExecutorService};
 *       a failure is reported to the {@link Monitor} and does not reach the publisher.</li>
 * </ul>
 *
 * <p>A subscriber registered for a type receives every event whose payload is an instance of
 * that type (subtypes included).
 */
public class EventRouterImpl
implements EventRouter {
    private final Map<Class<?>, List<EventSubscriber>> subscribers = new ConcurrentHashMap<>();
    private final Map<Class<?>, List<EventSubscriber>> syncSubscribers = new ConcurrentHashMap<>();
    private final Monitor monitor;
    private final ExecutorService executor;

    /**
     * Creates a router.
     *
     * @param monitor  receives a {@code severe} entry when an asynchronous subscriber fails
     * @param executor runs the asynchronous subscribers
     */
    public EventRouterImpl(Monitor monitor, ExecutorService executor) {
        this.monitor = monitor;
        this.executor = executor;
    }

    /**
     * Registers a subscriber that is invoked on the publishing thread.
     *
     * @param eventKind  payload type (or supertype) the subscriber is interested in
     * @param subscriber the subscriber to notify
     */
    public <E extends Event> void registerSync(Class<E> eventKind, EventSubscriber subscriber) {
        addSubscriber(this.syncSubscribers, eventKind, subscriber);
    }

    /**
     * Registers a subscriber that is invoked asynchronously on the executor.
     *
     * @param eventKind  payload type (or supertype) the subscriber is interested in
     * @param subscriber the subscriber to notify
     */
    public <E extends Event> void register(Class<E> eventKind, EventSubscriber subscriber) {
        addSubscriber(this.subscribers, eventKind, subscriber);
    }

    /**
     * Delivers the event to all matching synchronous subscribers first, then schedules all
     * matching asynchronous subscribers on the executor.
     *
     * @param event the envelope to deliver
     */
    public <E extends Event> void publish(EventEnvelope<E> event) {
        subscriberFor(event, this::getSyncSubscribers).forEach(subscriber -> subscriber.on(event));

        subscriberFor(event, this::getSubscribers).forEach(subscriber ->
                CompletableFuture.runAsync(() -> subscriber.on(event), this.executor)
                        // The subscriber is taken from the enclosing lambda rather than from the
                        // future's result: on exceptional completion the result is null, so reading
                        // the subscriber from it would throw and the failure would never be logged.
                        .whenComplete((ignored, throwable) -> {
                            if (throwable != null) {
                                String subscriberName = subscriber.getClass().getSimpleName();
                                // Report the payload type (what subscribers are matched on) instead
                                // of the envelope wrapper's own class.
                                String eventName = event.getPayload().getClass().getSimpleName();
                                this.monitor.severe(String.format("Subscriber %s failed to handle event %s", subscriberName, eventName), throwable);
                            }
                        }));
    }

    /**
     * Adds a subscriber by atomically replacing the list stored for {@code eventKind} with a
     * fresh copy. Lists already stored in the map are never mutated afterwards, so a concurrent
     * {@link #publish(EventEnvelope)} can iterate them safely.
     */
    private static void addSubscriber(Map<Class<?>, List<EventSubscriber>> registry, Class<?> eventKind, EventSubscriber subscriber) {
        registry.compute(eventKind, (kind, existing) -> {
            List<EventSubscriber> updated = existing == null ? new ArrayList<>() : new ArrayList<>(existing);
            updated.add(subscriber);
            return updated;
        });
    }

    private Map<Class<?>, List<EventSubscriber>> getSubscribers() {
        return this.subscribers;
    }

    private Map<Class<?>, List<EventSubscriber>> getSyncSubscribers() {
        return this.syncSubscribers;
    }

    /**
     * Streams every subscriber whose registered type is assignable from the envelope's payload
     * class, i.e. subscribers of the exact payload type and of any of its supertypes.
     */
    private <E extends Event> Stream<EventSubscriber> subscriberFor(EventEnvelope<E> envelope, Supplier<Map<Class<?>, List<EventSubscriber>>> supplier) {
        return supplier.get().entrySet().stream()
                .filter(entry -> entry.getKey().isAssignableFrom(envelope.getPayload().getClass()))
                .flatMap(entry -> entry.getValue().stream());
    }
}

