/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.stream;

import javax.annotation.Nonnull;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.bus.EventBus;
import reactor.bus.Observable;
import reactor.bus.registry.Registration;
import reactor.bus.selector.Selector;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.fn.Consumer;
import reactor.rx.Stream;
import reactor.rx.action.support.SerializedSubscriber;
import reactor.rx.subscription.PushSubscription;

/**
 * A {@link Stream} that relays to its subscribers the events an {@link Observable}
 * hands to a consumer registered under a given {@link Selector}.
 *
 * <p>Each call to {@link #subscribe(Subscriber)} registers a new consumer on the
 * observable; cancelling the resulting subscription cancels that registration.
 *
 * <p>The dispatcher is taken from the observable when it is an {@link EventBus},
 * otherwise {@link SynchronousDispatcher#INSTANCE} is used. When that dispatcher
 * does not report {@code supportsOrdering()}, subscribers are wrapped with
 * {@link SerializedSubscriber#create} before receiving any signal.
 *
 * @param <T> type of the events forwarded to subscribers
 */
public final class ObservableStream<T>
extends Stream<T> {
    /** Selector the per-subscriber consumer is registered under. */
    private final Selector selector;
    /** Source the per-subscriber consumer is registered on. */
    private final Observable<T> observable;
    /** Dispatcher of the backing EventBus, or the synchronous dispatcher otherwise. */
    private final Dispatcher dispatcher;
    /** Cached result of {@code dispatcher.supportsOrdering()}. */
    private final boolean ordering;

    /**
     * Creates a stream over the given observable and selector.
     *
     * @param observable source to register consumers on; must not be {@code null}
     * @param selector   selector passed to {@code observable.on(...)} for every subscriber
     * @throws NullPointerException if {@code observable} is {@code null}
     */
    public ObservableStream(@Nonnull Observable<T> observable, @Nonnull Selector selector) {
        // instanceof is simply false for null, so reject it explicitly to keep
        // failing fast here instead of at the first subscribe() call.
        if (observable == null) {
            throw new NullPointerException("observable");
        }
        this.selector = selector;
        this.observable = observable;
        this.dispatcher = observable instanceof EventBus
                ? ((EventBus)observable).getDispatcher()
                : SynchronousDispatcher.INSTANCE;
        this.ordering = this.dispatcher.supportsOrdering();
    }

    /**
     * Registers a consumer on the observable that forwards every accepted event to
     * {@code onNext}, and hands the subscriber a subscription whose {@code cancel()}
     * also cancels that registration.
     *
     * @param s subscriber to signal; must not be {@code null}
     * @throws NullPointerException if {@code s} is {@code null}
     */
    @Override
    public void subscribe(Subscriber<? super T> s) {
        // Check before building the subscription: its initializer registers a
        // consumer on the observable, and method arguments are evaluated before
        // the receiver's null check, so a late NPE would leave that registration behind.
        if (s == null) {
            throw new NullPointerException("subscriber");
        }

        final Subscriber<? super T> downstream;
        if (this.ordering) {
            downstream = s;
        } else {
            // Dispatcher gives no ordering guarantee; presumably the wrapper
            // serializes concurrent signals — confirm against SerializedSubscriber.
            downstream = SerializedSubscriber.create(s);
        }

        // NOTE(review): the registration below is created while the subscription is
        // being constructed, i.e. before onSubscribe() is invoked. If the observable
        // can deliver events from another thread, onNext could precede onSubscribe —
        // confirm whether PushSubscription/the dispatcher guards against this.
        downstream.onSubscribe((Subscription)new PushSubscription<T>(this, downstream){
            /** Handle used to detach the forwarding consumer on cancel. */
            final Registration<Consumer<? extends T>> registration;
            {
                this.registration = ObservableStream.this.observable.on(ObservableStream.this.selector, new Consumer<T>(){

                    @Override
                    public void accept(T event) {
                        subscriber.onNext(event);
                    }
                });
            }

            /** Cancels the subscription, then removes the consumer from the observable. */
            @Override
            public void cancel() {
                super.cancel();
                this.registration.cancel();
            }
        });
    }

    /**
     * Returns the resolved dispatcher when it supports ordering, otherwise
     * {@link SynchronousDispatcher#INSTANCE}.
     *
     * @return the dispatcher exposed by this stream
     */
    @Override
    public final Dispatcher getDispatcher() {
        if (this.ordering) {
            return this.dispatcher;
        }
        return SynchronousDispatcher.INSTANCE;
    }

    /**
     * @return a description containing the selector and the observable
     */
    @Override
    public String toString() {
        return "ObservableStream{selector=" + this.selector + ", observable=" + this.observable + '}';
    }
}

