package reactor.core;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.dispatch.SynchronousDispatcher;
import reactor.event.registry.CachingRegistry;
import reactor.event.registry.Registration;
import reactor.event.registry.Registry;
import reactor.event.routing.ArgumentConvertingConsumerInvoker;
import reactor.event.routing.ConsumerFilteringEventRouter;
import reactor.event.routing.EventRouter;
import reactor.event.routing.Linkable;
import reactor.event.selector.ClassSelector;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.filter.PassThroughFilter;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.Supplier;
import reactor.function.support.SingleUseConsumer;
import reactor.tuple.Tuple2;
import reactor.util.Assert;
import reactor.util.UUIDUtils;

/* loaded from: input_file:reactor/core/Reactor.class */
public class Reactor implements Observable, Linkable<Observable> {
    private static final EventRouter DEFAULT_EVENT_ROUTER = new ConsumerFilteringEventRouter(new PassThroughFilter(), new ArgumentConvertingConsumerInvoker(null));
    private final Dispatcher dispatcher;
    private final Registry<Consumer<? extends Event<?>>> consumerRegistry;
    private final EventRouter eventRouter;
    private final Object defaultKey;
    private final Selector defaultSelector;
    private final UUID id;
    private final Consumer<Throwable> errorHandler;
    private final Set<Observable> linkedReactors;

    /* loaded from: input_file:reactor/core/Reactor$ReplyToConsumer.class */
    public class ReplyToConsumer<E extends Event<?>, V> implements Consumer<E> {
        private final Function<E, V> fn;

        private ReplyToConsumer(Function<E, V> function) {
            this.fn = function;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v13, types: [reactor.event.Event] */
        /* JADX WARN: Type inference failed for: r0v16, types: [reactor.event.Event] */
        /* JADX WARN: Type inference failed for: r0v19, types: [reactor.event.Event] */
        /* JADX WARN: Type inference failed for: r0v22, types: [reactor.core.Observable] */
        /* JADX WARN: Type inference failed for: r0v4, types: [reactor.core.Observable] */
        @Override // reactor.function.Consumer
        public void accept(E e) {
            E wrap;
            ?? replyToObservable;
            Reactor reactor2 = Reactor.this;
            if (ReplyToEvent.class.isAssignableFrom(e.getClass()) && 0 != (replyToObservable = ((ReplyToEvent) e).getReplyToObservable())) {
                reactor2 = replyToObservable;
            }
            try {
                V apply = this.fn.apply(e);
                if (null == apply) {
                    wrap = new Event(null);
                } else {
                    wrap = Event.class.isAssignableFrom(apply.getClass()) ? (Event) apply : Event.wrap(apply);
                }
                reactor2.notify(e.getReplyTo(), (Object) wrap);
            } catch (Throwable th) {
                reactor2.notify((Object) th.getClass(), (Class<?>) Event.wrap(th));
            }
        }

        public Function<E, V> getDelegate() {
            return this.fn;
        }
    }

    /* loaded from: input_file:reactor/core/Reactor$ReplyToEvent.class */
    public static class ReplyToEvent<T> extends Event<T> {
        private static final long serialVersionUID = 1937884784799135647L;
        private final Observable replyToObservable;

        @Override // reactor.event.Event
        public <X> Event<X> copy(X x) {
            return new ReplyToEvent(getHeaders(), x, getReplyTo(), this.replyToObservable, getErrorConsumer());
        }

        private ReplyToEvent(Event.Headers headers, T t, Object obj, Observable observable, Consumer<Throwable> consumer) {
            super(headers, t, consumer);
            setReplyTo(obj);
            this.replyToObservable = observable;
        }

        private ReplyToEvent(Event<T> event, Observable observable) {
            this(event.getHeaders(), event.getData(), event.getReplyTo(), observable, event.getErrorConsumer());
        }

        public Observable getReplyToObservable() {
            return this.replyToObservable;
        }
    }

    public Reactor(Dispatcher dispatcher) {
        this(dispatcher, null);
    }

    public Reactor(Dispatcher dispatcher, EventRouter eventRouter) {
        this.defaultKey = new Object();
        this.defaultSelector = Selectors.object(this.defaultKey);
        this.id = UUIDUtils.create();
        this.errorHandler = new Consumer<Throwable>() { // from class: reactor.core.Reactor.1
            @Override // reactor.function.Consumer
            public void accept(Throwable th) {
                Reactor.this.dispatcher.dispatch(th.getClass(), Event.wrap(th).setKey(th.getClass()), Reactor.this.consumerRegistry, null, Reactor.this.eventRouter, null);
            }
        };
        this.linkedReactors = Collections.synchronizedSet(new HashSet());
        this.dispatcher = dispatcher == null ? new SynchronousDispatcher() : dispatcher;
        this.eventRouter = eventRouter == null ? DEFAULT_EVENT_ROUTER : eventRouter;
        this.consumerRegistry = new CachingRegistry();
        on(new Consumer<Event>() { // from class: reactor.core.Reactor.2
            @Override // reactor.function.Consumer
            public void accept(Event event) {
                if (Tuple2.class.isInstance(event.getData())) {
                    T1 t1 = ((Tuple2) event.getData()).getT1();
                    Object t2 = ((Tuple2) event.getData()).getT2();
                    if (Consumer.class.isInstance(t1)) {
                        try {
                            ((Consumer) t1).accept(t2);
                        } catch (Throwable th) {
                            Reactor.this.notify((Object) th.getClass(), (Class<?>) Event.wrap(th));
                        }
                    }
                }
            }
        });
        if (LoggerFactory.getLogger(Reactor.class).isDebugEnabled()) {
            on(new ClassSelector(Throwable.class), new Consumer<Event<Throwable>>() { // from class: reactor.core.Reactor.3
                Logger log;

                @Override // reactor.function.Consumer
                public void accept(Event<Throwable> event) {
                    if (null == this.log) {
                        this.log = LoggerFactory.getLogger(Reactor.class);
                    }
                    this.log.error(event.getData().getMessage(), event.getData());
                }
            });
        }
    }

    public UUID getId() {
        return this.id;
    }

    public Registry<Consumer<? extends Event<?>>> getConsumerRegistry() {
        return this.consumerRegistry;
    }

    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    public EventRouter getEventRouter() {
        return this.eventRouter;
    }

    @Override // reactor.core.Observable
    public boolean respondsToKey(Object obj) {
        Assert.notNull(obj, "Key cannot be null.");
        return this.consumerRegistry.select(obj).iterator().hasNext();
    }

    @Override // reactor.core.Observable
    public <E extends Event<?>> Registration<Consumer<E>> on(Selector selector, Consumer<E> consumer) {
        Assert.notNull(selector, "Selector cannot be null.");
        Assert.notNull(consumer, "Consumer cannot be null.");
        return (Registration<Consumer<E>>) this.consumerRegistry.register(selector, consumer);
    }

    @Override // reactor.core.Observable
    public <E extends Event<?>> Registration<Consumer<E>> on(Consumer<E> consumer) {
        Assert.notNull(consumer, "Consumer cannot be null.");
        return on(this.defaultSelector, consumer);
    }

    @Override // reactor.core.Observable
    public <E extends Event<?>, V> Registration<Consumer<E>> receive(Selector selector, Function<E, V> function) {
        return on(selector, new ReplyToConsumer(function));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor notify(Object obj, E e, Consumer<E> consumer) {
        Assert.notNull(obj, "Key cannot be null.");
        Assert.notNull(e, "Event cannot be null.");
        this.dispatcher.dispatch(obj, e.setKey(obj), this.consumerRegistry, this.errorHandler, this.eventRouter, consumer);
        if (!this.linkedReactors.isEmpty()) {
            Iterator<Observable> it = this.linkedReactors.iterator();
            while (it.hasNext()) {
                it.next().notify(obj, e);
            }
        }
        return this;
    }

    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor notify(Object obj, E e) {
        return notify(obj, (Object) e, (Consumer<Object>) null);
    }

    @Override // reactor.core.Observable
    public <S extends Supplier<? extends Event<?>>> Reactor notify(Object obj, S s) {
        return notify(obj, s.get(), (Consumer<Object>) null);
    }

    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor notify(E e) {
        return notify(this.defaultKey, (Object) e, (Consumer<Object>) null);
    }

    @Override // reactor.core.Observable
    public <S extends Supplier<? extends Event<?>>> Reactor notify(S s) {
        return notify(this.defaultKey, s.get(), (Consumer<Object>) null);
    }

    @Override // reactor.core.Observable
    public Reactor notify(Object obj) {
        return notify(obj, (Object) new Event(null), (Consumer<Object>) null);
    }

    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor send(Object obj, E e) {
        return notify(obj, (Object) new ReplyToEvent(e, this));
    }

    @Override // reactor.core.Observable
    public <S extends Supplier<? extends Event<?>>> Reactor send(Object obj, S s) {
        return notify(obj, (Object) new ReplyToEvent((Event) s.get(), this));
    }

    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor send(Object obj, E e, Observable observable) {
        return notify(obj, (Object) new ReplyToEvent(e, observable));
    }

    @Override // reactor.core.Observable
    public <S extends Supplier<? extends Event<?>>> Reactor send(Object obj, S s, Observable observable) {
        return notify(obj, (Object) new ReplyToEvent((Event) s.get(), observable));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor sendAndReceive(Object obj, E e, Consumer<E> consumer) {
        Tuple2<Selector, Object> anonymous = Selectors.anonymous();
        on(anonymous.getT1(), new SingleUseConsumer(consumer)).cancelAfterUse();
        notify(obj, (Object) e.setReplyTo(anonymous.getT2()));
        return this;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // reactor.core.Observable
    public <E extends Event<?>, S extends Supplier<E>> Reactor sendAndReceive(Object obj, S s, Consumer<E> consumer) {
        return sendAndReceive(obj, s.get(), (Consumer<Object>) consumer);
    }

    @Override // reactor.core.Observable
    public <T> Consumer<Event<T>> prepare(final Object obj) {
        return new Consumer<Event<T>>() { // from class: reactor.core.Reactor.4
            final List<Registration<? extends Consumer<? extends Event<?>>>> regs;
            final int size;

            {
                this.regs = Reactor.this.consumerRegistry.select(obj);
                this.size = this.regs.size();
            }

            @Override // reactor.function.Consumer
            public void accept(Event<T> event) {
                for (int i = 0; i < this.size; i++) {
                    Reactor.this.dispatcher.dispatch(event.setKey(obj), Reactor.this.eventRouter, this.regs.get(i).getObject(), Reactor.this.errorHandler);
                }
            }
        };
    }

    @Override // reactor.event.routing.Linkable
    public Reactor link(Observable observable) {
        this.linkedReactors.add(observable);
        return this;
    }

    @Override // reactor.event.routing.Linkable
    public Reactor unlink(Observable observable) {
        this.linkedReactors.remove(observable);
        return this;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return this.id.equals(((Reactor) obj).id);
    }

    public int hashCode() {
        return this.id.hashCode();
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable sendAndReceive(Object obj, Supplier supplier, Consumer consumer) {
        return sendAndReceive(obj, (Object) supplier, consumer);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable sendAndReceive(Object obj, Event event, Consumer consumer) {
        return sendAndReceive(obj, (Object) event, (Consumer<Object>) consumer);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable send(Object obj, Supplier supplier, Observable observable) {
        return send(obj, (Object) supplier, observable);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable send(Object obj, Event event, Observable observable) {
        return send(obj, (Object) event, observable);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable send(Object obj, Supplier supplier) {
        return send(obj, (Object) supplier);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable send(Object obj, Event event) {
        return send(obj, (Object) event);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable notify(Supplier supplier) {
        return notify((Reactor) supplier);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable notify(Event event) {
        return notify((Reactor) event);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable notify(Object obj, Supplier supplier) {
        return notify(obj, (Object) supplier);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable notify(Object obj, Event event) {
        return notify(obj, (Object) event);
    }

    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable notify(Object obj, Event event, Consumer consumer) {
        return notify(obj, (Object) event, (Consumer<Object>) consumer);
    }
}
