/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.combination;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.core.support.NonBlocking;
import reactor.fn.Consumer;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.action.combination.DynamicMergeAction;
import reactor.rx.action.combination.FanInSubscription;

public abstract class FanInAction<I, E, O, SUBSCRIBER extends InnerSubscriber<I, E, O>>
extends Action<E, O> {
    protected static final int NOT_STARTED = 0;
    protected static final int RUNNING = 1;
    protected static final int COMPLETING = 2;
    final FanInSubscription<I, E, O, SUBSCRIBER> innerSubscriptions;
    final List<? extends Publisher<? extends I>> publishers;
    final AtomicInteger status = new AtomicInteger();
    protected final Dispatcher dispatcher;
    DynamicMergeAction<?, ?> dynamicMergeAction = null;

    public FanInAction(Dispatcher dispatcher) {
        this(dispatcher, null);
    }

    public FanInAction(Dispatcher dispatcher, List<? extends Publisher<? extends I>> publishers) {
        this.dispatcher = SynchronousDispatcher.INSTANCE == dispatcher ? Environment.tailRecurse() : dispatcher;
        this.publishers = publishers;
        this.innerSubscriptions = this.createFanInSubscription();
        this.upstreamSubscription = this.innerSubscriptions;
        if (publishers != null) {
            FanInSubscription.RUNNING_COMPOSABLE_UPDATER.set(this.innerSubscriptions, publishers.size());
        }
    }

    @Override
    public void subscribe(Subscriber<? super O> subscriber) {
        super.subscribe(subscriber);
        this.doOnSubscribe(this.innerSubscriptions);
    }

    public void addPublisher(Publisher<? extends I> publisher) {
        InnerSubscriber<I, E, O> inlineMerge = this.createSubscriber();
        publisher.subscribe(inlineMerge);
    }

    public void scheduleCompletion() {
        if (this.status.compareAndSet(0, 2)) {
            this.innerSubscriptions.serialComplete();
        } else if (this.innerSubscriptions.runningComposables == 0 && this.status.compareAndSet(1, 2)) {
            this.innerSubscriptions.serialComplete();
        }
    }

    @Override
    public void cancel() {
        if (this.dynamicMergeAction != null) {
            this.dynamicMergeAction.cancel();
        }
        this.innerSubscriptions.cancel();
    }

    public Action<?, ?> dynamicMergeAction() {
        return this.dynamicMergeAction;
    }

    @Override
    protected void doOnSubscribe(Subscription subscription) {
        if (this.status.compareAndSet(0, 1)) {
            this.innerSubscriptions.maxCapacity(this.capacity);
            if (this.publishers != null) {
                this.capacity(this.initUpstreamPublisherAndCapacity());
            }
        }
    }

    protected long initUpstreamPublisherAndCapacity() {
        long maxCapacity = this.capacity;
        for (Publisher<I> composable : this.publishers) {
            if (Stream.class.isAssignableFrom(composable.getClass())) {
                maxCapacity = Math.min(maxCapacity, ((Stream)composable).getCapacity());
            }
            this.addPublisher(composable);
        }
        return maxCapacity;
    }

    protected final boolean checkDynamicMerge() {
        return this.dynamicMergeAction != null && this.dynamicMergeAction.isPublishing();
    }

    @Override
    public void onNext(E ev) {
        long left;
        super.onNext(ev);
        if (this.innerSubscriptions.shouldRequestPendingSignals() && (left = this.upstreamSubscription.pendingRequestSignals()) > 0L) {
            this.upstreamSubscription.updatePendingRequests(-left);
            this.dispatcher.dispatch((Object)left, (Consumer)this.upstreamSubscription, null);
        }
    }

    @Override
    public void requestMore(long n) {
        FanInAction.checkRequest(n);
        this.dispatcher.dispatch((Object)n, (Consumer)this.upstreamSubscription, null);
    }

    @Override
    protected void requestUpstream(long capacity, boolean terminated, long elements) {
        this.requestMore(elements);
        if (this.dynamicMergeAction != null) {
            this.dynamicMergeAction.requestUpstream(capacity, terminated, elements);
        }
    }

    @Override
    public final Dispatcher getDispatcher() {
        return TailRecurseDispatcher.class == this.dispatcher.getClass() ? SynchronousDispatcher.INSTANCE : this.dispatcher;
    }

    @Override
    public String toString() {
        return super.toString() + "{runningComposables=" + this.innerSubscriptions.runningComposables + "}";
    }

    protected FanInSubscription<I, E, O, SUBSCRIBER> createFanInSubscription() {
        return new FanInSubscription(this);
    }

    public FanInSubscription<I, E, O, SUBSCRIBER> getSubscription() {
        return this.innerSubscriptions;
    }

    protected abstract InnerSubscriber<I, E, O> createSubscriber();

    public static abstract class InnerSubscriber<I, E, O>
    implements Subscriber<I>,
    NonBlocking,
    Consumer<Long> {
        final FanInAction<I, E, O, ? extends InnerSubscriber<I, E, O>> outerAction;
        int sequenceId;
        FanInSubscription.InnerSubscription<I, E, InnerSubscriber<I, E, O>> s;
        long pendingRequests = 0L;
        long emittedSignals = 0L;
        volatile int terminated = 0;
        static final AtomicIntegerFieldUpdater<InnerSubscriber> TERMINATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");

        InnerSubscriber(FanInAction<I, E, O, ? extends InnerSubscriber<I, E, O>> outerAction) {
            this.outerAction = outerAction;
        }

        public void cancel() {
            FanInSubscription.InnerSubscription<I, E, InnerSubscriber<I, E, O>> s = this.s;
            if (s != null) {
                s.cancel();
            }
        }

        void setSubscription(FanInSubscription.InnerSubscription s) {
            long toRequest;
            this.s = s;
            this.sequenceId = this.outerAction.innerSubscriptions.addSubscription(this);
            if (this.outerAction.publishers == null) {
                FanInSubscription.RUNNING_COMPOSABLE_UPDATER.incrementAndGet(this.outerAction.innerSubscriptions);
            }
            long l = this.pendingRequests = (toRequest = this.outerAction.innerSubscriptions.pendingRequestSignals()) != Long.MAX_VALUE ? toRequest / (long)Math.max(this.outerAction.innerSubscriptions.runningComposables, 1) : Long.MAX_VALUE;
            if (this.pendingRequests == 0L && toRequest > 0L) {
                this.pendingRequests = 1L;
            }
        }

        public void accept(Long pendingRequests) {
            try {
                if (pendingRequests > 0L) {
                    this.request(pendingRequests);
                }
            }
            catch (Throwable e) {
                this.outerAction.onError(e);
            }
        }

        public void request(long n) {
            if (this.s == null || n <= 0L) {
                return;
            }
            if ((this.pendingRequests += n) < 0L) {
                this.pendingRequests = Long.MAX_VALUE;
            }
            this.emittedSignals = 0L;
            this.s.request(n);
        }

        public void onError(Throwable t) {
            FanInSubscription.RUNNING_COMPOSABLE_UPDATER.decrementAndGet(this.outerAction.innerSubscriptions);
            this.outerAction.innerSubscriptions.serialError(t);
        }

        public void onComplete() {
            if (TERMINATE_UPDATER.compareAndSet(this, 0, 1)) {
                long left = FanInSubscription.RUNNING_COMPOSABLE_UPDATER.decrementAndGet(this.outerAction.innerSubscriptions);
                left = left < 0L ? 0L : left;
                this.outerAction.innerSubscriptions.remove(this.sequenceId);
                if (this.pendingRequests > 0L) {
                    this.outerAction.requestMore(this.pendingRequests);
                }
                if (left == 0L && !this.outerAction.checkDynamicMerge()) {
                    this.outerAction.scheduleCompletion();
                }
            }
        }

        public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
            return this.outerAction.isReactivePull(dispatcher, producerCapacity);
        }

        public long getCapacity() {
            return ((FanInAction)this.outerAction).capacity;
        }

        public String toString() {
            return "FanInAction.InnerSubscriber{pending=" + this.pendingRequests + ", emitted=" + this.emittedSignals + "}";
        }
    }
}

