/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.error;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.fn.Consumer;
import reactor.fn.Predicate;
import reactor.rx.action.Action;
import reactor.rx.subscription.PushSubscription;

/**
 * An {@link Action} that reacts to an upstream error by re-subscribing to a
 * root {@link Publisher} (when one was supplied) and re-requesting the demand
 * it has tracked, instead of immediately propagating the error downstream.
 *
 * <p>Demand is tracked in {@code pendingRequests}: it grows in
 * {@link #requestMore(long)} and shrinks in {@link #doNext(Object)}; the value
 * {@code Long.MAX_VALUE} is treated as a sticky "unbounded" marker that is
 * never decremented.
 *
 * @param <T> type of the elements passed through unchanged
 */
public class RetryAction<T> extends Action<T, T> {

    /** Retry budget; {@code -1} makes {@link #onError(Throwable)} always take the retry path. */
    private final long numRetries;

    /** Optional predicate consulted only once the retry budget is exhausted; may be {@code null}. */
    private final Predicate<Throwable> retryMatcher;

    /** Publisher to re-subscribe to on retry; may be {@code null}, in which case no re-subscription happens. */
    private final Publisher<? extends T> rootPublisher;

    /** Callback handed to the dispatcher to perform the actual retry. */
    private final Consumer<Throwable> throwableConsumer = new ThrowableConsumer();

    /** Errors counted since the last successfully received element. */
    private long currentNumRetries;

    /** Outstanding demand; writes are guarded by this instance's monitor. */
    private long pendingRequests;

    private Dispatcher dispatcher;

    /**
     * Creates the action.
     *
     * @param dispatcher   dispatcher used to schedule retries; the shared
     *                     {@link SynchronousDispatcher#INSTANCE} is swapped for
     *                     {@link Environment#tailRecurse()}
     * @param numRetries   retry budget, {@code -1} for no limit
     * @param predicate    optional matcher for errors, may be {@code null}
     * @param parentStream publisher to re-subscribe to on retry, may be {@code null}
     */
    public RetryAction(Dispatcher dispatcher, int numRetries, Predicate<Throwable> predicate, Publisher<? extends T> parentStream) {
        this.numRetries = numRetries;
        this.retryMatcher = predicate;
        this.rootPublisher = parentStream;
        if (SynchronousDispatcher.INSTANCE == dispatcher) {
            this.dispatcher = Environment.tailRecurse();
        } else {
            this.dispatcher = dispatcher;
        }
    }

    /**
     * Installs a fresh tail-recurse dispatcher, then delegates to the parent.
     *
     * <p>NOTE(review): this unconditionally replaces the dispatcher selected in
     * the constructor — confirm that is intended.
     */
    @Override
    protected void doSubscribe(Subscription subscription) {
        dispatcher = Environment.tailRecurse();
        super.doSubscribe(subscription);
    }

    /**
     * Forwards the element downstream, resets the error counter and, when
     * demand is bounded, consumes one unit of tracked demand.
     */
    @Override
    protected void doNext(T ev) {
        currentNumRetries = 0L;
        broadcastNext(ev);

        // Unbounded capacity or unbounded demand: nothing to account for.
        if (capacity == Long.MAX_VALUE || pendingRequests == Long.MAX_VALUE) {
            return;
        }
        synchronized (this) {
            // Re-check under the lock; requestMore may have switched to unbounded meanwhile.
            if (pendingRequests != Long.MAX_VALUE) {
                pendingRequests--;
            }
        }
    }

    /**
     * Decides between propagating the error and scheduling a retry.
     *
     * <p>The error is propagated via {@code doError} only when the retry
     * budget is limited, the incremented counter exceeds it, and the matcher
     * is either absent or rejects the throwable. In every other case the
     * throwable is dispatched to the retry callback.
     *
     * <p>NOTE(review): with {@code numRetries == -1} the matcher is never
     * consulted, and with a limited budget the matcher is only consulted once
     * the budget is exhausted — confirm this matches the intended contract.
     */
    @Override
    public void onError(Throwable throwable) {
        // The counter is only advanced when a finite budget is configured.
        boolean budgetExhausted = numRetries != -1L && ++currentNumRetries > numRetries;
        boolean giveUp = budgetExhausted && (retryMatcher == null || !retryMatcher.test(throwable));

        if (giveUp) {
            doError(throwable);
            currentNumRetries = 0L;
        } else {
            dispatcher.dispatch(throwable, throwableConsumer, null);
        }
    }

    /**
     * Adds {@code n} to the tracked demand, saturating at
     * {@code Long.MAX_VALUE} on overflow, then delegates to the parent.
     */
    @Override
    public void requestMore(long n) {
        synchronized (this) {
            pendingRequests += n;
            if (pendingRequests < 0L) {
                // Overflow: treat as unbounded demand.
                pendingRequests = Long.MAX_VALUE;
            }
        }
        super.requestMore(n);
    }

    @Override
    public final Dispatcher getDispatcher() {
        return dispatcher;
    }

    /**
     * Retry callback run by the dispatcher: re-subscribes to the root
     * publisher when there is one and re-issues the outstanding demand.
     */
    private class ThrowableConsumer implements Consumer<Throwable> {

        private ThrowableConsumer() {
        }

        @Override
        public void accept(Throwable throwable) {
            PushSubscription current = RetryAction.this.upstreamSubscription;
            if (current == null) {
                return;
            }

            // Snapshot demand before cancelling / re-subscribing.
            // NOTE(review): read without holding the monitor that guards writes — confirm acceptable.
            long outstanding = RetryAction.this.pendingRequests;

            if (RetryAction.this.rootPublisher != null) {
                // A tail-recurse dispatcher is shut down and replaced before re-subscribing.
                if (TailRecurseDispatcher.class.isAssignableFrom(RetryAction.this.dispatcher.getClass())) {
                    RetryAction.this.dispatcher.shutdown();
                    RetryAction.this.dispatcher = Environment.tailRecurse();
                }
                RetryAction.this.cancel();
                RetryAction.this.rootPublisher.subscribe((Subscriber)RetryAction.this);
                // Pick up whatever subscription is current after re-subscribing.
                current = RetryAction.this.upstreamSubscription;
            }

            if (current == null || outstanding < 0L) {
                return;
            }
            // Bounded demand asks for one extra element; unbounded demand is passed through as-is.
            long toRequest;
            if (outstanding != Long.MAX_VALUE) {
                toRequest = outstanding + 1L;
            } else {
                toRequest = outstanding;
            }
            current.request(toRequest);
        }
    }
}

