/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.control;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.action.support.NonBlocking;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.PushSubscription;

/**
 * Pass-through action that subscribes again to a root publisher whenever a companion
 * publisher emits a value.
 *
 * <p>Values received from upstream are re-broadcast unchanged. When upstream signals
 * completion, {@link #onComplete()} cancels the current upstream subscription and pushes the
 * current time in milliseconds into an internal {@link Broadcaster}. The function given to the
 * constructor turns that broadcaster into a companion publisher; every value the companion
 * emits triggers {@link #doRetry()}, while its error and completion signals are relayed to
 * {@code doError} and {@link #doComplete()} respectively.
 *
 * @param <T> type of the values relayed by this action
 */
public class RepeatWhenAction<T> extends Action<T, T> {

    /** Receives one timestamp each time {@link #onComplete()} is invoked. */
    private final Broadcaster<Long> retryStream;

    /** Publisher that is subscribed again on every repeat; when null, {@link #doRetry()} does nothing. */
    private final Publisher<? extends T> rootPublisher;

    /** Dispatcher on which the re-subscription task runs; may be swapped inside {@link #doRetry()}. */
    private Dispatcher dispatcher;

    /**
     * Creates the action and immediately subscribes to the companion publisher.
     *
     * @param dispatcher    dispatcher handed to the internal broadcaster; also used to run the
     *                      re-subscription task, unless it is {@link SynchronousDispatcher#INSTANCE},
     *                      in which case {@link Environment#tailRecurse()} is used for that instead
     * @param predicate     function mapping the stream of completion timestamps to the companion
     *                      publisher that decides when to repeat
     * @param rootPublisher publisher to subscribe to again on each repeat
     */
    public RepeatWhenAction(Dispatcher dispatcher,
                            Function<? super Stream<? extends Long>, ? extends Publisher<?>> predicate,
                            Publisher<? extends T> rootPublisher) {
        this.retryStream = Broadcaster.create(null, dispatcher);
        if (dispatcher == SynchronousDispatcher.INSTANCE) {
            this.dispatcher = Environment.tailRecurse();
        } else {
            this.dispatcher = dispatcher;
        }
        this.rootPublisher = rootPublisher;

        Publisher<?> companion = predicate.apply(retryStream);
        companion.subscribe(new RestartSubscriber());
    }

    /** Relays each upstream value downstream unchanged. */
    @Override
    protected void doNext(T ev) {
        broadcastNext(ev);
    }

    /** Completes the internal timestamp broadcaster, then runs the inherited completion logic. */
    @Override
    protected void doComplete() {
        retryStream.onComplete();
        super.doComplete();
    }

    /**
     * Schedules, on the current dispatcher, a task that subscribes this action to
     * {@code rootPublisher} again. The task is a no-op when {@code rootPublisher} is null.
     */
    protected void doRetry() {
        Consumer<Void> resubscribeTask = new Consumer<Void>() {

            @Override
            public void accept(Void ignored) {
                if (rootPublisher == null) {
                    return;
                }

                long pendingRequests = Long.MAX_VALUE;
                PushSubscription<T> upstream = upstreamSubscription;

                if (upstream != null) {
                    // Still subscribed: remember outstanding demand, then tear down and re-subscribe.
                    pendingRequests = upstream.pendingRequestSignals();
                    if (dispatcher instanceof TailRecurseDispatcher) {
                        // Replace the tail-recurse dispatcher with a fresh one before re-subscribing.
                        dispatcher.shutdown();
                        dispatcher = Environment.tailRecurse();
                    }
                    cancel();
                    rootPublisher.subscribe(RepeatWhenAction.this);
                    // NOTE(review): `upstream` is not re-read here, so the request below is issued
                    // on the subscription captured before cancel() - confirm this is intended.
                } else {
                    // No live subscription: subscribe and pick up whatever subscription was installed.
                    // pendingRequests stays Long.MAX_VALUE, so the request below is skipped.
                    rootPublisher.subscribe(RepeatWhenAction.this);
                    upstream = upstreamSubscription;
                }

                if (upstream != null && pendingRequests != Long.MAX_VALUE) {
                    upstream.request(1L);
                }
            }
        };
        dispatcher.dispatch(null, resubscribeTask, null);
    }

    /**
     * Handles upstream completion by cancelling the current upstream subscription and emitting
     * the current time in milliseconds to the internal broadcaster. This override does not call
     * the superclass implementation.
     */
    @Override
    public void onComplete() {
        cancel();
        long completedAt = System.currentTimeMillis();
        retryStream.onNext(completedAt);
    }

    /** Returns the dispatcher currently used to run the re-subscription task. */
    @Override
    public final Dispatcher getDispatcher() {
        return dispatcher;
    }

    /**
     * Subscriber attached to the companion publisher. It requests companion values one at a
     * time and translates them into repeat, error, or completion actions on the enclosing
     * {@link RepeatWhenAction}.
     */
    private class RestartSubscriber implements Subscriber<Object>, NonBlocking {

        /** Subscription to the companion publisher, assigned in {@link #onSubscribe(Subscription)}. */
        Subscription companionSubscription;

        private RestartSubscriber() {
        }

        /** Delegates to the enclosing action. */
        @Override
        public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
            return RepeatWhenAction.this.isReactivePull(dispatcher, producerCapacity);
        }

        /** Reports the enclosing action's capacity. */
        @Override
        public long getCapacity() {
            return RepeatWhenAction.this.capacity;
        }

        /** Stores the companion subscription and requests its first value. */
        @Override
        public void onSubscribe(Subscription s) {
            companionSubscription = s;
            s.request(1L);
        }

        /** Each companion value schedules a repeat, then asks the companion for one more value. */
        @Override
        public void onNext(Object o) {
            doRetry();
            companionSubscription.request(1L);
        }

        /**
         * Cancels the companion subscription and forwards the error to the enclosing action.
         * NOTE(review): dereferences the subscription without a null check - assumes
         * onSubscribe was called first; confirm.
         */
        @Override
        public void onError(Throwable t) {
            companionSubscription.cancel();
            doError(t);
        }

        /**
         * Cancels the companion subscription and completes the enclosing action.
         * NOTE(review): dereferences the subscription without a null check - assumes
         * onSubscribe was called first; confirm.
         */
        @Override
        public void onComplete() {
            companionSubscription.cancel();
            doComplete();
        }
    }
}

