/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.broadcast;

import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.queue.CompletableQueue;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.rx.action.Action;
import reactor.rx.subscription.PushSubscription;
import reactor.rx.subscription.ReactiveSubscription;

public class Broadcaster<O>
extends Action<O, O> {
    protected final Dispatcher dispatcher;
    protected final Environment environment;

    public static <T> Broadcaster<T> create() {
        return new Broadcaster(null, SynchronousDispatcher.INSTANCE, Long.MAX_VALUE);
    }

    public static <T> Broadcaster<T> create(Environment env) {
        return Broadcaster.create(env, env.getDefaultDispatcher());
    }

    public static <T> Broadcaster<T> create(Dispatcher dispatcher) {
        return Broadcaster.create(null, dispatcher);
    }

    public static <T> Broadcaster<T> create(Environment env, Dispatcher dispatcher) {
        Assert.state(dispatcher.supportsOrdering(), "Dispatcher provided doesn't support event ordering.  For concurrent consume, refer to Stream#partition/groupBy() method and assign individual single dispatchers");
        return new Broadcaster(env, dispatcher, Action.evaluateCapacity(dispatcher.backlogSize()));
    }

    protected Broadcaster(Environment environment, Dispatcher dispatcher, long capacity) {
        super(capacity);
        this.dispatcher = dispatcher;
        this.environment = environment;
    }

    @Override
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    @Override
    protected void doNext(O ev) {
        this.broadcastNext(ev);
    }

    @Override
    public void onNext(O ev) {
        if (ev == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        if (!this.dispatcher.inContext()) {
            this.dispatcher.dispatch(ev, this, null);
        } else {
            super.onNext(ev);
        }
    }

    @Override
    public void onError(Throwable cause) {
        if (cause == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        if (!this.dispatcher.inContext()) {
            this.dispatcher.dispatch(cause, new Consumer<Throwable>(){

                @Override
                public void accept(Throwable throwable) {
                    Broadcaster.super.doError(throwable);
                }
            }, null);
        } else {
            super.onError(cause);
        }
    }

    @Override
    public void onComplete() {
        if (!this.dispatcher.inContext()) {
            this.dispatcher.dispatch(null, new Consumer<Void>(){

                @Override
                public void accept(Void aVoid) {
                    Broadcaster.super.onComplete();
                }
            }, null);
        } else {
            super.onComplete();
        }
    }

    @Override
    protected PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, CompletableQueue<O> queue) {
        if (queue != null) {
            return new ReactiveSubscription<O>(this, subscriber, queue){

                @Override
                protected void onRequest(long elements) {
                    if (Broadcaster.this.upstreamSubscription != null) {
                        super.onRequest(elements);
                        Broadcaster.this.requestUpstream(Broadcaster.this.capacity, this.buffer.isComplete(), elements);
                    }
                }
            };
        }
        return super.createSubscription(subscriber, null);
    }

    @Override
    protected PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, boolean reactivePull) {
        if (reactivePull) {
            return super.createSubscription(subscriber, true);
        }
        return super.createSubscription(subscriber, this.dispatcher != SynchronousDispatcher.INSTANCE && this.upstreamSubscription != null && !this.upstreamSubscription.hasPublisher());
    }

    @Override
    public Broadcaster<O> capacity(long elements) {
        super.capacity(elements);
        return this;
    }
}

