/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.combination;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.fn.Consumer;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.action.combination.DynamicMergeAction;
import reactor.rx.action.combination.FanInSubscription;
import reactor.rx.action.support.NonBlocking;

/**
 * Base {@link Action} that subscribes to several upstream {@link Publisher}s and funnels their
 * signals through a single {@link FanInSubscription}.
 *
 * <p>Each upstream publisher is consumed through an {@link InnerSubscriber} created by
 * {@link #createSubscriber()}; the concrete subclass decides how inner signals of type {@code I}
 * are turned into the {@code E} values this action receives.
 *
 * @param <I>          type emitted by the upstream publishers
 * @param <E>          type received by this action
 * @param <O>          type emitted downstream by this action
 * @param <SUBSCRIBER> concrete inner subscriber type tracked by the fan-in subscription
 */
public abstract class FanInAction<I, E, O, SUBSCRIBER extends FanInAction.InnerSubscriber<I, E, O>>
extends Action<E, O> {
    /** Subscription aggregating every inner subscription; also installed as the upstream subscription. */
    final FanInSubscription<I, E, O, SUBSCRIBER> innerSubscriptions;
    /** Publishers to subscribe to on first subscription; may be {@code null}. */
    final Iterable<? extends Publisher<? extends I>> composables;
    /** Initial value of {@link #status}: {@link #doSubscribe(Subscription)} has not run yet. */
    protected static final int NOT_STARTED = 0;
    /** Value of {@link #status} once {@link #doSubscribe(Subscription)} has won the start race. */
    protected static final int RUNNING = 1;
    /** Value of {@link #status} once completion was scheduled or an inner subscriber completed. */
    protected static final int COMPLETING = 2;
    /** Lifecycle state, one of {@link #NOT_STARTED}, {@link #RUNNING} or {@link #COMPLETING}. */
    final AtomicInteger status = new AtomicInteger();
    /** Dispatcher used to route demand to the subscriptions. */
    protected final Dispatcher dispatcher;
    /** Optional companion action; when non-null it is cancelled and requested alongside this action. */
    DynamicMergeAction<?, ?> dynamicMergeAction = null;

    /**
     * Creates a fan-in action with no predefined publishers.
     *
     * @param dispatcher dispatcher used to route demand
     */
    public FanInAction(Dispatcher dispatcher) {
        this(dispatcher, null);
    }

    /**
     * Creates a fan-in action over the given publishers.
     *
     * @param dispatcher  dispatcher used to route demand; {@link SynchronousDispatcher#INSTANCE} is
     *                    replaced by {@link Environment#tailRecurse()}
     * @param composables publishers to subscribe to when this action is first subscribed, or
     *                    {@code null} for none
     */
    public FanInAction(Dispatcher dispatcher, Iterable<? extends Publisher<? extends I>> composables) {
        this.dispatcher = SynchronousDispatcher.INSTANCE == dispatcher ? Environment.tailRecurse() : dispatcher;
        this.composables = composables;
        // NOTE: overridable factory invoked from the constructor; overriders must not rely on subclass state.
        this.innerSubscriptions = this.createFanInSubscription();
        this.upstreamSubscription = this.innerSubscriptions;
    }

    /**
     * Starts the fan-in (at most once, see {@link #doSubscribe(Subscription)}) and then registers
     * the downstream subscriber with the parent implementation.
     */
    @Override
    public void subscribe(Subscriber<? super O> subscriber) {
        this.doSubscribe(this.innerSubscriptions);
        super.subscribe(subscriber);
    }

    /**
     * Subscribes a new inner subscriber to {@code publisher}.
     *
     * <p>The subscriber's initial pending demand is the current pending request count divided by
     * the number of running composables plus one.
     *
     * @param publisher publisher to merge into this action
     */
    public void addPublisher(Publisher<? extends I> publisher) {
        InnerSubscriber<I, E, O> inlineMerge = this.createSubscriber();
        long participants = (long) (this.innerSubscriptions.runningComposables + 1);
        inlineMerge.pendingRequests = this.innerSubscriptions.pendingRequestSignals() / participants;
        publisher.subscribe(inlineMerge);
    }

    /**
     * Dispatches the initial demand to the dynamic merge action's subscription, if there is one.
     *
     * @param pending demand to forward
     */
    @Override
    protected void doStart(long pending) {
        if (this.dynamicMergeAction != null) {
            this.dispatcher.dispatch(pending, this.dynamicMergeAction.getSubscription(), null);
        }
    }

    /**
     * Moves this action to {@link #COMPLETING}.
     *
     * <p>If the action was never started, completion is emitted immediately; otherwise it is
     * emitted only when no composable is running any more.
     */
    public void scheduleCompletion() {
        if (this.status.compareAndSet(NOT_STARTED, COMPLETING)) {
            this.innerSubscriptions.serialComplete();
            return;
        }
        this.status.set(COMPLETING);
        if (this.innerSubscriptions.runningComposables == 0) {
            this.innerSubscriptions.serialComplete();
        }
    }

    /** Cancels the dynamic merge action, when present, and then the fan-in subscription. */
    @Override
    public void cancel() {
        if (this.dynamicMergeAction != null) {
            this.dynamicMergeAction.cancel();
        }
        this.innerSubscriptions.cancel();
    }

    /**
     * @return the dynamic merge action attached to this fan-in, or {@code null} if none
     */
    public Action<?, ?> dynamicMergeAction() {
        return this.dynamicMergeAction;
    }

    /**
     * Starts the fan-in exactly once (guarded by a {@link #NOT_STARTED} to {@link #RUNNING}
     * transition): propagates the capacity to the fan-in subscription and, when publishers were
     * supplied at construction, subscribes to each of them.
     *
     * @param subscription not used by this implementation
     */
    @Override
    protected void doSubscribe(Subscription subscription) {
        if (!this.status.compareAndSet(NOT_STARTED, RUNNING)) {
            return;
        }
        this.innerSubscriptions.maxCapacity(this.capacity);
        if (this.composables == null) {
            return;
        }
        // Inner subscriptions already registered before the start: cancel instead of subscribing again.
        if (this.innerSubscriptions.runningComposables > 0) {
            this.innerSubscriptions.cancel();
            return;
        }
        this.capacity(this.initUpstreamPublisherAndCapacity());
    }

    /**
     * Subscribes to every configured publisher and computes the resulting capacity.
     *
     * @return the minimum of this action's capacity and the capacity of every publisher that is a
     *         {@link Stream}
     */
    protected long initUpstreamPublisherAndCapacity() {
        long maxCapacity = this.capacity;
        for (Publisher<? extends I> composable : this.composables) {
            if (Stream.class.isAssignableFrom(composable.getClass())) {
                maxCapacity = Math.min(maxCapacity, ((Stream<?>) composable).getCapacity());
            }
            this.addPublisher(composable);
        }
        return maxCapacity;
    }

    /**
     * @return {@code true} when a dynamic merge action is attached and reports that it is publishing
     */
    protected final boolean checkDynamicMerge() {
        return this.dynamicMergeAction != null && this.dynamicMergeAction.isPublishing();
    }

    /**
     * Delegates to the parent implementation, then, if the fan-in subscription asks for it,
     * drains the pending request counter and dispatches that amount to the upstream subscription.
     */
    @Override
    public void onNext(E ev) {
        super.onNext(ev);
        if (!this.innerSubscriptions.shouldRequestPendingSignals()) {
            return;
        }
        long left = this.upstreamSubscription.pendingRequestSignals();
        if (left > 0L) {
            this.upstreamSubscription.updatePendingRequests(-left);
            this.dispatcher.dispatch(left, this.upstreamSubscription, null);
        }
    }

    /**
     * Validates {@code n} and dispatches the demand to the upstream subscription.
     *
     * @param n number of elements requested
     */
    @Override
    public void requestMore(long n) {
        FanInAction.checkRequest(n);
        this.dispatcher.dispatch(n, this.upstreamSubscription, null);
    }

    /**
     * Requests upstream with at least this action's capacity, and mirrors the request on the
     * dynamic merge action when one is attached.
     */
    @Override
    protected void requestUpstream(long capacity, boolean terminated, long elements) {
        // Never request fewer elements than this action's own capacity.
        long toRequest = Math.max(this.capacity, elements);
        super.requestUpstream(capacity, terminated, toRequest);
        if (this.dynamicMergeAction != null) {
            this.dynamicMergeAction.requestUpstream(capacity, terminated, toRequest);
        }
    }

    /**
     * @return {@link SynchronousDispatcher#INSTANCE} when the internal dispatcher is exactly a
     *         {@link TailRecurseDispatcher}, otherwise the internal dispatcher itself
     */
    @Override
    public final Dispatcher getDispatcher() {
        return TailRecurseDispatcher.class == this.dispatcher.getClass() ? SynchronousDispatcher.INSTANCE : this.dispatcher;
    }

    @Override
    public String toString() {
        return super.toString() + "{runningComposables=" + this.innerSubscriptions.runningComposables + "}";
    }

    /**
     * Factory for the aggregating subscription; called once from the constructor.
     *
     * @return a new fan-in subscription bound to this action
     */
    protected FanInSubscription<I, E, O, SUBSCRIBER> createFanInSubscription() {
        return new FanInSubscription<I, E, O, SUBSCRIBER>(this);
    }

    /**
     * @return the fan-in subscription aggregating the inner subscriptions
     */
    public FanInSubscription<I, E, O, SUBSCRIBER> getSubscription() {
        return this.innerSubscriptions;
    }

    /**
     * Creates the subscriber used to consume one upstream publisher.
     *
     * @return a new inner subscriber bound to this action
     */
    protected abstract InnerSubscriber<I, E, O> createSubscriber();

    /**
     * Subscriber attached to a single upstream publisher on behalf of a {@link FanInAction}.
     *
     * <p>Subclasses supply the {@code onSubscribe} and {@code onNext} handling; this base class
     * tracks demand and reports termination to the outer action's fan-in subscription.
     *
     * @param <I> type emitted by the upstream publisher
     * @param <E> type received by the outer action
     * @param <O> type emitted by the outer action
     */
    public static abstract class InnerSubscriber<I, E, O>
    implements Subscriber<I>,
    NonBlocking,
    Consumer<Long> {
        /** Action this subscriber feeds. */
        final FanInAction<I, E, O, ? extends InnerSubscriber<I, E, O>> outerAction;
        /** Identifier returned by the fan-in subscription when this subscriber was registered. */
        int sequenceId;
        /** Inner subscription to the upstream publisher; {@code null} until {@link #setSubscription} runs. */
        FanInSubscription.InnerSubscription<I, E, InnerSubscriber<I, E, O>> s;
        /** Demand accumulated by {@link #request(long)}, saturating at {@link Long#MAX_VALUE}. */
        long pendingRequests = 0L;
        /** Counter reset to zero on every {@link #request(long)}. */
        long emittedSignals = 0L;
        /** 0 while active, 1 once {@link #onComplete()} has been processed. */
        volatile int terminated = 0;
        static final AtomicIntegerFieldUpdater<InnerSubscriber> TERMINATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");

        InnerSubscriber(FanInAction<I, E, O, ? extends InnerSubscriber<I, E, O>> outerAction) {
            this.outerAction = outerAction;
        }

        /** Cancels the inner subscription if one has been set. */
        public void cancel() {
            FanInSubscription.InnerSubscription<I, E, InnerSubscriber<I, E, O>> s = this.s;
            if (s != null) {
                s.cancel();
            }
        }

        /**
         * Stores the inner subscription and registers this subscriber with the outer fan-in
         * subscription, remembering the returned sequence id.
         *
         * @param s inner subscription wrapping the upstream subscription
         */
        void setSubscription(FanInSubscription.InnerSubscription s) {
            this.s = s;
            this.sequenceId = this.outerAction.innerSubscriptions.addSubscription(this);
        }

        /**
         * {@link Consumer} entry point: requests {@code pendingRequests} elements when positive.
         * Any failure is routed to the outer action's {@code onError}.
         *
         * @param pendingRequests demand to forward upstream
         */
        @Override
        public void accept(Long pendingRequests) {
            try {
                if (pendingRequests > 0L) {
                    this.request(pendingRequests);
                }
            }
            catch (Throwable e) {
                this.outerAction.onError(e);
            }
        }

        /**
         * Adds {@code n} to the pending demand and forwards the request to the inner subscription.
         * Does nothing when no subscription is set or {@code n} is not positive.
         *
         * @param n number of elements to request
         */
        public void request(long n) {
            if (this.s == null || n <= 0L) {
                return;
            }
            this.pendingRequests += n;
            if (this.pendingRequests < 0L) {
                // Overflow: saturate instead of wrapping around.
                this.pendingRequests = Long.MAX_VALUE;
            }
            this.emittedSignals = 0L;
            this.s.request(n);
        }

        /**
         * Decrements the running composable count and forwards the error to the fan-in subscription.
         *
         * @param t error signalled by the upstream publisher
         */
        @Override
        public void onError(Throwable t) {
            FanInSubscription.RUNNING_COMPOSABLE_UPDATER.decrementAndGet(this.outerAction.innerSubscriptions);
            this.outerAction.innerSubscriptions.serialError(t);
        }

        /**
         * Handles upstream completion at most once: cancels the inner subscription, marks the
         * outer action as {@link FanInAction#COMPLETING}, unregisters this subscriber and, if it
         * was the last running one and no dynamic merge is still publishing, completes the
         * fan-in subscription.
         */
        @Override
        public void onComplete() {
            if (!TERMINATE_UPDATER.compareAndSet(this, 0, 1)) {
                return;
            }
            // Guard against completion arriving before a subscription was set, like cancel()/request() do.
            FanInSubscription.InnerSubscription<I, E, InnerSubscriber<I, E, O>> s = this.s;
            if (s != null) {
                s.cancel();
            }
            this.outerAction.status.set(COMPLETING);
            long left = FanInSubscription.RUNNING_COMPOSABLE_UPDATER.decrementAndGet(this.outerAction.innerSubscriptions);
            // The counter may have been driven below zero; treat that as "none left".
            left = Math.max(0L, left);
            this.outerAction.innerSubscriptions.remove(this.sequenceId);
            if (left == 0L && !this.outerAction.checkDynamicMerge()) {
                this.outerAction.innerSubscriptions.serialComplete();
            }
        }

        /** Delegates to the outer action. */
        @Override
        public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
            return this.outerAction.isReactivePull(dispatcher, producerCapacity);
        }

        /**
         * @return the outer action's capacity
         */
        @Override
        public long getCapacity() {
            return this.outerAction.capacity;
        }

        @Override
        public String toString() {
            return "FanInAction.InnerSubscriber{pending=" + this.pendingRequests + ", emitted=" + this.emittedSignals + "}";
        }
    }
}

