/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.combination;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.rx.action.Action;
import reactor.rx.action.combination.FanInAction;
import reactor.rx.action.combination.MergeAction;
import reactor.rx.subscription.PushSubscription;

public class DynamicMergeAction<I, O>
extends Action<Publisher<? extends I>, O> {
    private final FanInAction<I, ?, O, ? extends FanInAction.InnerSubscriber<I, ?, O>> fanInAction;
    private volatile int wip = 0;
    protected static final AtomicIntegerFieldUpdater<DynamicMergeAction> WIP_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DynamicMergeAction.class, "wip");
    private volatile long requested = 0L;
    protected static final AtomicLongFieldUpdater<DynamicMergeAction> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(DynamicMergeAction.class, "requested");

    public DynamicMergeAction(FanInAction<I, ?, O, ? extends FanInAction.InnerSubscriber<I, ?, O>> fanInAction) {
        this.fanInAction = fanInAction == null ? new MergeAction(SynchronousDispatcher.INSTANCE) : fanInAction;
        this.fanInAction.dynamicMergeAction = this;
    }

    @Override
    public void subscribe(Subscriber<? super O> subscriber) {
        this.fanInAction.subscribe(subscriber);
    }

    @Override
    protected PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, boolean reactivePull) {
        throw new IllegalAccessError("Should never use dynamicMergeAction own createSubscription");
    }

    @Override
    public void requestMore(long n) {
        if (this.upstreamSubscription != null) {
            this.upstreamSubscription.accept(n);
        }
    }

    @Override
    protected void doNext(Publisher<? extends I> value) {
        WIP_UPDATER.incrementAndGet(this);
        this.fanInAction.addPublisher(value);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        super.onSubscribe(subscription);
        long toRequest = REQUESTED_UPDATER.getAndSet(this, 0L);
        if (toRequest > 0L) {
            this.requestMore(toRequest);
        }
    }

    @Override
    protected void requestUpstream(long capacity, boolean terminated, long elements) {
        if (this.upstreamSubscription != null && !terminated) {
            long toRequest = REQUESTED_UPDATER.getAndSet(this, 0L);
            if (toRequest < 0L) {
                toRequest = Long.MAX_VALUE;
            }
            if (toRequest == 0L) {
                toRequest = elements;
            } else if (elements != toRequest) {
                toRequest = elements + toRequest;
            }
            if (toRequest > 0L) {
                this.requestMore(toRequest);
            }
        } else if (REQUESTED_UPDATER.addAndGet(this, elements) < 0L) {
            REQUESTED_UPDATER.set(this, Long.MAX_VALUE);
        }
    }

    @Override
    public void onError(Throwable cause) {
        this.fanInAction.onError(cause);
    }

    @Override
    public void onComplete() {
        if (this.wip == 0) {
            this.fanInAction.scheduleCompletion();
        }
    }

    @Override
    public Action<Publisher<? extends I>, O> capacity(long elements) {
        this.fanInAction.capacity(elements);
        return super.capacity(elements);
    }

    public int decrementWip() {
        return WIP_UPDATER.decrementAndGet(this);
    }

    public FanInAction<I, ?, O, ? extends FanInAction.InnerSubscriber<I, ?, O>> mergedStream() {
        return this.fanInAction;
    }

    @Override
    public String toString() {
        return super.toString() + "{" + "wip=" + this.wip + ", requested=" + this.requested + '}';
    }
}

