/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.publisher;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Cancellation;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Loopback;
import reactor.core.MultiProducer;
import reactor.core.Receiver;
import reactor.core.Trackable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Operators;

final class FluxPublish<T>
extends ConnectableFlux<T>
implements Receiver,
Loopback {
    /** Upstream publisher; {@link #connect} subscribes the current {@code State} to it. */
    final Publisher<? extends T> source;
    /** Positive amount (validated in the constructor) handed to every {@code State} created by this operator. */
    final int prefetch;
    /** Creates the buffer queue a {@code State} uses when it does not adopt the upstream's own queue. */
    final Supplier<? extends Queue<T>> queueSupplier;
    /** Current connection; {@code null} at first and swapped via {@link #CONNECTION} once it is missing or terminated. */
    volatile State<T> connection;
    /** Atomic updater used for compare-and-set replacement of {@link #connection}. */
    static final AtomicReferenceFieldUpdater<FluxPublish, State> CONNECTION = AtomicReferenceFieldUpdater.newUpdater(FluxPublish.class, State.class, "connection");

    /**
     * Creates the operator.
     *
     * @param source the upstream publisher, must not be {@code null}
     * @param prefetch the prefetch amount, must be strictly positive
     * @param queueSupplier supplier of buffer queues, must not be {@code null}
     * @throws IllegalArgumentException if {@code prefetch} is zero or negative
     * @throws NullPointerException if {@code source} or {@code queueSupplier} is {@code null}
     */
    public FluxPublish(Publisher<? extends T> source, int prefetch, Supplier<? extends Queue<T>> queueSupplier) {
        // The prefetch check runs first so its exception wins when several arguments are invalid.
        if (prefetch < 1) {
            throw new IllegalArgumentException("bufferSize > 0 required but it was " + prefetch);
        }
        this.source = Objects.requireNonNull(source, "source");
        this.prefetch = prefetch;
        this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
    }

    /**
     * Connects to the source through the current connection, installing a
     * fresh {@code State} first when none exists or the existing one has
     * terminated.
     *
     * <p>The connection is handed to {@code cancelSupport} before the source
     * is subscribed, and the source is only subscribed by the caller that
     * wins {@code State.tryConnect()}.
     *
     * @param cancelSupport receives the connection as a {@link Cancellation}
     */
    @Override
    public void connect(Consumer<? super Cancellation> cancelSupport) {
        State<T> current;
        for (;;) {
            current = connection;
            if (current != null && !current.isTerminated()) {
                break;
            }
            State<T> fresh = new State<>(prefetch, this);
            if (CONNECTION.compareAndSet(this, current, fresh)) {
                current = fresh;
                break;
            }
            // Lost the race to install a connection: re-read and try again.
        }
        boolean shouldSubscribe = current.tryConnect();
        cancelSupport.accept(current);
        if (shouldSubscribe) {
            source.subscribe(current);
        }
    }

    /**
     * Registers a subscriber with the current connection.
     *
     * <p>The subscriber first receives its {@code InnerSubscription}. The
     * loop then keeps trying to attach it to a live connection, installing a
     * new {@code State} when the current one is missing or terminated, until
     * either the attachment succeeds or the subscriber has cancelled.
     *
     * @param s the subscriber to register
     */
    @Override
    public void subscribe(Subscriber<? super T> s) {
        InnerSubscription<T> inner = new InnerSubscription<>(s);
        s.onSubscribe(inner);
        while (!inner.isCancelled()) {
            // Typed as State<T> (not a wildcard) so trySubscribe(InnerSubscription<T>) type-checks.
            State<T> c = connection;
            if (c == null || c.isTerminated()) {
                State<T> u = new State<>(prefetch, this);
                if (!CONNECTION.compareAndSet(this, c, u)) {
                    // Another thread replaced the connection; start over with the new one.
                    continue;
                }
                c = u;
            }
            if (c.trySubscribe(inner)) {
                break;
            }
            // The connection terminated between the check and the add; retry.
        }
    }

    /**
     * @return the prefetch amount this operator was constructed with
     */
    @Override
    public long getPrefetch() {
        return prefetch;
    }

    /**
     * @return the current connection {@code State}, or {@code null} if none has been created yet
     */
    @Override
    public Object connectedOutput() {
        return connection;
    }

    /**
     * @return the source publisher this operator wraps
     */
    @Override
    public Object upstream() {
        return source;
    }

    /**
     * Per-subscriber subscription handed to each downstream {@link Subscriber}.
     * It tracks that subscriber's outstanding demand and cancellation flag and
     * forwards both to the owning {@code State} once one has been assigned.
     */
    static final class InnerSubscription<T>
    implements Subscription,
    Receiver,
    Trackable {
        static final AtomicLongFieldUpdater<InnerSubscription> REQUESTED = AtomicLongFieldUpdater.newUpdater(InnerSubscription.class, "requested");
        static final AtomicIntegerFieldUpdater<InnerSubscription> CANCELLED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscription.class, "cancelled");

        /** The downstream subscriber that receives the signals. */
        final Subscriber<? super T> actual;
        /** Owning connection; stays {@code null} until {@code State.trySubscribe} assigns it. */
        State<T> parent;
        /** Outstanding demand: increased by {@link #request(long)}, decreased by {@link #produced(long)}. */
        volatile long requested;
        /** 0 while active, 1 once {@link #cancel()} has succeeded. */
        volatile int cancelled;

        public InnerSubscription(Subscriber<? super T> actual) {
            this.actual = actual;
        }

        /**
         * Adds {@code n} to the outstanding demand when {@code Operators.validate}
         * accepts it, then asks the owning connection (if any) to drain.
         */
        @Override
        public void request(long n) {
            if (!Operators.validate(n)) {
                return;
            }
            Operators.getAndAddCap(REQUESTED, this, n);
            State<T> owner = parent;
            if (owner != null) {
                owner.drain();
            }
        }

        /**
         * Flips the cancelled flag; only the first successful call removes this
         * subscription from the owning connection, if one is assigned.
         */
        @Override
        public void cancel() {
            if (!CANCELLED.compareAndSet(this, 0, 1)) {
                return;
            }
            State<T> owner = parent;
            if (owner != null) {
                owner.remove(this);
            }
        }

        @Override
        public boolean isCancelled() {
            return cancelled != 0;
        }

        @Override
        public Object upstream() {
            return parent;
        }

        @Override
        public long requestedFromDownstream() {
            return requested;
        }

        /** Subtracts {@code n} delivered items from the outstanding demand. */
        void produced(long n) {
            REQUESTED.addAndGet(this, -n);
        }
    }

    static final class State<T>
    implements Subscriber<T>,
    Receiver,
    MultiProducer,
    Trackable,
    Cancellation {
        /** Amount requested from upstream in onSubscribe; also reported by getCapacity(). */
        final int prefetch;
        /** Owning operator; its queueSupplier is used in onSubscribe. */
        final FluxPublish<T> parent;
        /** Upstream subscription, installed through S in onSubscribe and swapped out through S in dispose(). */
        volatile Subscription s;
        static final AtomicReferenceFieldUpdater<State, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(State.class, Subscription.class, "s");
        /** Current subscriber array; starts as EMPTY, becomes TERMINATED in terminate(); replaced only while holding this object's monitor. */
        volatile InnerSubscription<T>[] subscribers;
        /** Work-in-progress counter guarding drain(); dispose() increments it as well. */
        volatile int wip;
        static final AtomicIntegerFieldUpdater<State> WIP = AtomicIntegerFieldUpdater.newUpdater(State.class, "wip");
        /** 0 until tryConnect() succeeds once, then 1. */
        volatile int connected;
        static final AtomicIntegerFieldUpdater<State> CONNECTED = AtomicIntegerFieldUpdater.newUpdater(State.class, "connected");
        /** Sentinel for "no subscribers"; compared by identity. */
        static final InnerSubscription[] EMPTY = new InnerSubscription[0];
        /** Sentinel for "terminated"; a distinct instance from EMPTY, compared by identity. */
        static final InnerSubscription[] TERMINATED = new InnerSubscription[0];
        /** Item buffer; null until onSubscribe assigns either the upstream's own queue or one from the supplier. */
        volatile Queue<T> queue;
        /** Fusion mode stored in onSubscribe when requestFusion returns 1 or 2; otherwise left at 0. NOTE(review): 1/2 presumably correspond to Fuseable.SYNC/ASYNC - confirm. */
        int sourceMode;
        /** Set once the upstream has terminated (or is treated as terminated). */
        volatile boolean done;
        /** Accumulated failure, updated through ERROR. */
        volatile Throwable error;
        static final AtomicReferenceFieldUpdater<State, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(State.class, Throwable.class, "error");
        /** Set by dispose() once the upstream subscription has been terminated. */
        volatile boolean cancelled;

        /**
         * Creates a connection with no subscribers.
         *
         * @param prefetch amount to request from upstream once subscribed
         * @param parent the operator this connection belongs to
         */
        public State(int prefetch, FluxPublish<T> parent) {
            subscribers = EMPTY;
            this.parent = parent;
            this.prefetch = prefetch;
        }

        /**
         * Accepts the upstream subscription (at most once) and sets up the queue.
         *
         * <p>When the upstream is a {@code Fuseable.QueueSubscription}, fusion is
         * requested with mode {@code 3}. A reply of {@code 1} adopts the upstream
         * as the queue and marks the sequence done immediately; a reply of
         * {@code 2} adopts it and requests {@code prefetch}. Any other reply, or
         * a non-fuseable upstream, falls back to a queue from the parent's
         * supplier. NOTE(review): 3/1/2 presumably are Fuseable.ANY/SYNC/ASYNC -
         * confirm against the Fuseable interface.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (!Operators.setOnce(S, this, s)) {
                return;
            }
            if (s instanceof Fuseable.QueueSubscription) {
                Fuseable.QueueSubscription fused = (Fuseable.QueueSubscription)s;
                int mode = fused.requestFusion(3);
                switch (mode) {
                    case 1:
                        sourceMode = mode;
                        queue = fused;
                        done = true;
                        drain();
                        return;
                    case 2:
                        sourceMode = mode;
                        queue = fused;
                        s.request((long)prefetch);
                        return;
                    default:
                        // Fusion not granted in a supported mode: use a regular queue below.
                        break;
                }
            }
            try {
                queue = parent.queueSupplier.get();
            }
            catch (Throwable ex) {
                // No queue could be created: record the failure and mark the sequence done.
                error = Operators.onOperatorError(s, ex);
                done = true;
                drain();
                return;
            }
            s.request((long)prefetch);
        }

        /**
         * Receives an upstream item.
         *
         * <p>After termination, non-null items are reported as dropped. When
         * {@code sourceMode} is {@code 2} the item is not enqueued here and only
         * a drain is triggered. Otherwise the item is offered to the queue; a
         * rejected offer is recorded as an {@link IllegalStateException} and
         * marks the sequence done.
         */
        @Override
        public void onNext(T t) {
            if (done) {
                if (t != null) {
                    Operators.onNextDropped(t);
                }
                return;
            }
            if (sourceMode != 2 && !queue.offer(t)) {
                IllegalStateException overflow = new IllegalStateException("Queue full?!");
                if (!Exceptions.addThrowable(ERROR, this, overflow)) {
                    Operators.onErrorDropped(overflow);
                    return;
                }
                done = true;
            }
            drain();
        }

        /**
         * Receives an upstream failure. The error is recorded and drained only
         * if the sequence is not already done and the error could be added;
         * otherwise it is reported as dropped.
         */
        @Override
        public void onError(Throwable t) {
            if (!done && Exceptions.addThrowable(ERROR, this, t)) {
                done = true;
                drain();
                return;
            }
            Operators.onErrorDropped(t);
        }

        /** Marks the upstream as finished and triggers a drain so the completion can be delivered. */
        @Override
        public void onComplete() {
            done = true;
            drain();
        }

        /**
         * Disconnects from the upstream.
         *
         * <p>Nothing happens if already cancelled or if
         * {@code Operators.terminate} reports no transition. Otherwise the
         * cancelled flag is set and, when no drain is in progress, subscribers
         * are notified right away; a running drain observes the flag instead.
         */
        @Override
        public void dispose() {
            if (cancelled || !Operators.terminate(S, this)) {
                return;
            }
            cancelled = true;
            if (WIP.getAndIncrement(this) == 0) {
                disconnectAction();
            }
        }

        /**
         * Clears any buffered items, terminates this connection and signals a
         * {@link CancellationException} to every subscriber that was registered.
         *
         * <p>The queue is null-checked because it is only assigned inside
         * {@code onSubscribe}, and stays unassigned if the queue supplier
         * throws there. A dispose in that situation must still notify
         * subscribers instead of failing with a {@link NullPointerException}.
         */
        void disconnectAction() {
            Queue<T> q = queue;
            if (q != null) {
                q.clear();
            }
            CancellationException ex = new CancellationException("Disconnected");
            for (InnerSubscription<T> inner : terminate()) {
                inner.actual.onError(ex);
            }
        }

        /**
         * Appends a subscriber to the subscriber array.
         *
         * <p>The array is never modified in place: a copy one slot longer is
         * published while holding this object's monitor.
         *
         * @param inner the subscription to register
         * @return {@code false} if this connection has already terminated,
         *         {@code true} once the subscription has been added
         */
        boolean add(InnerSubscription<T> inner) {
            // Cheap check without the lock; repeated under the lock below.
            if (subscribers == TERMINATED) {
                return false;
            }
            synchronized (this) {
                InnerSubscription<T>[] current = subscribers;
                if (current == TERMINATED) {
                    return false;
                }
                int size = current.length;
                InnerSubscription<T>[] grown = Arrays.copyOf(current, size + 1);
                grown[size] = inner;
                subscribers = grown;
                return true;
            }
        }

        /**
         * Removes a subscriber from the subscriber array, if present.
         *
         * <p>Does nothing when the connection is terminated, has no
         * subscribers, or does not contain {@code inner}. Otherwise a shortened
         * copy (or the shared {@code EMPTY} array when the last one leaves) is
         * published while holding this object's monitor.
         *
         * @param inner the subscription to unregister (matched by identity)
         */
        void remove(InnerSubscription<T> inner) {
            InnerSubscription<T>[] current = subscribers;
            if (current == TERMINATED || current == EMPTY) {
                return;
            }
            synchronized (this) {
                current = subscribers;
                if (current == TERMINATED || current == EMPTY) {
                    return;
                }
                int size = current.length;
                int index = 0;
                while (index < size && current[index] != inner) {
                    index++;
                }
                if (index == size) {
                    // Not registered (any more).
                    return;
                }
                InnerSubscription[] shrunk;
                if (size == 1) {
                    shrunk = EMPTY;
                } else {
                    shrunk = new InnerSubscription[size - 1];
                    System.arraycopy(current, 0, shrunk, 0, index);
                    System.arraycopy(current, index + 1, shrunk, index, size - index - 1);
                }
                subscribers = shrunk;
            }
        }

        /**
         * Switches the subscriber array to the {@code TERMINATED} sentinel.
         *
         * @return the array that was current before the switch, so the caller
         *         can signal those subscribers; the {@code TERMINATED} sentinel
         *         itself if the connection was already terminated
         */
        InnerSubscription<T>[] terminate() {
            InnerSubscription<T>[] snapshot = subscribers;
            if (snapshot != TERMINATED) {
                synchronized (this) {
                    // Re-read under the lock: add/remove may have published a new array.
                    snapshot = subscribers;
                    if (snapshot != TERMINATED) {
                        subscribers = TERMINATED;
                    }
                }
            }
            return snapshot;
        }

        /**
         * @return {@code true} once the subscriber array has been replaced by the {@code TERMINATED} sentinel
         */
        @Override
        public boolean isTerminated() {
            InnerSubscription<T>[] current = subscribers;
            return current == TERMINATED;
        }

        /**
         * Attempts to claim the right to subscribe this connection to the source.
         *
         * @return {@code true} for the single caller that flips {@code connected} from 0 to 1
         */
        boolean tryConnect() {
            if (connected != 0) {
                return false;
            }
            return CONNECTED.compareAndSet(this, 0, 1);
        }

        /**
         * Registers {@code inner} with this connection.
         *
         * <p>If the subscription was cancelled while it was being added, it is
         * removed again; otherwise it is linked to this connection and a drain
         * is triggered.
         *
         * @param inner the subscription to attach
         * @return {@code false} if this connection has terminated and the
         *         caller should retry with another one, {@code true} otherwise
         */
        boolean trySubscribe(InnerSubscription<T> inner) {
            if (!add(inner)) {
                return false;
            }
            if (inner.isCancelled()) {
                remove(inner);
                return true;
            }
            inner.parent = this;
            drain();
            return true;
        }

        /**
         * Requests {@code n} more items from the upstream subscription, except
         * when {@code sourceMode} is {@code 1}, in which case nothing is requested.
         *
         * @param n number of items to request
         */
        void replenish(long n) {
            if (sourceMode == 1) {
                return;
            }
            s.request(n);
        }

        /**
         * Moves items from the queue to all current subscribers.
         *
         * <p>Only the caller that raises {@code wip} from 0 runs the loop; every
         * other caller just increments the counter and returns, which makes the
         * running loop go around again. Each pass emits at most the smallest
         * outstanding demand among the subscribers, delivering every polled
         * item to all of them.
         *
         * <p>NOTE(review): while {@code queue} is still null the loop body is
         * skipped entirely, so {@code checkTerminated} is never reached. That
         * includes the path in {@code onSubscribe} where the queue supplier
         * throws and {@code error}/{@code done} are set - confirm whether that
         * failure is meant to stay undelivered.
         */
        void drain() {
            if (WIP.getAndIncrement(this) != 0) {
                return;
            }
            Queue<T> q = this.queue;
            int missed = 1;
            while (true) {
                if (q != null) {
                    // Snapshot of the subscriber array used for this whole pass.
                    InnerSubscription<T>[] a = this.subscribers;
                    // r = minimum outstanding demand across the snapshot (Long.MAX_VALUE when it is empty).
                    long r = Long.MAX_VALUE;
                    for (InnerSubscription<T> inner : a) {
                        r = Math.min(r, inner.requested);
                    }
                    if (a.length != 0 && r != 0L) {
                        boolean d;
                        long e;
                        // e counts the items emitted in this pass.
                        for (e = 0L; e != r; ++e) {
                            boolean empty;
                            T v;
                            // Read done before polling.
                            d = this.done;
                            try {
                                v = q.poll();
                            }
                            catch (Throwable ex) {
                                // A failing poll is recorded as an error and handled as "done and empty".
                                Exceptions.addThrowable(ERROR, this, Operators.onOperatorError(this.s, ex));
                                d = true;
                                v = null;
                            }
                            // 'bl' is never read; presumably a decompiler artifact.
                            boolean bl = empty = v == null;
                            if (this.checkTerminated(d, empty)) {
                                return;
                            }
                            if (empty) break;
                            for (InnerSubscription<T> inner : a) {
                                inner.actual.onNext(v);
                            }
                        }
                        // Demand fully used: still check for termination when nothing is left in the queue.
                        if (e == r) {
                            boolean empty;
                            d = this.done;
                            try {
                                empty = q.isEmpty();
                            }
                            catch (Throwable ex) {
                                Exceptions.addThrowable(ERROR, this, Operators.onOperatorError(this.s, ex));
                                d = true;
                                empty = true;
                            }
                            if (this.checkTerminated(d, empty)) {
                                return;
                            }
                        }
                        if (e != 0L) {
                            // Ask upstream for as many items as were emitted, then charge each subscriber.
                            this.replenish(e);
                            // A minimum of Long.MAX_VALUE leaves the per-subscriber demand untouched.
                            if (r != Long.MAX_VALUE) {
                                for (InnerSubscription<T> inner : a) {
                                    inner.produced(e);
                                }
                            }
                        }
                    }
                }
                // Leave once no drain() call arrived during this pass; otherwise go around again.
                if ((missed = WIP.addAndGet(this, -missed)) == 0) break;
                if (q != null) continue;
                // The queue may have been assigned since the last pass.
                q = this.queue;
            }
        }

        /**
         * Decides whether draining must stop and, if so, signals the terminal
         * event to the subscribers.
         *
         * <p>Cancellation takes precedence and triggers the disconnect action.
         * Otherwise, when {@code d} is set, a recorded error clears the queue
         * and is delivered via {@code onError}; with no error and an empty
         * queue, {@code onComplete} is delivered.
         *
         * @param d whether the upstream was observed as done
         * @param empty whether the queue was observed as empty
         * @return {@code true} if a terminal state was handled and the caller must stop draining
         */
        boolean checkTerminated(boolean d, boolean empty) {
            if (cancelled) {
                disconnectAction();
                return true;
            }
            if (!d) {
                return false;
            }
            Throwable failure = error;
            if (failure != null && failure != Exceptions.TERMINATED) {
                failure = Exceptions.terminate(ERROR, this);
                queue.clear();
                for (InnerSubscription<T> inner : terminate()) {
                    inner.actual.onError(failure);
                }
                return true;
            }
            if (!empty) {
                // Done, but buffered items still have to be delivered first.
                return false;
            }
            for (InnerSubscription<T> inner : terminate()) {
                inner.actual.onComplete();
            }
            return true;
        }

        /**
         * @return the prefetch amount of this connection
         */
        @Override
        public long getCapacity() {
            return prefetch;
        }

        /**
         * Reports how many items are currently buffered.
         *
         * <p>The queue is only assigned in {@code onSubscribe}, so this returns
         * 0 while it is still unset rather than throwing a
         * {@link NullPointerException}.
         *
         * @return the size of the queue, or 0 if no queue has been assigned yet
         */
        @Override
        public long getPending() {
            Queue<T> q = queue;
            return q == null ? 0L : q.size();
        }

        /**
         * @return {@code true} once {@code dispose()} has marked this connection as cancelled
         */
        @Override
        public boolean isCancelled() {
            return cancelled;
        }

        /**
         * @return {@code true} while an upstream subscription is set and the
         *         connection is neither cancelled nor done
         */
        @Override
        public boolean isStarted() {
            return !(cancelled || done) && s != null;
        }

        /**
         * @return the currently recorded error, or {@code null} if there is none
         */
        @Override
        public Throwable getError() {
            return error;
        }

        /**
         * @return an iterator over the subscriber array as it is at the time of the call
         */
        @Override
        public Iterator<?> downstreams() {
            InnerSubscription<T>[] snapshot = subscribers;
            return Arrays.asList(snapshot).iterator();
        }

        /**
         * @return the length of the current subscriber array
         */
        @Override
        public long downstreamCount() {
            InnerSubscription<T>[] snapshot = subscribers;
            return snapshot.length;
        }

        /**
         * @return the current value of the upstream subscription field
         */
        @Override
        public Object upstream() {
            return s;
        }
    }
}

