/*
 * Decompiled with CFR 0.152.
 */
package reactor.groovy.config;

import java.util.List;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.bus.Event;
import reactor.bus.filter.Filter;
import reactor.bus.registry.Registration;
import reactor.bus.registry.Registry;
import reactor.bus.routing.ConsumerFilteringRouter;
import reactor.fn.Consumer;

public class StreamRouter
extends ConsumerFilteringRouter {
    private final Registry<Object, Processor<Event<?>, Event<?>>> processorRegistry;

    public StreamRouter(Filter filter, Registry<Object, Processor<Event<?>, Event<?>>> processorRegistry) {
        super(filter);
        this.processorRegistry = processorRegistry;
    }

    public <E extends Event<?>> void route(Object key, final E event, final List<Registration<Object, ? extends Consumer<? extends Event<?>>>> consumers, final Consumer<E> completionConsumer, final Consumer<Throwable> errorConsumer) {
        for (Registration registration : this.processorRegistry.select(key)) {
            Processor processor = (Processor)registration.getObject();
            processor.onNext(event);
            processor.subscribe(new Subscriber<Event<?>>(){

                public void onSubscribe(Subscription subscription) {
                    subscription.request(Integer.MAX_VALUE);
                }

                public void onNext(Event<?> hydratedEvent) {
                    StreamRouter.super.route(hydratedEvent.getKey(), hydratedEvent, consumers, completionConsumer, errorConsumer);
                }

                public void onComplete() {
                }

                public void onError(Throwable cause) {
                    event.consumeError(cause);
                }
            });
        }
    }
}

